mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-13 16:29:49 +08:00
This also removes the outdated comments around authorship of each file. If there is still interest in this information, one can just look at the git history.
396 lines
17 KiB
Python
396 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
# SPDX-License-Identifier: MIT
|
|
from os import path, environ
|
|
|
|
import pytest
|
|
import mock
|
|
from mock import patch, PropertyMock, Mock
|
|
import kerberos
|
|
import ldap3
|
|
from werkzeug.exceptions import Unauthorized as FlaskUnauthorized
|
|
|
|
import module_build_service.auth
|
|
import module_build_service.errors
|
|
import module_build_service.config as mbs_config
|
|
from module_build_service import app
|
|
|
|
|
|
class TestAuthModule:
|
|
def test_get_user_no_token(self):
|
|
base_dir = path.abspath(path.dirname(__file__))
|
|
client_secrets = path.join(base_dir, "client_secrets.json")
|
|
with patch.dict(
|
|
"module_build_service.app.config",
|
|
{"OIDC_CLIENT_SECRETS": client_secrets, "OIDC_REQUIRED_SCOPE": "mbs-scope"},
|
|
):
|
|
request = mock.MagicMock()
|
|
request.cookies.return_value = {}
|
|
|
|
with pytest.raises(module_build_service.errors.Unauthorized) as cm:
|
|
with app.app_context():
|
|
module_build_service.auth.get_user(request)
|
|
assert str(cm.value) == "No 'authorization' header found."
|
|
|
|
@patch("module_build_service.auth._get_token_info")
|
|
@patch("module_build_service.auth._get_user_info")
|
|
def test_get_user_failure(self, get_user_info, get_token_info):
|
|
base_dir = path.abspath(path.dirname(__file__))
|
|
client_secrets = path.join(base_dir, "client_secrets.json")
|
|
with patch.dict(
|
|
"module_build_service.app.config",
|
|
{"OIDC_CLIENT_SECRETS": client_secrets, "OIDC_REQUIRED_SCOPE": "mbs-scope"},
|
|
):
|
|
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
|
name = "Joey Jo Jo Junior Shabadoo"
|
|
mocked_get_token_info = {
|
|
"active": False,
|
|
"username": name,
|
|
"scope": ("openid https://id.fedoraproject.org/scope/groups mbs-scope"),
|
|
}
|
|
get_token_info.return_value = mocked_get_token_info
|
|
|
|
get_user_info.return_value = {"groups": ["group"]}
|
|
|
|
headers = {"authorization": "Bearer foobar"}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
with pytest.raises(module_build_service.errors.Unauthorized) as cm:
|
|
with app.app_context():
|
|
module_build_service.auth.get_user(request)
|
|
assert str(cm.value) == "OIDC token invalid or expired."
|
|
|
|
@pytest.mark.parametrize("allowed_users", (set(), {"Joey Jo Jo Junior Shabadoo"}))
|
|
@patch.object(mbs_config.Config, "allowed_users", new_callable=PropertyMock)
|
|
@patch("module_build_service.auth._get_token_info")
|
|
@patch("module_build_service.auth._get_user_info")
|
|
def test_get_user_good(self, get_user_info, get_token_info, m_allowed_users, allowed_users):
|
|
m_allowed_users.return_value = allowed_users
|
|
base_dir = path.abspath(path.dirname(__file__))
|
|
client_secrets = path.join(base_dir, "client_secrets.json")
|
|
with patch.dict(
|
|
"module_build_service.app.config",
|
|
{"OIDC_CLIENT_SECRETS": client_secrets, "OIDC_REQUIRED_SCOPE": "mbs-scope"},
|
|
):
|
|
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
|
name = "Joey Jo Jo Junior Shabadoo"
|
|
mocked_get_token_info = {
|
|
"active": True,
|
|
"username": name,
|
|
"scope": ("openid https://id.fedoraproject.org/scope/groups mbs-scope"),
|
|
}
|
|
get_token_info.return_value = mocked_get_token_info
|
|
|
|
get_user_info.return_value = {"groups": ["group"]}
|
|
|
|
headers = {"authorization": "Bearer foobar"}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
with app.app_context():
|
|
username, groups = module_build_service.auth.get_user(request)
|
|
username_second_call, groups_second_call = module_build_service.auth.get_user(
|
|
request)
|
|
assert username == name
|
|
if allowed_users:
|
|
assert groups == set()
|
|
else:
|
|
assert groups == set(get_user_info.return_value["groups"])
|
|
|
|
# Test the real auth method has been called just once.
|
|
get_user_info.assert_called_once()
|
|
assert username_second_call == username
|
|
assert groups_second_call == groups
|
|
|
|
@patch.object(mbs_config.Config, "no_auth", new_callable=PropertyMock, return_value=True)
|
|
def test_disable_authentication(self, conf_no_auth):
|
|
request = mock.MagicMock()
|
|
username, groups = module_build_service.auth.get_user(request)
|
|
assert username == "anonymous"
|
|
assert groups == {"packager"}
|
|
|
|
@patch("module_build_service.auth.client_secrets", None)
|
|
def test_misconfiguring_oidc_client_secrets_should_be_failed(self):
|
|
request = mock.MagicMock()
|
|
with pytest.raises(module_build_service.errors.Forbidden) as cm:
|
|
with app.app_context():
|
|
module_build_service.auth.get_user(request)
|
|
assert str(cm.value) == "OIDC_CLIENT_SECRETS must be set in server config."
|
|
|
|
@patch("module_build_service.auth._get_token_info")
|
|
@patch("module_build_service.auth._get_user_info")
|
|
def test_get_required_scope_not_present(self, get_user_info, get_token_info):
|
|
base_dir = path.abspath(path.dirname(__file__))
|
|
client_secrets = path.join(base_dir, "client_secrets.json")
|
|
with patch.dict(
|
|
"module_build_service.app.config",
|
|
{"OIDC_CLIENT_SECRETS": client_secrets, "OIDC_REQUIRED_SCOPE": "mbs-scope"},
|
|
):
|
|
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
|
name = "Joey Jo Jo Junior Shabadoo"
|
|
mocked_get_token_info = {
|
|
"active": True,
|
|
"username": name,
|
|
"scope": "openid https://id.fedoraproject.org/scope/groups",
|
|
}
|
|
get_token_info.return_value = mocked_get_token_info
|
|
|
|
get_user_info.return_value = {"groups": ["group"]}
|
|
|
|
headers = {"authorization": "Bearer foobar"}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
with pytest.raises(module_build_service.errors.Unauthorized) as cm:
|
|
with app.app_context():
|
|
module_build_service.auth.get_user(request)
|
|
assert str(cm.value) == (
|
|
"Required OIDC scope 'mbs-scope' not present: "
|
|
"['openid', 'https://id.fedoraproject.org/scope/groups']"
|
|
)
|
|
|
|
@patch("module_build_service.auth._get_token_info")
|
|
@patch("module_build_service.auth._get_user_info")
|
|
def test_get_required_scope_not_set_in_cfg(self, get_user_info, get_token_info):
|
|
base_dir = path.abspath(path.dirname(__file__))
|
|
client_secrets = path.join(base_dir, "client_secrets.json")
|
|
with patch.dict("module_build_service.app.config", {"OIDC_CLIENT_SECRETS": client_secrets}):
|
|
# https://www.youtube.com/watch?v=G-LtddOgUCE
|
|
name = "Joey Jo Jo Junior Shabadoo"
|
|
mocked_get_token_info = {
|
|
"active": True,
|
|
"username": name,
|
|
"scope": "openid https://id.fedoraproject.org/scope/groups",
|
|
}
|
|
get_token_info.return_value = mocked_get_token_info
|
|
|
|
get_user_info.return_value = {"groups": ["group"]}
|
|
|
|
headers = {"authorization": "Bearer foobar"}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
with pytest.raises(module_build_service.errors.Forbidden) as cm:
|
|
with app.app_context():
|
|
module_build_service.auth.get_user(request)
|
|
assert str(cm.value) == "OIDC_REQUIRED_SCOPE must be set in server config."
|
|
|
|
|
|
class KerberosMockConfig(object):
|
|
def __init__(
|
|
self,
|
|
uri="ldaps://test.example.local:636",
|
|
dn="ou=groups,dc=domain,dc=local",
|
|
kt="/path/to/keytab",
|
|
host="mbs.domain.local",
|
|
):
|
|
"""
|
|
:param uri: a string overriding config.ldap_uri
|
|
:param dn: a string overriding config.ldap_groups_dn
|
|
:param kt: a string overriding config.kerberos_keytab
|
|
:param host: a string overriding config.kerberos_http_host
|
|
"""
|
|
self.uri = uri
|
|
self.dn = dn
|
|
self.kt = kt
|
|
self.host = host
|
|
|
|
def __enter__(self):
|
|
self.auth_method_p = patch.object(
|
|
mbs_config.Config, "auth_method", new_callable=PropertyMock)
|
|
mocked_auth_method = self.auth_method_p.start()
|
|
mocked_auth_method.return_value = "kerberos"
|
|
|
|
self.ldap_uri_p = patch.object(mbs_config.Config, "ldap_uri", new_callable=PropertyMock)
|
|
mocked_ldap_uri = self.ldap_uri_p.start()
|
|
mocked_ldap_uri.return_value = self.uri
|
|
|
|
self.ldap_dn_p = patch.object(
|
|
mbs_config.Config, "ldap_groups_dn", new_callable=PropertyMock)
|
|
mocked_ldap_dn = self.ldap_dn_p.start()
|
|
mocked_ldap_dn.return_value = self.dn
|
|
|
|
self.kerberos_keytab_p = patch.object(
|
|
mbs_config.Config, "kerberos_keytab", new_callable=PropertyMock)
|
|
mocked_kerberos_keytab = self.kerberos_keytab_p.start()
|
|
mocked_kerberos_keytab.return_value = self.kt
|
|
|
|
self.kerberos_http_host_p = patch.object(
|
|
mbs_config.Config, "kerberos_http_host", new_callable=PropertyMock)
|
|
mocked_kerberos_http_host = self.kerberos_http_host_p.start()
|
|
mocked_kerberos_http_host.return_value = self.host
|
|
|
|
def __exit__(self, *args):
|
|
self.auth_method_p.stop()
|
|
self.ldap_uri_p.stop()
|
|
self.ldap_dn_p.stop()
|
|
self.kerberos_keytab_p.stop()
|
|
self.kerberos_http_host_p.stop()
|
|
|
|
|
|
class TestAuthModuleKerberos:
|
|
@pytest.mark.parametrize("allowed_users", (set(), {"mprahl"}))
|
|
@patch("kerberos.authGSSServerInit", return_value=(kerberos.AUTH_GSS_COMPLETE, object()))
|
|
@patch("kerberos.authGSSServerStep", return_value=kerberos.AUTH_GSS_COMPLETE)
|
|
@patch("kerberos.authGSSServerResponse", return_value="STOKEN")
|
|
@patch("kerberos.authGSSServerUserName", return_value="mprahl@EXAMPLE.ORG")
|
|
@patch("kerberos.authGSSServerClean")
|
|
@patch("kerberos.getServerPrincipalDetails")
|
|
@patch.dict("os.environ")
|
|
@patch("module_build_service.auth.stack")
|
|
@patch.object(mbs_config.Config, "allowed_users", new_callable=PropertyMock)
|
|
def test_get_user_kerberos(
|
|
self, m_allowed_users, stack, principal, clean, name, response, step, init, allowed_users
|
|
):
|
|
"""
|
|
Test that authentication works with Kerberos and LDAP
|
|
"""
|
|
m_allowed_users.return_value = allowed_users
|
|
mock_top = Mock()
|
|
stack.return_value = mock_top
|
|
|
|
headers = {"Authorization": "foobar"}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
# Create the mock LDAP instance
|
|
server = ldap3.Server("ldaps://test.domain.local")
|
|
connection = ldap3.Connection(server, client_strategy=ldap3.MOCK_SYNC)
|
|
base_dn = "dc=domain,dc=local"
|
|
factory_group_attrs = {
|
|
"objectClass": ["top", "posixGroup"],
|
|
"memberUid": ["mprahl", "tbrady"],
|
|
"gidNumber": 1234,
|
|
"cn": ["factory2-devs"],
|
|
}
|
|
devs_group_attrs = {
|
|
"objectClass": ["top", "posixGroup"],
|
|
"memberUid": ["mprahl", "mikeb"],
|
|
"gidNumber": 1235,
|
|
"cn": ["devs"],
|
|
}
|
|
athletes_group_attrs = {
|
|
"objectClass": ["top", "posixGroup"],
|
|
"memberUid": ["tbrady", "rgronkowski"],
|
|
"gidNumber": 1236,
|
|
"cn": ["athletes"],
|
|
}
|
|
mprahl_attrs = {
|
|
"memberOf": ["cn=Employee,ou=groups,{0}".format(base_dn)],
|
|
"uid": ["mprahl"],
|
|
"cn": ["mprahl"],
|
|
"objectClass": ["top", "person"],
|
|
}
|
|
connection.strategy.add_entry(
|
|
"cn=factory2-devs,ou=groups,{0}".format(base_dn), factory_group_attrs
|
|
)
|
|
connection.strategy.add_entry(
|
|
"cn=athletes,ou=groups,{0}".format(base_dn), athletes_group_attrs
|
|
)
|
|
connection.strategy.add_entry("cn=devs,ou=groups,{0}".format(base_dn), devs_group_attrs)
|
|
connection.strategy.add_entry("cn=mprahl,ou=users,{0}".format(base_dn), mprahl_attrs)
|
|
|
|
# If the user is in allowed_users, then group membership is not checked, and an empty set
|
|
# is just returned for the groups
|
|
if allowed_users:
|
|
expected_groups = set()
|
|
else:
|
|
expected_groups = {"devs", "factory2-devs"}
|
|
|
|
with patch("ldap3.Connection") as mock_ldap_con, KerberosMockConfig():
|
|
mock_ldap_con.return_value = connection
|
|
assert module_build_service.auth.get_user_kerberos(request) == (
|
|
"mprahl", expected_groups)
|
|
|
|
def test_auth_header_not_set(self):
|
|
"""
|
|
Test that an Unauthorized exception is returned when there is no authorization header
|
|
set.
|
|
"""
|
|
headers = {}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
with KerberosMockConfig():
|
|
try:
|
|
module_build_service.auth.get_user_kerberos(request)
|
|
assert False, "Unauthorized error not raised"
|
|
except FlaskUnauthorized as error:
|
|
assert error.response.www_authenticate.to_header().strip() == "Negotiate"
|
|
assert error.response.status == "401 UNAUTHORIZED"
|
|
|
|
@patch.dict(environ)
|
|
def test_keytab_not_set(self):
|
|
"""
|
|
Test that authentication fails when the keytab is not set
|
|
"""
|
|
if "KRB5_KTNAME" in environ:
|
|
del environ["KRB5_KTNAME"]
|
|
|
|
headers = {"Authorization": "foobar"}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
with KerberosMockConfig(kt=""):
|
|
try:
|
|
module_build_service.auth.get_user_kerberos(request)
|
|
assert False, "Unauthorized error not raised"
|
|
except module_build_service.errors.Unauthorized as error:
|
|
assert str(error) == (
|
|
'Kerberos: set the config value of "KERBEROS_KEYTAB" '
|
|
'or the environment variable "KRB5_KTNAME" to your keytab file'
|
|
)
|
|
|
|
# Set the return value to something not 0 (continue) or 1 (complete)
|
|
@patch("kerberos.authGSSServerInit", return_value=(100, object()))
|
|
@patch("kerberos.authGSSServerStep", return_value=kerberos.AUTH_GSS_COMPLETE)
|
|
@patch("kerberos.authGSSServerResponse", return_value="STOKEN")
|
|
@patch("kerberos.authGSSServerUserName", return_value="mprahl@EXAMPLE.ORG")
|
|
@patch("kerberos.authGSSServerClean")
|
|
@patch("kerberos.getServerPrincipalDetails")
|
|
@patch.dict("os.environ")
|
|
@patch("module_build_service.auth.stack")
|
|
def test_get_user_kerberos_invalid_ticket(
|
|
self, stack, principal, clean, name, response, step, init
|
|
):
|
|
"""
|
|
Test that authentication fails with an invalid Kerberos ticket
|
|
"""
|
|
mock_top = Mock()
|
|
stack.return_value = mock_top
|
|
|
|
headers = {"Authorization": "foobar"}
|
|
request = mock.MagicMock()
|
|
request.headers.return_value = mock.MagicMock(spec_set=dict)
|
|
request.headers.__getitem__.side_effect = headers.__getitem__
|
|
request.headers.__setitem__.side_effect = headers.__setitem__
|
|
request.headers.__contains__.side_effect = headers.__contains__
|
|
|
|
with KerberosMockConfig():
|
|
try:
|
|
module_build_service.auth.get_user_kerberos(request)
|
|
assert False, "Forbidden error not raised"
|
|
except module_build_service.errors.Forbidden as error:
|
|
assert str(error) == ("Invalid Kerberos ticket")
|