Merge #681 Add Kerberos + LDAP Authentication

This commit is contained in:
Ralph Bean
2017-09-15 16:10:22 +00:00
7 changed files with 468 additions and 22 deletions

View File

@@ -19,16 +19,21 @@
# SOFTWARE.
#
# Written by Ralph Bean <rbean@redhat.com>
# Written by Matt Prahl <mprahl@redhat.com>
from os import path, environ
from nose.tools import eq_
import unittest
import mock
from mock import patch
from mock import patch, PropertyMock, Mock
import kerberos
import ldap3
from flask import Response
from werkzeug.exceptions import Unauthorized as FlaskUnauthorized
import module_build_service.auth
import module_build_service.errors
from os import path
import module_build_service.config as mbs_config
class TestAuthModule(unittest.TestCase):
@@ -100,12 +105,12 @@ class TestAuthModule(unittest.TestCase):
eq_(username, name)
eq_(groups, set(get_user_info.return_value["groups"]))
def test_disable_authentication(self):
with patch.dict('module_build_service.app.config', {'NO_AUTH': True}, clear=True):
request = mock.MagicMock()
username, groups = module_build_service.auth.get_user(request)
eq_(username, "anonymous")
eq_(groups, {"packager"})
@patch.object(mbs_config.Config, 'no_auth', new_callable=PropertyMock, return_value=True)
def test_disable_authentication(self, conf_no_auth):
request = mock.MagicMock()
username, groups = module_build_service.auth.get_user(request)
eq_(username, "anonymous")
eq_(groups, {"packager"})
@patch('module_build_service.auth.client_secrets', None)
def test_misconfiguring_oidc_client_secrets_should_be_failed(self):
@@ -173,3 +178,191 @@ class TestAuthModule(unittest.TestCase):
self.assertEquals(str(cm.exception),
"OIDC_REQUIRED_SCOPE must be set in server config.")
class KerberosMockConfig(object):
def __init__(self, uri='ldaps://test.example.local:636', dn='ou=groups,dc=domain,dc=local',
kt='/path/to/keytab', host='mbs.domain.local'):
"""
:param uri: a string overriding config.ldap_uri
:param dn: a string overriding config.ldap_groups_dn
:param kt: a string overriding config.kerberos_keytab
:param host: a string overriding config.kerberos_http_host
"""
self.uri = uri
self.dn = dn
self.kt = kt
self.host = host
def __enter__(self):
self.auth_method_p = patch.object(
mbs_config.Config, 'auth_method', new_callable=PropertyMock)
mocked_auth_method = self.auth_method_p.start()
mocked_auth_method.return_value = 'kerberos'
self.ldap_uri_p = patch.object(
mbs_config.Config, 'ldap_uri', new_callable=PropertyMock)
mocked_ldap_uri = self.ldap_uri_p.start()
mocked_ldap_uri.return_value = self.uri
self.ldap_dn_p = patch.object(
mbs_config.Config, 'ldap_groups_dn', new_callable=PropertyMock)
mocked_ldap_dn = self.ldap_dn_p.start()
mocked_ldap_dn.return_value = self.dn
self.kerberos_keytab_p = patch.object(
mbs_config.Config, 'kerberos_keytab', new_callable=PropertyMock)
mocked_kerberos_keytab = self.kerberos_keytab_p.start()
mocked_kerberos_keytab.return_value = self.kt
self.kerberos_http_host_p = patch.object(
mbs_config.Config, 'kerberos_http_host', new_callable=PropertyMock)
mocked_kerberos_http_host = self.kerberos_http_host_p.start()
mocked_kerberos_http_host.return_value = self.host
def __exit__(self, *args):
self.auth_method_p.stop()
self.ldap_uri_p.stop()
self.ldap_dn_p.stop()
self.kerberos_keytab_p.stop()
self.kerberos_http_host_p.stop()
class TestAuthModuleKerberos(unittest.TestCase):
@patch('kerberos.authGSSServerInit', return_value=(kerberos.AUTH_GSS_COMPLETE, object()))
@patch('kerberos.authGSSServerStep', return_value=kerberos.AUTH_GSS_COMPLETE)
@patch('kerberos.authGSSServerResponse', return_value='STOKEN')
@patch('kerberos.authGSSServerUserName', return_value='mprahl@EXAMPLE.ORG')
@patch('kerberos.authGSSServerClean')
@patch('kerberos.getServerPrincipalDetails')
@patch.dict('os.environ')
@patch('module_build_service.auth.stack')
def test_get_user_kerberos(self, stack, principal, clean, name, response,
step, init):
"""
Test that authentication works with Kerberos and LDAP
"""
mock_top = Mock()
stack.return_value = mock_top
headers = {'Authorization': 'foobar'}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
# Create the mock LDAP instance
server = ldap3.Server('ldaps://test.domain.local')
connection = ldap3.Connection(server, client_strategy=ldap3.MOCK_SYNC)
base_dn = 'dc=domain,dc=local'
factory_group_attrs = {
'objectClass': ['top', 'posixGroup'],
'memberUid': ['mprahl', 'tbrady'],
'gidNumber': 1234,
'cn': ['factory2-devs']
}
devs_group_attrs = {
'objectClass': ['top', 'posixGroup'],
'memberUid': ['mprahl', 'mikeb'],
'gidNumber': 1235,
'cn': ['devs']
}
athletes_group_attrs = {
'objectClass': ['top', 'posixGroup'],
'memberUid': ['tbrady', 'rgronkowski'],
'gidNumber': 1236,
'cn': ['athletes']
}
mprahl_attrs = {
'memberOf': ['cn=Employee,ou=groups,{0}'.format(base_dn)],
'uid': ['mprahl'],
'cn': ['mprahl'],
'objectClass': ['top', 'person']
}
connection.strategy.add_entry('cn=factory2-devs,ou=groups,{0}'.format(base_dn),
factory_group_attrs)
connection.strategy.add_entry('cn=athletes,ou=groups,{0}'.format(base_dn),
athletes_group_attrs)
connection.strategy.add_entry('cn=devs,ou=groups,{0}'.format(base_dn), devs_group_attrs)
connection.strategy.add_entry('cn=mprahl,ou=users,{0}'.format(base_dn), mprahl_attrs)
groups = {'devs', 'factory2-devs'}
with patch('ldap3.Connection') as mock_ldap_con, KerberosMockConfig():
mock_ldap_con.return_value = connection
assert module_build_service.auth.get_user_kerberos(request) == ('mprahl', groups)
def test_auth_header_not_set(self):
"""
Test that an Unauthorized exception is returned when there is no authorization header
set.
"""
headers = {}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
with KerberosMockConfig():
try:
module_build_service.auth.get_user_kerberos(request)
assert False, 'Unauthorized error not raised'
except FlaskUnauthorized as error:
assert error.response.www_authenticate.to_header().strip() == 'Negotiate'
assert error.response.status == '401 UNAUTHORIZED'
@patch.dict(environ)
def test_keytab_not_set(self):
"""
Test that authentication fails when the keytab is not set
"""
if 'KRB5_KTNAME' in environ:
del environ['KRB5_KTNAME']
headers = {'Authorization': 'foobar'}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
with KerberosMockConfig(kt=''):
try:
module_build_service.auth.get_user_kerberos(request)
assert False, 'Unauthorized error not raised'
except module_build_service.errors.Unauthorized as error:
assert str(error) == ('Kerberos: set the config value of "KERBEROS_KEYTAB" '
'or the environment variable "KRB5_KTNAME" to your '
'keytab file')
# Set the return value to something not 0 (continue) or 1 (complete)
@patch('kerberos.authGSSServerInit', return_value=(100, object()))
@patch('kerberos.authGSSServerStep', return_value=kerberos.AUTH_GSS_COMPLETE)
@patch('kerberos.authGSSServerResponse', return_value='STOKEN')
@patch('kerberos.authGSSServerUserName', return_value='mprahl@EXAMPLE.ORG')
@patch('kerberos.authGSSServerClean')
@patch('kerberos.getServerPrincipalDetails')
@patch.dict('os.environ')
@patch('module_build_service.auth.stack')
def test_get_user_kerberos_invalid_ticket(self, stack, principal, clean, name, response,
step, init):
"""
Test that authentication fails with an invalid Kerberos ticket
"""
mock_top = Mock()
stack.return_value = mock_top
headers = {'Authorization': 'foobar'}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
with KerberosMockConfig():
try:
module_build_service.auth.get_user_kerberos(request)
assert False, 'Forbidden error not raised'
except module_build_service.errors.Forbidden as error:
assert str(error) == ('Invalid Kerberos ticket')