mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-02-02 20:59:06 +08:00
318 lines
11 KiB
Python
Executable File
318 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# Copyright (c) 2019 Red Hat, Inc.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in all
|
|
# copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
#
|
|
# Written by Chenxiong Qi <cqi@redhat.com>
|
|
# Jan Kaluza <jkaluza@redhat.com>
|
|
|
|
from __future__ import print_function
|
|
import argparse
|
|
import enum
|
|
import json
|
|
from pprint import pprint
|
|
import sys
|
|
|
|
import requests
|
|
import requests.exceptions
|
|
from six.moves import urllib_parse
|
|
|
|
|
|
env_config = {
|
|
"fedora": {
|
|
"prod": {
|
|
"server_url": "https://mbs.fedoraproject.org"
|
|
},
|
|
"staging": {
|
|
"server_url": "https://mbs.stg.fedoraproject.org"
|
|
},
|
|
},
|
|
"redhat": {
|
|
"prod": {"server_url": "https://mbs.engineering.redhat.com"},
|
|
"staging": {"server_url": "https://mbs.stage.engineering.redhat.com"},
|
|
},
|
|
}
|
|
|
|
|
|
id_provider_config = {
|
|
"prod": "https://id.fedoraproject.org/openidc/",
|
|
"staging": "https://id.stg.fedoraproject.org/openidc/",
|
|
}
|
|
|
|
|
|
class AuthMech(enum.IntEnum):
|
|
OpenIDC = 1
|
|
Kerberos = 2
|
|
Anonymous = 3
|
|
|
|
|
|
class MBSCli(object):
|
|
def __init__(
|
|
self,
|
|
server_url,
|
|
api_version="2",
|
|
verify_ssl=True,
|
|
auth_mech=None,
|
|
openidc_token=None,
|
|
):
|
|
"""Initialize MBS client
|
|
|
|
:param str server_url: Base server URL of MBS (For example "https://localhost.tld").
|
|
:param str api_version: API version client will call. Version 2 is the default.
|
|
:param bool verify_ssl: whether to verify SSL certificate over HTTP. By
|
|
default, always verify, but you are also always able to disable it
|
|
by passing False.
|
|
:param AuthMech auth_mech: specify what authentication mechanism is
|
|
used to interact with MBS server. Choose one mechanism from
|
|
AuthMech. Anonymous can be passed to force client not send
|
|
any authentication information. If this parameter is omitted,
|
|
same as Anonymous.
|
|
:param str openidc_token: token got from OpenIDC so that client can be
|
|
authenticated by MBS server. This is only required if
|
|
``AuthMech.OpenIDC`` is passed to parameter ``auth_mech``.
|
|
"""
|
|
self._server_url = server_url
|
|
self._api_version = api_version
|
|
self._verify_ssl = verify_ssl
|
|
if auth_mech == AuthMech.OpenIDC and not openidc_token:
|
|
raise ValueError(
|
|
"OpenIDC token must be specified when OpenIDC authentication is enabled.")
|
|
self._openidc_token = openidc_token
|
|
|
|
if auth_mech is None:
|
|
self._auth_mech = AuthMech.Anonymous
|
|
else:
|
|
self._auth_mech = auth_mech
|
|
|
|
@classmethod
|
|
def get_auth_mech(cls, server_url):
|
|
"""
|
|
Asks the MBS server running on `server_url` about the available
|
|
auth mechanisum and returns the AuthMech representing that mechanism.
|
|
"""
|
|
cli = MBSCli(server_url, verify_ssl=False, auth_mech=AuthMech.Anonymous)
|
|
r = cli._get("about")
|
|
data = r.json()
|
|
if data["auth_method"] == "oidc":
|
|
return AuthMech.OpenIDC
|
|
elif data["auth_method"] == "kerberos":
|
|
return AuthMech.Kerberos
|
|
else:
|
|
raise ValueError("Unknown auth_method: %r".format(data["auth_method"]))
|
|
|
|
def _make_endpoint(self, resource_path):
|
|
"""Helper method to construct URL to requested resource
|
|
|
|
URL of requested resource consists of the server URL, API version and
|
|
the resource path.
|
|
|
|
:param str resource_path: the part after API version representing
|
|
the concrete resource.
|
|
:return: the whole complete URL of requested resource.
|
|
:rtype: str
|
|
"""
|
|
return urllib_parse.urljoin(
|
|
self._server_url,
|
|
"module-build-service/{0}/{1}".format(self._api_version, resource_path.lstrip("/")),
|
|
)
|
|
|
|
def _make_request(self, method, resource_path, data=None):
|
|
"""Make a HTTP request to server
|
|
|
|
:param str method: HTTP request method to send, GET, POST and DELETE
|
|
are supported.
|
|
:param str resource_path: path of requested resource.
|
|
:param dict data: corresponding data with specific request. It is
|
|
optional. None is default that means no data is send along with
|
|
request.
|
|
:return: requests Response object.
|
|
:rtype: requests.Response
|
|
:raises: if MBS does not response 200, exception will be raised
|
|
by ``requests.Response.raise_for_status``.
|
|
"""
|
|
request_data = {}
|
|
headers = {}
|
|
if data:
|
|
if method in ("post", "patch"):
|
|
request_data["data"] = json.dumps(data)
|
|
headers["Content-Type"] = "application/json"
|
|
if method == "get":
|
|
request_data["params"] = data
|
|
if not self._verify_ssl:
|
|
request_data["verify"] = False
|
|
if self._auth_mech == AuthMech.OpenIDC:
|
|
headers["Authorization"] = "Bearer {0}".format(self._openidc_token)
|
|
elif self._auth_mech == AuthMech.Kerberos:
|
|
import requests_kerberos
|
|
|
|
# MBS server does not support mutual auth, so make it optional.
|
|
request_data["auth"] = requests_kerberos.HTTPKerberosAuth(
|
|
mutual_authentication=requests_kerberos.OPTIONAL)
|
|
|
|
if headers:
|
|
request_data["headers"] = headers
|
|
|
|
request_method = getattr(requests, method)
|
|
resource_url = self._make_endpoint(resource_path)
|
|
r = request_method(resource_url, **request_data)
|
|
|
|
# Print error, for debugging
|
|
if r.status_code != 200:
|
|
print(r.text, file=sys.stderr)
|
|
|
|
r.raise_for_status()
|
|
return r
|
|
|
|
def _get(self, resource_path, data=None):
|
|
"""Make a GET HTTP request to server"""
|
|
return self._make_request("get", resource_path, data)
|
|
|
|
def _post(self, resource_path, data=None):
|
|
"""Make a POST HTTP request to server"""
|
|
return self._make_request("post", resource_path, data)
|
|
|
|
def _patch(self, resource_path, data=None):
|
|
"""Make a PATCH HTTP request to server"""
|
|
return self._make_request("patch", resource_path, data)
|
|
|
|
def import_module(self, scmurl):
|
|
"""
|
|
Imports the module defined by yaml file on SCM URL to MBS.
|
|
"""
|
|
r = self._post("import-module/", {"scmurl": scmurl})
|
|
pprint(r.json())
|
|
return 0
|
|
|
|
def execute(self, args):
|
|
"""
|
|
Executes the command based on the parsed arguments `args`.
|
|
"""
|
|
if args.command == "import":
|
|
return self.import_module(args.scmurl)
|
|
return 1
|
|
|
|
|
|
def parse_args():
|
|
"""
|
|
Parses command line arguments using argparse and returns the result.
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description="""\
|
|
%(prog)s - MBS API client
|
|
|
|
If you have problems authenticating with OpenID Connect, try:
|
|
|
|
$ rm -rf ~/.openidc/
|
|
""",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
)
|
|
parser.add_argument(
|
|
"--redhat",
|
|
action="store_const",
|
|
const="redhat",
|
|
default="fedora",
|
|
dest="infra",
|
|
help="Use internal MBS infra environment. If omitted, Fedora Infra will be used by "
|
|
"default.",
|
|
)
|
|
parser.add_argument(
|
|
"--staging",
|
|
action="store_const",
|
|
const="staging",
|
|
default="prod",
|
|
dest="env",
|
|
help="Use Fedora Infra or internal staging environment, which depends on if --redhat "
|
|
"is specified. If omitted, production environment will be used.",
|
|
)
|
|
parser.add_argument("--server", default=None, help="Use custom MBS server.")
|
|
|
|
subparsers = parser.add_subparsers(description="Commands you can use in MBS client.")
|
|
|
|
import_parser = subparsers.add_parser("import", help="Import new virtual module.")
|
|
import_parser.set_defaults(command="import")
|
|
import_parser.add_argument("scmurl", default="", help="SCM URL of module to import.")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not hasattr(args, "command"):
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
return args
|
|
|
|
|
|
def create_mbs_client(args):
|
|
"""
|
|
Creates the MBSCli instance according to parsed command line arguments
|
|
`args`.
|
|
"""
|
|
if args.server is None:
|
|
mbs_url = env_config[args.infra][args.env]["server_url"]
|
|
else:
|
|
mbs_url = args.server
|
|
|
|
auth_mech = MBSCli.get_auth_mech(mbs_url)
|
|
openidc_token = None
|
|
|
|
if auth_mech == AuthMech.OpenIDC:
|
|
try:
|
|
import openidc_client
|
|
except ImportError:
|
|
print("The python-openidc-client package must be installed", file=sys.stderr)
|
|
sys.exit(1)
|
|
id_provider = id_provider_config[args.env]
|
|
|
|
# Get the auth token using the OpenID client.
|
|
oidc = openidc_client.OpenIDCClient(
|
|
"mbs",
|
|
id_provider,
|
|
{"Token": "Token", "Authorization": "Authorization"},
|
|
"mbs-authorizer",
|
|
"notsecret",
|
|
)
|
|
|
|
scopes = [
|
|
"openid",
|
|
"https://id.fedoraproject.org/scope/groups",
|
|
"https://mbs.fedoraproject.org/oidc/submit-build",
|
|
]
|
|
try:
|
|
token = oidc.get_token(scopes, new_token=True)
|
|
token = oidc.report_token_issue()
|
|
except requests.exceptions.HTTPError as e:
|
|
print(e.response.text, file=sys.stderr)
|
|
raise
|
|
|
|
return MBSCli(mbs_url, auth_mech=auth_mech, openidc_token=token)
|
|
elif auth_mech == AuthMech.Kerberos:
|
|
try:
|
|
import requests_kerberos # noqa
|
|
except ImportError:
|
|
print("The python-requests-kerberos package must be installed", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
return MBSCli(mbs_url, auth_mech=auth_mech, openidc_token=openidc_token)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = parse_args()
|
|
cli = create_mbs_client(args)
|
|
sys.exit(cli.execute(args))
|