mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-01 09:50:33 +08:00
Merge #402 Add OIDC_REQUIRED_SCOPE and fix the test_auth.py to use Authorization header.
This commit is contained in:
@@ -132,6 +132,7 @@ class DevConfiguration(BaseConfiguration):
|
||||
KOJI_REPOSITORY_URL = 'http://kojipkgs.stg.fedoraproject.org/repos'
|
||||
|
||||
OIDC_CLIENT_SECRETS = path.join(confdir, 'client_secrets.json')
|
||||
OIDC_REQUIRED_SCOPE = 'https://mbs.fedoraproject.org/oidc/submit-build'
|
||||
|
||||
SSL_CERTIFICATE_FILE = path.join(confdir, 'server.crt')
|
||||
SSL_CERTIFICATE_KEY_FILE = path.join(confdir, 'server.key')
|
||||
|
||||
@@ -108,11 +108,14 @@ def get_user(request):
|
||||
if not data or not "active" in data or not data["active"]:
|
||||
raise Unauthorized("OIDC token invalid or expired.")
|
||||
|
||||
if not "OIDC_REQUIRED_SCOPE" in app.config:
|
||||
raise Unauthorized("OIDC_REQUIRED_SCOPE must be set in server config.")
|
||||
|
||||
presented_scopes = data['scope'].split(' ')
|
||||
required_scopes = [
|
||||
'openid',
|
||||
'https://id.fedoraproject.org/scope/groups',
|
||||
'https://mbs.fedoraproject.org/oidc/submit-build',
|
||||
app.config["OIDC_REQUIRED_SCOPE"],
|
||||
]
|
||||
for scope in required_scopes:
|
||||
if scope not in presented_scopes:
|
||||
|
||||
13
tests/client_secrets.json
Normal file
13
tests/client_secrets.json
Normal file
@@ -0,0 +1,13 @@
|
||||
{
|
||||
"web": {
|
||||
"auth_uri": "https://id.stg.fedoraproject.org/openidc/Authorization",
|
||||
"client_id": "mbs-authorizer",
|
||||
"client_secret": "notsecret",
|
||||
"redirect_uris": [
|
||||
"http://localhost:13747/"
|
||||
],
|
||||
"token_uri": "https://id.stg.fedoraproject.org/openidc/Token",
|
||||
"token_introspection_uri": "https://id.stg.fedoraproject.org/openidc/TokenInfo",
|
||||
"userinfo_uri": "https://id.stg.fedoraproject.org/openidc/UserInfo"
|
||||
}
|
||||
}
|
||||
@@ -29,46 +29,144 @@ from mock import patch
|
||||
|
||||
import module_build_service.auth
|
||||
import module_build_service.errors
|
||||
from os import path
|
||||
|
||||
|
||||
class TestAuthModule(unittest.TestCase):
|
||||
@raises(module_build_service.errors.Unauthorized)
|
||||
def test_get_user_no_token(self):
|
||||
request = mock.MagicMock()
|
||||
request.cookies.return_value = {}
|
||||
module_build_service.auth.get_user(request)
|
||||
base_dir = path.abspath(path.dirname(__file__))
|
||||
client_secrets = path.join(base_dir, "client_secrets.json")
|
||||
with patch.dict('module_build_service.app.config', {'OIDC_CLIENT_SECRETS': client_secrets,
|
||||
'OIDC_REQUIRED_SCOPE': 'mbs-scope'}):
|
||||
request = mock.MagicMock()
|
||||
request.cookies.return_value = {}
|
||||
|
||||
with self.assertRaises(module_build_service.errors.Unauthorized) as cm:
|
||||
result = module_build_service.auth.get_user(request)
|
||||
|
||||
self.assertEquals(str(cm.exception),
|
||||
"No 'authorization' header found.")
|
||||
|
||||
@raises(module_build_service.errors.Unauthorized)
|
||||
@patch('module_build_service.auth._get_token_info')
|
||||
def test_get_user_failure(self, get_token_info):
|
||||
def mocked_get_token_info(token):
|
||||
return {"active": False}
|
||||
get_token_info.return_value = mocked_get_token_info
|
||||
@patch('module_build_service.auth._get_user_info')
|
||||
def test_get_user_failure(self, get_user_info, get_token_info):
|
||||
base_dir = path.abspath(path.dirname(__file__))
|
||||
client_secrets = path.join(base_dir, "client_secrets.json")
|
||||
with patch.dict('module_build_service.app.config', {'OIDC_CLIENT_SECRETS': client_secrets,
|
||||
'OIDC_REQUIRED_SCOPE': 'mbs-scope'}):
|
||||
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
||||
name = "Joey Jo Jo Junior Shabadoo"
|
||||
mocked_get_token_info = {"active": False, "username": name,
|
||||
"scope": "openid https://id.fedoraproject.org/scope/groups mbs-scope"}
|
||||
get_token_info.return_value = mocked_get_token_info
|
||||
|
||||
request = mock.MagicMock()
|
||||
request.cookies.return_value = {"oidc_token", "1234"}
|
||||
module_build_service.auth.get_user(request)
|
||||
get_user_info.return_value = {"groups":["group"]}
|
||||
|
||||
headers = {"authorization": "Bearer foobar"}
|
||||
request = mock.MagicMock()
|
||||
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
||||
request.headers.__getitem__.side_effect = headers.__getitem__
|
||||
request.headers.__setitem__.side_effect = headers.__setitem__
|
||||
request.headers.__contains__.side_effect = headers.__contains__
|
||||
|
||||
with self.assertRaises(module_build_service.errors.Unauthorized) as cm:
|
||||
result = module_build_service.auth.get_user(request)
|
||||
|
||||
self.assertEquals(str(cm.exception),
|
||||
"OIDC token invalid or expired.")
|
||||
|
||||
@raises(module_build_service.errors.Unauthorized)
|
||||
@patch('module_build_service.auth._get_token_info')
|
||||
def test_get_user_good(self, get_token_info):
|
||||
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
||||
name = "Joey Jo Jo Junior Shabadoo"
|
||||
def mocked_get_token_info(token):
|
||||
return {"active": True, "username": name}
|
||||
get_token_info.return_value = mocked_get_token_info
|
||||
@patch('module_build_service.auth._get_user_info')
|
||||
def test_get_user_good(self, get_user_info, get_token_info):
|
||||
base_dir = path.abspath(path.dirname(__file__))
|
||||
client_secrets = path.join(base_dir, "client_secrets.json")
|
||||
with patch.dict('module_build_service.app.config', {'OIDC_CLIENT_SECRETS': client_secrets,
|
||||
'OIDC_REQUIRED_SCOPE': 'mbs-scope'}):
|
||||
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
||||
name = "Joey Jo Jo Junior Shabadoo"
|
||||
mocked_get_token_info = {"active": True, "username": name,
|
||||
"scope": "openid https://id.fedoraproject.org/scope/groups mbs-scope"}
|
||||
get_token_info.return_value = mocked_get_token_info
|
||||
|
||||
request = mock.MagicMock()
|
||||
request.cookies.return_value = {"oidc_token", "1234"}
|
||||
result = module_build_service.auth.get_user(request)
|
||||
eq_(result, name)
|
||||
get_user_info.return_value = {"groups":["group"]}
|
||||
|
||||
headers = {"authorization": "Bearer foobar"}
|
||||
request = mock.MagicMock()
|
||||
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
||||
request.headers.__getitem__.side_effect = headers.__getitem__
|
||||
request.headers.__setitem__.side_effect = headers.__setitem__
|
||||
request.headers.__contains__.side_effect = headers.__contains__
|
||||
|
||||
username, groups = module_build_service.auth.get_user(request)
|
||||
eq_(username, name)
|
||||
eq_(groups, set(get_user_info.return_value["groups"]))
|
||||
|
||||
def test_disable_authentication(self):
|
||||
with patch.dict('module_build_service.app.config', {'NO_AUTH': True}, clear=True):
|
||||
request = mock.MagicMock()
|
||||
eq_(module_build_service.auth.get_user(request), None)
|
||||
|
||||
@raises(module_build_service.errors.Unauthorized)
|
||||
@patch('module_build_service.auth.client_secrets', None)
|
||||
def test_misconfiguring_oidc_client_secrets_should_be_failed(self):
|
||||
request = mock.MagicMock()
|
||||
module_build_service.auth.get_user(request)
|
||||
with self.assertRaises(module_build_service.errors.Unauthorized) as cm:
|
||||
module_build_service.auth.get_user(request)
|
||||
|
||||
self.assertEquals(str(cm.exception),
|
||||
"OIDC_CLIENT_SECRETS must be set in server config.")
|
||||
|
||||
@patch('module_build_service.auth._get_token_info')
|
||||
@patch('module_build_service.auth._get_user_info')
|
||||
def test_get_required_scope_not_present(self, get_user_info, get_token_info):
|
||||
base_dir = path.abspath(path.dirname(__file__))
|
||||
client_secrets = path.join(base_dir, "client_secrets.json")
|
||||
with patch.dict('module_build_service.app.config', {'OIDC_CLIENT_SECRETS': client_secrets,
|
||||
'OIDC_REQUIRED_SCOPE': 'mbs-scope'}):
|
||||
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
||||
name = "Joey Jo Jo Junior Shabadoo"
|
||||
mocked_get_token_info = {"active": True, "username": name,
|
||||
"scope": "openid https://id.fedoraproject.org/scope/groups"}
|
||||
get_token_info.return_value = mocked_get_token_info
|
||||
|
||||
get_user_info.return_value = {"groups":["group"]}
|
||||
|
||||
headers = {"authorization": "Bearer foobar"}
|
||||
request = mock.MagicMock()
|
||||
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
||||
request.headers.__getitem__.side_effect = headers.__getitem__
|
||||
request.headers.__setitem__.side_effect = headers.__setitem__
|
||||
request.headers.__contains__.side_effect = headers.__contains__
|
||||
|
||||
with self.assertRaises(module_build_service.errors.Unauthorized) as cm:
|
||||
result = module_build_service.auth.get_user(request)
|
||||
|
||||
self.assertEquals(str(cm.exception),
|
||||
"Required OIDC scope 'mbs-scope' not present: "
|
||||
"['openid', 'https://id.fedoraproject.org/scope/groups']")
|
||||
|
||||
@patch('module_build_service.auth._get_token_info')
|
||||
@patch('module_build_service.auth._get_user_info')
|
||||
def test_get_required_scope_not_set_in_cfg(self, get_user_info, get_token_info):
|
||||
base_dir = path.abspath(path.dirname(__file__))
|
||||
client_secrets = path.join(base_dir, "client_secrets.json")
|
||||
with patch.dict('module_build_service.app.config', {'OIDC_CLIENT_SECRETS': client_secrets}):
|
||||
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
||||
name = "Joey Jo Jo Junior Shabadoo"
|
||||
mocked_get_token_info = {"active": True, "username": name,
|
||||
"scope": "openid https://id.fedoraproject.org/scope/groups"}
|
||||
get_token_info.return_value = mocked_get_token_info
|
||||
|
||||
get_user_info.return_value = {"groups":["group"]}
|
||||
|
||||
headers = {"authorization": "Bearer foobar"}
|
||||
request = mock.MagicMock()
|
||||
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
||||
request.headers.__getitem__.side_effect = headers.__getitem__
|
||||
request.headers.__setitem__.side_effect = headers.__setitem__
|
||||
request.headers.__contains__.side_effect = headers.__contains__
|
||||
|
||||
with self.assertRaises(module_build_service.errors.Unauthorized) as cm:
|
||||
result = module_build_service.auth.get_user(request)
|
||||
|
||||
self.assertEquals(str(cm.exception),
|
||||
"OIDC_REQUIRED_SCOPE must be set in server config.")
|
||||
|
||||
Reference in New Issue
Block a user