mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-05 11:48:33 +08:00
Add the allowed_users configuration for service accounts to bypass the group membership check
This is required for monitoring use-cases, where we can have a Kerberos principal for a service account but no associated account in LDAP to check group membership.
This commit is contained in:
@@ -147,14 +147,19 @@ def get_user_oidc(request):
|
||||
log.exception(error)
|
||||
raise Exception(error)
|
||||
|
||||
try:
|
||||
groups = set(extended_data['groups'])
|
||||
except Exception as e:
|
||||
error = "Could not find groups in UserInfo from OIDC %s" % str(e)
|
||||
log.exception(extended_data)
|
||||
raise Exception(error)
|
||||
username = data["username"]
|
||||
# If the user is part of the whitelist, then the group membership check is skipped
|
||||
if username in conf.allowed_users:
|
||||
groups = set()
|
||||
else:
|
||||
try:
|
||||
groups = set(extended_data['groups'])
|
||||
except Exception as e:
|
||||
error = "Could not find groups in UserInfo from OIDC %s" % str(e)
|
||||
log.exception(extended_data)
|
||||
raise Exception(error)
|
||||
|
||||
return data["username"], groups
|
||||
return username, groups
|
||||
|
||||
|
||||
# Insired by https://pagure.io/waiverdb/blob/master/f/waiverdb/auth.py which was
|
||||
@@ -245,7 +250,11 @@ def get_user_kerberos(request):
|
||||
user, kerberos_token = KerberosAuthenticate().process_request(token)
|
||||
# Remove the realm
|
||||
user = user.split('@')[0]
|
||||
groups = get_ldap_group_membership(user)
|
||||
# If the user is part of the whitelist, then the group membership check is skipped
|
||||
if user in conf.allowed_users:
|
||||
groups = []
|
||||
else:
|
||||
groups = get_ldap_group_membership(user)
|
||||
return user, set(groups)
|
||||
|
||||
|
||||
|
||||
@@ -481,6 +481,10 @@ class Config(object):
|
||||
'type': str,
|
||||
'default': 'https://kojipkgs.fedoraproject.org/',
|
||||
'desc': 'URL prefix of base module\'s external repo.'},
|
||||
'allowed_users': {
|
||||
'type': set,
|
||||
'default': set(),
|
||||
'desc': 'The users/service accounts that don\'t require to be part of a group'}
|
||||
}
|
||||
|
||||
def __init__(self, conf_section_obj):
|
||||
|
||||
@@ -162,6 +162,9 @@ class ModuleBuildAPI(AbstractQueryableBuildAPI):
|
||||
|
||||
@staticmethod
|
||||
def check_groups(username, groups, allowed_groups=conf.allowed_groups):
|
||||
# If the user is part of the whitelist, then the group membership check is skipped
|
||||
if username in conf.allowed_users:
|
||||
return
|
||||
if allowed_groups and not (allowed_groups & groups):
|
||||
raise Forbidden("%s is not in any of %r, only %r" % (
|
||||
username, allowed_groups, groups))
|
||||
|
||||
@@ -74,9 +74,12 @@ class TestAuthModule:
|
||||
module_build_service.auth.get_user(request)
|
||||
assert str(cm.value) == "OIDC token invalid or expired."
|
||||
|
||||
@pytest.mark.parametrize('allowed_users', (set(), set(['Joey Jo Jo Junior Shabadoo'])))
|
||||
@patch.object(mbs_config.Config, 'allowed_users', new_callable=PropertyMock)
|
||||
@patch('module_build_service.auth._get_token_info')
|
||||
@patch('module_build_service.auth._get_user_info')
|
||||
def test_get_user_good(self, get_user_info, get_token_info):
|
||||
def test_get_user_good(self, get_user_info, get_token_info, m_allowed_users, allowed_users):
|
||||
m_allowed_users.return_value = allowed_users
|
||||
base_dir = path.abspath(path.dirname(__file__))
|
||||
client_secrets = path.join(base_dir, "client_secrets.json")
|
||||
with patch.dict('module_build_service.app.config', {'OIDC_CLIENT_SECRETS': client_secrets,
|
||||
@@ -99,7 +102,10 @@ class TestAuthModule:
|
||||
|
||||
username, groups = module_build_service.auth.get_user(request)
|
||||
assert username == name
|
||||
assert groups == set(get_user_info.return_value["groups"])
|
||||
if allowed_users:
|
||||
assert groups == set()
|
||||
else:
|
||||
assert groups == set(get_user_info.return_value["groups"])
|
||||
|
||||
@patch.object(mbs_config.Config, 'no_auth', new_callable=PropertyMock, return_value=True)
|
||||
def test_disable_authentication(self, conf_no_auth):
|
||||
@@ -219,6 +225,7 @@ class KerberosMockConfig(object):
|
||||
|
||||
|
||||
class TestAuthModuleKerberos:
|
||||
@pytest.mark.parametrize('allowed_users', (set(), set(['mprahl'])))
|
||||
@patch('kerberos.authGSSServerInit', return_value=(kerberos.AUTH_GSS_COMPLETE, object()))
|
||||
@patch('kerberos.authGSSServerStep', return_value=kerberos.AUTH_GSS_COMPLETE)
|
||||
@patch('kerberos.authGSSServerResponse', return_value='STOKEN')
|
||||
@@ -227,11 +234,13 @@ class TestAuthModuleKerberos:
|
||||
@patch('kerberos.getServerPrincipalDetails')
|
||||
@patch.dict('os.environ')
|
||||
@patch('module_build_service.auth.stack')
|
||||
def test_get_user_kerberos(self, stack, principal, clean, name, response,
|
||||
step, init):
|
||||
@patch.object(mbs_config.Config, 'allowed_users', new_callable=PropertyMock)
|
||||
def test_get_user_kerberos(self, m_allowed_users, stack, principal, clean, name, response,
|
||||
step, init, allowed_users):
|
||||
"""
|
||||
Test that authentication works with Kerberos and LDAP
|
||||
"""
|
||||
m_allowed_users.return_value = allowed_users
|
||||
mock_top = Mock()
|
||||
stack.return_value = mock_top
|
||||
|
||||
@@ -277,10 +286,17 @@ class TestAuthModuleKerberos:
|
||||
connection.strategy.add_entry('cn=devs,ou=groups,{0}'.format(base_dn), devs_group_attrs)
|
||||
connection.strategy.add_entry('cn=mprahl,ou=users,{0}'.format(base_dn), mprahl_attrs)
|
||||
|
||||
groups = {'devs', 'factory2-devs'}
|
||||
# If the user is in allowed_users, then group membership is not checked, and an empty set
|
||||
# is just returned for the groups
|
||||
if allowed_users:
|
||||
expected_groups = set()
|
||||
else:
|
||||
expected_groups = {'devs', 'factory2-devs'}
|
||||
|
||||
with patch('ldap3.Connection') as mock_ldap_con, KerberosMockConfig():
|
||||
mock_ldap_con.return_value = connection
|
||||
assert module_build_service.auth.get_user_kerberos(request) == ('mprahl', groups)
|
||||
assert module_build_service.auth.get_user_kerberos(request) == \
|
||||
('mprahl', expected_groups)
|
||||
|
||||
def test_auth_header_not_set(self):
|
||||
"""
|
||||
|
||||
@@ -1150,6 +1150,22 @@ class TestViews:
|
||||
build = ModuleBuild.query.filter(ModuleBuild.id == result['id']).one()
|
||||
assert (build.owner == result['owner'] == 'foo') is True
|
||||
|
||||
@patch('module_build_service.auth.get_user', return_value=('svc_account', set()))
|
||||
@patch('module_build_service.scm.SCM')
|
||||
@patch('module_build_service.config.Config.allowed_users', new_callable=PropertyMock)
|
||||
def test_submit_build_allowed_users(self, allowed_users, mocked_scm, mocked_get_user):
|
||||
FakeSCM(mocked_scm, 'testmodule', 'testmodule.yaml',
|
||||
'620ec77321b2ea7b0d67d82992dda3e1d67055b4')
|
||||
|
||||
allowed_users.return_value = {'svc_account'}
|
||||
data = {
|
||||
'branch': 'master',
|
||||
'scmurl': 'https://src.stg.fedoraproject.org/modules/'
|
||||
'testmodule.git?#68931c90de214d9d13feefbd35246a81b6cb8d49',
|
||||
}
|
||||
rv = self.client.post('/module-build-service/1/module-builds/', data=json.dumps(data))
|
||||
assert rv.status_code == 201
|
||||
|
||||
@patch('module_build_service.auth.get_user', return_value=anonymous_user)
|
||||
@patch('module_build_service.scm.SCM')
|
||||
@patch("module_build_service.config.Config.no_auth", new_callable=PropertyMock)
|
||||
|
||||
Reference in New Issue
Block a user