Remove KerberosAuthenticate and get kerberos username from REMOTE_USER

Signed-off-by: Chenxiong Qi <cqi@redhat.com>
This commit is contained in:
Chenxiong Qi
2019-10-10 15:21:32 +08:00
parent eeb65c97da
commit 9fd3731ff7
3 changed files with 32 additions and 311 deletions

View File

@@ -2,21 +2,11 @@
# SPDX-License-Identifier: MIT
"""Auth system based on the client certificate and FAS account"""
import json
import os
from socket import gethostname
import ssl
import requests
import kerberos
from flask import Response, g
from flask import g
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
from werkzeug.exceptions import Unauthorized as FlaskUnauthorized
from dogpile.cache import make_region
from module_build_service.errors import Unauthorized, Forbidden
@@ -141,105 +131,20 @@ def get_user_oidc(request):
return username, groups
# Insired by https://pagure.io/waiverdb/blob/master/f/waiverdb/auth.py which was
# inspired by https://github.com/mkomitee/flask-kerberos/blob/master/flask_kerberos.py
class KerberosAuthenticate(object):
def __init__(self):
if conf.kerberos_http_host:
hostname = conf.kerberos_http_host
else:
hostname = gethostname()
self.service_name = "HTTP@{0}".format(hostname)
# If the config specifies a keytab to use, then override the KRB5_KTNAME
# environment variable
if conf.kerberos_keytab:
os.environ["KRB5_KTNAME"] = conf.kerberos_keytab
if "KRB5_KTNAME" in os.environ:
try:
principal = kerberos.getServerPrincipalDetails("HTTP", hostname)
except kerberos.KrbError as error:
raise Unauthorized('Kerberos: authentication failed with "{0}"'.format(str(error)))
log.debug('Kerberos: server is identifying as "{0}"'.format(principal))
else:
raise Unauthorized(
'Kerberos: set the config value of "KERBEROS_KEYTAB" or the '
'environment variable "KRB5_KTNAME" to your keytab file'
)
def _gssapi_authenticate(self, token):
"""
Performs GSSAPI Negotiate Authentication
On success also stashes the server response token for mutual authentication
at the top of request context with the name kerberos_token, along with the
authenticated user principal with the name kerberos_user.
"""
state = None
ctx = stack.top
try:
rc, state = kerberos.authGSSServerInit(self.service_name)
if rc != kerberos.AUTH_GSS_COMPLETE:
log.error("Kerberos: unable to initialize server context")
return None
rc = kerberos.authGSSServerStep(state, token)
if rc == kerberos.AUTH_GSS_COMPLETE:
log.debug("Kerberos: completed GSSAPI negotiation")
ctx.kerberos_token = kerberos.authGSSServerResponse(state)
ctx.kerberos_user = kerberos.authGSSServerUserName(state)
return rc
elif rc == kerberos.AUTH_GSS_CONTINUE:
log.debug("Kerberos: continuing GSSAPI negotiation")
return kerberos.AUTH_GSS_CONTINUE
else:
log.debug("Kerberos: unable to step server context")
return None
except kerberos.GSSError as error:
log.error("Kerberos: unable to authenticate: {0}".format(str(error)))
return None
finally:
if state:
kerberos.authGSSServerClean(state)
def process_request(self, token):
"""
Authenticates the current request using Kerberos.
"""
kerberos_user = None
kerberos_token = None
ctx = stack.top
rc = self._gssapi_authenticate(token)
if rc == kerberos.AUTH_GSS_COMPLETE:
kerberos_user = ctx.kerberos_user
kerberos_token = ctx.kerberos_token
elif rc != kerberos.AUTH_GSS_CONTINUE:
raise Forbidden("Invalid Kerberos ticket")
return kerberos_user, kerberos_token
def get_user_kerberos(request):
user = None
if "Authorization" not in request.headers:
response = Response("Unauthorized", 401, {"WWW-Authenticate": "Negotiate"})
exc = FlaskUnauthorized()
# For some reason, certain versions of werkzeug raise an exception when passing `response`
# in the constructor. This is a work-around.
exc.response = response
raise exc
header = request.headers.get("Authorization")
token = "".join(header.strip().split()[1:])
user, kerberos_token = KerberosAuthenticate().process_request(token)
remote_user = request.environ.get("REMOTE_USER")
if not remote_user:
raise Unauthorized("REMOTE_USER is not properly set in the request.")
# Remove the realm
user = user.split("@")[0]
username, _ = remote_user.split("@")
# If the user is part of the whitelist, then the group membership check is skipped
if user in conf.allowed_users:
if username in conf.allowed_users:
groups = []
else:
groups = get_ldap_group_membership(user)
return user, set(groups)
groups = get_ldap_group_membership(username)
return username, set(groups)
@region.cache_on_arguments()

View File

@@ -7,7 +7,6 @@ dogpile.cache
fedmsg
funcsigs # Python2 only
futures # Python 2 only
kerberos
kobo>=0.5.0
koji
ldap3

View File

@@ -1,14 +1,11 @@
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
from os import path, environ
from os import path
import pytest
import requests
import mock
from mock import patch, PropertyMock, Mock
import kerberos
import ldap3
from werkzeug.exceptions import Unauthorized as FlaskUnauthorized
import module_build_service.auth
import module_build_service.errors
@@ -220,209 +217,29 @@ class TestAuthModule:
module_build_service.auth.get_user(request)
assert str(cm.value) == "OIDC_REQUIRED_SCOPE must be set in server config."
@pytest.mark.parametrize("remote_username", ["", None])
def test_get_user_kerberos_unauthorized(self, remote_username):
request = Mock()
request.environ.get.return_value = remote_username
class KerberosMockConfig(object):
def __init__(
self,
uri="ldaps://test.example.local:636",
dn="ou=groups,dc=domain,dc=local",
kt="/path/to/keytab",
host="mbs.domain.local",
):
"""
:param uri: a string overriding config.ldap_uri
:param dn: a string overriding config.ldap_groups_dn
:param kt: a string overriding config.kerberos_keytab
:param host: a string overriding config.kerberos_http_host
"""
self.uri = uri
self.dn = dn
self.kt = kt
self.host = host
with pytest.raises(module_build_service.errors.Unauthorized):
module_build_service.auth.get_user_kerberos(request)
def __enter__(self):
self.auth_method_p = patch.object(
mbs_config.Config, "auth_method", new_callable=PropertyMock)
mocked_auth_method = self.auth_method_p.start()
mocked_auth_method.return_value = "kerberos"
@patch.object(module_build_service.auth.conf, "allowed_users", new=["someone", "somebody"])
def test_get_user_kerberos_user_is_in_allowed_users_group(self):
request = Mock()
request.environ.get.return_value = "someone@realm"
self.ldap_uri_p = patch.object(mbs_config.Config, "ldap_uri", new_callable=PropertyMock)
mocked_ldap_uri = self.ldap_uri_p.start()
mocked_ldap_uri.return_value = self.uri
username, groups = module_build_service.auth.get_user_kerberos(request)
assert "someone" == username
assert set() == groups
self.ldap_dn_p = patch.object(
mbs_config.Config, "ldap_groups_dn", new_callable=PropertyMock)
mocked_ldap_dn = self.ldap_dn_p.start()
mocked_ldap_dn.return_value = self.dn
@patch.object(module_build_service.auth.conf, "allowed_users", new=["someone", "somebody"])
@patch("module_build_service.auth.get_ldap_group_membership", return_value=["group1", "group2"])
def test_get_user_kerberos_user_is_not_in_allowed_users_group(self, get_ldap_group_membership):
request = Mock()
request.environ.get.return_value = "x-man@realm"
self.kerberos_keytab_p = patch.object(
mbs_config.Config, "kerberos_keytab", new_callable=PropertyMock)
mocked_kerberos_keytab = self.kerberos_keytab_p.start()
mocked_kerberos_keytab.return_value = self.kt
self.kerberos_http_host_p = patch.object(
mbs_config.Config, "kerberos_http_host", new_callable=PropertyMock)
mocked_kerberos_http_host = self.kerberos_http_host_p.start()
mocked_kerberos_http_host.return_value = self.host
def __exit__(self, *args):
self.auth_method_p.stop()
self.ldap_uri_p.stop()
self.ldap_dn_p.stop()
self.kerberos_keytab_p.stop()
self.kerberos_http_host_p.stop()
class TestAuthModuleKerberos:
@pytest.mark.parametrize("allowed_users", (set(), {"mprahl"}))
@patch("kerberos.authGSSServerInit", return_value=(kerberos.AUTH_GSS_COMPLETE, object()))
@patch("kerberos.authGSSServerStep", return_value=kerberos.AUTH_GSS_COMPLETE)
@patch("kerberos.authGSSServerResponse", return_value="STOKEN")
@patch("kerberos.authGSSServerUserName", return_value="mprahl@EXAMPLE.ORG")
@patch("kerberos.authGSSServerClean")
@patch("kerberos.getServerPrincipalDetails")
@patch.dict("os.environ")
@patch("module_build_service.auth.stack")
@patch.object(mbs_config.Config, "allowed_users", new_callable=PropertyMock)
def test_get_user_kerberos(
self, m_allowed_users, stack, principal, clean, name, response, step, init, allowed_users
):
"""
Test that authentication works with Kerberos and LDAP
"""
m_allowed_users.return_value = allowed_users
mock_top = Mock()
stack.return_value = mock_top
headers = {"Authorization": "foobar"}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
# Create the mock LDAP instance
server = ldap3.Server("ldaps://test.domain.local")
connection = ldap3.Connection(server, client_strategy=ldap3.MOCK_SYNC)
base_dn = "dc=domain,dc=local"
factory_group_attrs = {
"objectClass": ["top", "posixGroup"],
"memberUid": ["mprahl", "tbrady"],
"gidNumber": 1234,
"cn": ["factory2-devs"],
}
devs_group_attrs = {
"objectClass": ["top", "posixGroup"],
"memberUid": ["mprahl", "mikeb"],
"gidNumber": 1235,
"cn": ["devs"],
}
athletes_group_attrs = {
"objectClass": ["top", "posixGroup"],
"memberUid": ["tbrady", "rgronkowski"],
"gidNumber": 1236,
"cn": ["athletes"],
}
mprahl_attrs = {
"memberOf": ["cn=Employee,ou=groups,{0}".format(base_dn)],
"uid": ["mprahl"],
"cn": ["mprahl"],
"objectClass": ["top", "person"],
}
connection.strategy.add_entry(
"cn=factory2-devs,ou=groups,{0}".format(base_dn), factory_group_attrs
)
connection.strategy.add_entry(
"cn=athletes,ou=groups,{0}".format(base_dn), athletes_group_attrs
)
connection.strategy.add_entry("cn=devs,ou=groups,{0}".format(base_dn), devs_group_attrs)
connection.strategy.add_entry("cn=mprahl,ou=users,{0}".format(base_dn), mprahl_attrs)
# If the user is in allowed_users, then group membership is not checked, and an empty set
# is just returned for the groups
if allowed_users:
expected_groups = set()
else:
expected_groups = {"devs", "factory2-devs"}
with patch("ldap3.Connection") as mock_ldap_con, KerberosMockConfig():
mock_ldap_con.return_value = connection
assert module_build_service.auth.get_user_kerberos(request) == (
"mprahl", expected_groups)
def test_auth_header_not_set(self):
"""
Test that an Unauthorized exception is returned when there is no authorization header
set.
"""
headers = {}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
with KerberosMockConfig():
try:
module_build_service.auth.get_user_kerberos(request)
assert False, "Unauthorized error not raised"
except FlaskUnauthorized as error:
assert error.response.www_authenticate.to_header().strip() == "Negotiate"
assert error.response.status == "401 UNAUTHORIZED"
@patch.dict(environ)
def test_keytab_not_set(self):
"""
Test that authentication fails when the keytab is not set
"""
if "KRB5_KTNAME" in environ:
del environ["KRB5_KTNAME"]
headers = {"Authorization": "foobar"}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
with KerberosMockConfig(kt=""):
try:
module_build_service.auth.get_user_kerberos(request)
assert False, "Unauthorized error not raised"
except module_build_service.errors.Unauthorized as error:
assert str(error) == (
'Kerberos: set the config value of "KERBEROS_KEYTAB" '
'or the environment variable "KRB5_KTNAME" to your keytab file'
)
# Set the return value to something not 0 (continue) or 1 (complete)
@patch("kerberos.authGSSServerInit", return_value=(100, object()))
@patch("kerberos.authGSSServerStep", return_value=kerberos.AUTH_GSS_COMPLETE)
@patch("kerberos.authGSSServerResponse", return_value="STOKEN")
@patch("kerberos.authGSSServerUserName", return_value="mprahl@EXAMPLE.ORG")
@patch("kerberos.authGSSServerClean")
@patch("kerberos.getServerPrincipalDetails")
@patch.dict("os.environ")
@patch("module_build_service.auth.stack")
def test_get_user_kerberos_invalid_ticket(
self, stack, principal, clean, name, response, step, init
):
"""
Test that authentication fails with an invalid Kerberos ticket
"""
mock_top = Mock()
stack.return_value = mock_top
headers = {"Authorization": "foobar"}
request = mock.MagicMock()
request.headers.return_value = mock.MagicMock(spec_set=dict)
request.headers.__getitem__.side_effect = headers.__getitem__
request.headers.__setitem__.side_effect = headers.__setitem__
request.headers.__contains__.side_effect = headers.__contains__
with KerberosMockConfig():
try:
module_build_service.auth.get_user_kerberos(request)
assert False, "Forbidden error not raised"
except module_build_service.errors.Forbidden as error:
assert str(error) == ("Invalid Kerberos ticket")
username, groups = module_build_service.auth.get_user_kerberos(request)
assert "x-man" == username
assert {"group1", "group2"} == groups