mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-05-11 18:37:25 +08:00
Format the coding style across the codebase using "black" and manual tweaks
The main benefit of this commit is that the use of double quotes is now consistent.
This commit is contained in:
@@ -31,6 +31,7 @@ import ssl
|
||||
import requests
|
||||
import kerberos
|
||||
from flask import Response, g
|
||||
|
||||
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
|
||||
# before that we need to use the _request_ctx_stack.
|
||||
try:
|
||||
@@ -50,12 +51,12 @@ except ImportError:
|
||||
|
||||
|
||||
client_secrets = None
|
||||
region = make_region().configure('dogpile.cache.memory')
|
||||
region = make_region().configure("dogpile.cache.memory")
|
||||
|
||||
|
||||
def _json_loads(content):
|
||||
if not isinstance(content, str):
|
||||
content = content.decode('utf-8')
|
||||
content = content.decode("utf-8")
|
||||
return json.loads(content)
|
||||
|
||||
|
||||
@@ -67,8 +68,7 @@ def _load_secrets():
|
||||
if "OIDC_CLIENT_SECRETS" not in app.config:
|
||||
raise Forbidden("OIDC_CLIENT_SECRETS must be set in server config.")
|
||||
|
||||
secrets = _json_loads(open(app.config['OIDC_CLIENT_SECRETS'],
|
||||
'r').read())
|
||||
secrets = _json_loads(open(app.config["OIDC_CLIENT_SECRETS"], "r").read())
|
||||
client_secrets = list(secrets.values())[0]
|
||||
|
||||
|
||||
@@ -79,13 +79,15 @@ def _get_token_info(token):
|
||||
if not client_secrets:
|
||||
return None
|
||||
|
||||
request = {'token': token,
|
||||
'token_type_hint': 'Bearer',
|
||||
'client_id': client_secrets['client_id'],
|
||||
'client_secret': client_secrets['client_secret']}
|
||||
headers = {'Content-type': 'application/x-www-form-urlencoded'}
|
||||
request = {
|
||||
"token": token,
|
||||
"token_type_hint": "Bearer",
|
||||
"client_id": client_secrets["client_id"],
|
||||
"client_secret": client_secrets["client_secret"],
|
||||
}
|
||||
headers = {"Content-type": "application/x-www-form-urlencoded"}
|
||||
|
||||
resp = requests.post(client_secrets['token_introspection_uri'], data=request, headers=headers)
|
||||
resp = requests.post(client_secrets["token_introspection_uri"], data=request, headers=headers)
|
||||
return resp.json()
|
||||
|
||||
|
||||
@@ -96,8 +98,8 @@ def _get_user_info(token):
|
||||
if not client_secrets:
|
||||
return None
|
||||
|
||||
headers = {'authorization': 'Bearer ' + token}
|
||||
resp = requests.get(client_secrets['userinfo_uri'], headers=headers)
|
||||
headers = {"authorization": "Bearer " + token}
|
||||
resp = requests.get(client_secrets["userinfo_uri"], headers=headers)
|
||||
return resp.json()
|
||||
|
||||
|
||||
@@ -110,8 +112,8 @@ def get_user_oidc(request):
|
||||
if "authorization" not in request.headers:
|
||||
raise Unauthorized("No 'authorization' header found.")
|
||||
|
||||
header = request.headers['authorization'].strip()
|
||||
prefix = 'Bearer '
|
||||
header = request.headers["authorization"].strip()
|
||||
prefix = "Bearer "
|
||||
if not header.startswith(prefix):
|
||||
raise Unauthorized("Authorization headers must start with %r" % prefix)
|
||||
|
||||
@@ -129,16 +131,15 @@ def get_user_oidc(request):
|
||||
if "OIDC_REQUIRED_SCOPE" not in app.config:
|
||||
raise Forbidden("OIDC_REQUIRED_SCOPE must be set in server config.")
|
||||
|
||||
presented_scopes = data['scope'].split(' ')
|
||||
presented_scopes = data["scope"].split(" ")
|
||||
required_scopes = [
|
||||
'openid',
|
||||
'https://id.fedoraproject.org/scope/groups',
|
||||
"openid",
|
||||
"https://id.fedoraproject.org/scope/groups",
|
||||
app.config["OIDC_REQUIRED_SCOPE"],
|
||||
]
|
||||
for scope in required_scopes:
|
||||
if scope not in presented_scopes:
|
||||
raise Unauthorized("Required OIDC scope %r not present: %r" % (
|
||||
scope, presented_scopes))
|
||||
raise Unauthorized("Required OIDC scope %r not present: %r" % (scope, presented_scopes))
|
||||
|
||||
try:
|
||||
extended_data = _get_user_info(token)
|
||||
@@ -153,7 +154,7 @@ def get_user_oidc(request):
|
||||
groups = set()
|
||||
else:
|
||||
try:
|
||||
groups = set(extended_data['groups'])
|
||||
groups = set(extended_data["groups"])
|
||||
except Exception as e:
|
||||
error = "Could not find groups in UserInfo from OIDC %s" % str(e)
|
||||
log.exception(extended_data)
|
||||
@@ -175,19 +176,20 @@ class KerberosAuthenticate(object):
|
||||
# If the config specifies a keytab to use, then override the KRB5_KTNAME
|
||||
# environment variable
|
||||
if conf.kerberos_keytab:
|
||||
os.environ['KRB5_KTNAME'] = conf.kerberos_keytab
|
||||
os.environ["KRB5_KTNAME"] = conf.kerberos_keytab
|
||||
|
||||
if 'KRB5_KTNAME' in os.environ:
|
||||
if "KRB5_KTNAME" in os.environ:
|
||||
try:
|
||||
principal = kerberos.getServerPrincipalDetails('HTTP', hostname)
|
||||
principal = kerberos.getServerPrincipalDetails("HTTP", hostname)
|
||||
except kerberos.KrbError as error:
|
||||
raise Unauthorized(
|
||||
'Kerberos: authentication failed with "{0}"'.format(str(error)))
|
||||
raise Unauthorized('Kerberos: authentication failed with "{0}"'.format(str(error)))
|
||||
|
||||
log.debug('Kerberos: server is identifying as "{0}"'.format(principal))
|
||||
else:
|
||||
raise Unauthorized('Kerberos: set the config value of "KERBEROS_KEYTAB" or the '
|
||||
'environment variable "KRB5_KTNAME" to your keytab file')
|
||||
raise Unauthorized(
|
||||
'Kerberos: set the config value of "KERBEROS_KEYTAB" or the '
|
||||
'environment variable "KRB5_KTNAME" to your keytab file'
|
||||
)
|
||||
|
||||
def _gssapi_authenticate(self, token):
|
||||
"""
|
||||
@@ -201,23 +203,23 @@ class KerberosAuthenticate(object):
|
||||
try:
|
||||
rc, state = kerberos.authGSSServerInit(self.service_name)
|
||||
if rc != kerberos.AUTH_GSS_COMPLETE:
|
||||
log.error('Kerberos: unable to initialize server context')
|
||||
log.error("Kerberos: unable to initialize server context")
|
||||
return None
|
||||
|
||||
rc = kerberos.authGSSServerStep(state, token)
|
||||
if rc == kerberos.AUTH_GSS_COMPLETE:
|
||||
log.debug('Kerberos: completed GSSAPI negotiation')
|
||||
log.debug("Kerberos: completed GSSAPI negotiation")
|
||||
ctx.kerberos_token = kerberos.authGSSServerResponse(state)
|
||||
ctx.kerberos_user = kerberos.authGSSServerUserName(state)
|
||||
return rc
|
||||
elif rc == kerberos.AUTH_GSS_CONTINUE:
|
||||
log.debug('Kerberos: continuing GSSAPI negotiation')
|
||||
log.debug("Kerberos: continuing GSSAPI negotiation")
|
||||
return kerberos.AUTH_GSS_CONTINUE
|
||||
else:
|
||||
log.debug('Kerberos: unable to step server context')
|
||||
log.debug("Kerberos: unable to step server context")
|
||||
return None
|
||||
except kerberos.GSSError as error:
|
||||
log.error('Kerberos: unable to authenticate: {0}'.format(str(error)))
|
||||
log.error("Kerberos: unable to authenticate: {0}".format(str(error)))
|
||||
return None
|
||||
finally:
|
||||
if state:
|
||||
@@ -235,25 +237,25 @@ class KerberosAuthenticate(object):
|
||||
kerberos_user = ctx.kerberos_user
|
||||
kerberos_token = ctx.kerberos_token
|
||||
elif rc != kerberos.AUTH_GSS_CONTINUE:
|
||||
raise Forbidden('Invalid Kerberos ticket')
|
||||
raise Forbidden("Invalid Kerberos ticket")
|
||||
|
||||
return kerberos_user, kerberos_token
|
||||
|
||||
|
||||
def get_user_kerberos(request):
|
||||
user = None
|
||||
if 'Authorization' not in request.headers:
|
||||
response = Response('Unauthorized', 401, {'WWW-Authenticate': 'Negotiate'})
|
||||
if "Authorization" not in request.headers:
|
||||
response = Response("Unauthorized", 401, {"WWW-Authenticate": "Negotiate"})
|
||||
exc = FlaskUnauthorized()
|
||||
# For some reason, certain versions of werkzeug raise an exception when passing `response`
|
||||
# in the constructor. This is a work-around.
|
||||
exc.response = response
|
||||
raise exc
|
||||
header = request.headers.get('Authorization')
|
||||
token = ''.join(header.strip().split()[1:])
|
||||
header = request.headers.get("Authorization")
|
||||
token = "".join(header.strip().split()[1:])
|
||||
user, kerberos_token = KerberosAuthenticate().process_request(token)
|
||||
# Remove the realm
|
||||
user = user.split('@')[0]
|
||||
user = user.split("@")[0]
|
||||
# If the user is part of the whitelist, then the group membership check is skipped
|
||||
if user in conf.allowed_users:
|
||||
groups = []
|
||||
@@ -275,20 +277,21 @@ def get_ldap_group_membership(uid):
|
||||
class Ldap(object):
|
||||
""" A class that handles LDAP connections and queries
|
||||
"""
|
||||
|
||||
connection = None
|
||||
base_dn = None
|
||||
|
||||
def __init__(self):
|
||||
if not conf.ldap_uri:
|
||||
raise Forbidden('LDAP_URI must be set in server config.')
|
||||
raise Forbidden("LDAP_URI must be set in server config.")
|
||||
if conf.ldap_groups_dn:
|
||||
self.base_dn = conf.ldap_groups_dn
|
||||
else:
|
||||
raise Forbidden('LDAP_GROUPS_DN must be set in server config.')
|
||||
raise Forbidden("LDAP_GROUPS_DN must be set in server config.")
|
||||
|
||||
if conf.ldap_uri.startswith('ldaps://'):
|
||||
tls = ldap3.Tls(ca_certs_file='/etc/pki/tls/certs/ca-bundle.crt',
|
||||
validate=ssl.CERT_REQUIRED)
|
||||
if conf.ldap_uri.startswith("ldaps://"):
|
||||
tls = ldap3.Tls(
|
||||
ca_certs_file="/etc/pki/tls/certs/ca-bundle.crt", validate=ssl.CERT_REQUIRED)
|
||||
server = ldap3.Server(conf.ldap_uri, use_ssl=True, tls=tls)
|
||||
else:
|
||||
server = ldap3.Server(conf.ldap_uri)
|
||||
@@ -296,26 +299,28 @@ class Ldap(object):
|
||||
try:
|
||||
self.connection.open()
|
||||
except ldap3.core.exceptions.LDAPSocketOpenError as error:
|
||||
log.error('The connection to "{0}" failed. The following error was raised: {1}'
|
||||
.format(conf.ldap_uri, str(error)))
|
||||
raise Forbidden('The connection to the LDAP server failed. Group membership '
|
||||
'couldn\'t be obtained.')
|
||||
log.error(
|
||||
'The connection to "{0}" failed. The following error was raised: {1}'.format(
|
||||
conf.ldap_uri, str(error)))
|
||||
raise Forbidden(
|
||||
"The connection to the LDAP server failed. Group membership couldn't be obtained.")
|
||||
|
||||
def get_user_membership(self, uid):
|
||||
""" Gets the group membership of a user
|
||||
:param uid: a string of the uid of the user
|
||||
:return: a list of common names of the posixGroups the user is a member of
|
||||
"""
|
||||
ldap_filter = '(memberUid={0})'.format(uid)
|
||||
ldap_filter = "(memberUid={0})".format(uid)
|
||||
# Only get the groups in the base container/OU
|
||||
self.connection.search(self.base_dn, ldap_filter, search_scope=ldap3.LEVEL,
|
||||
attributes=['cn'])
|
||||
self.connection.search(
|
||||
self.base_dn, ldap_filter, search_scope=ldap3.LEVEL, attributes=["cn"])
|
||||
groups = self.connection.response
|
||||
try:
|
||||
return [group['attributes']['cn'][0] for group in groups]
|
||||
return [group["attributes"]["cn"][0] for group in groups]
|
||||
except KeyError:
|
||||
log.exception('The LDAP groups could not be determined based on the search results '
|
||||
'of "{0}"'.format(str(groups)))
|
||||
log.exception(
|
||||
"The LDAP groups could not be determined based on the search results "
|
||||
'of "{0}"'.format(str(groups)))
|
||||
return []
|
||||
|
||||
|
||||
@@ -326,11 +331,11 @@ def get_user(request):
|
||||
membership such as ('mprahl', {'factory2', 'devel'})
|
||||
"""
|
||||
if conf.no_auth is True:
|
||||
log.debug('Authorization is disabled.')
|
||||
return 'anonymous', {'packager'}
|
||||
log.debug("Authorization is disabled.")
|
||||
return "anonymous", {"packager"}
|
||||
|
||||
if "user" not in g and "groups" not in g:
|
||||
get_user_func_name = 'get_user_{0}'.format(conf.auth_method)
|
||||
get_user_func_name = "get_user_{0}".format(conf.auth_method)
|
||||
get_user_func = globals().get(get_user_func_name)
|
||||
if not get_user_func:
|
||||
raise RuntimeError('The function "{0}" is not implemented'.format(get_user_func_name))
|
||||
|
||||
Reference in New Issue
Block a user