Files
2022-04-13 10:42:53 -04:00

369 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
"""SCM handler functions."""
from __future__ import absolute_import
import os
import subprocess as sp
import re
import shutil
import tempfile
from module_build_service.common import log, conf
from module_build_service.common.errors import (
Forbidden,
ValidationError,
UnprocessableEntity,
ProgrammingError,
)
from module_build_service.common.retry import retry
from module_build_service.common.utils import provide_module_stream_version_from_timestamp
def scm_url_schemes(terse=False):
"""
Definition of URL schemes supported by both frontend and scheduler.
NOTE: only git URLs in the following formats are supported atm:
git://
git+http://
git+https://
git+rsync://
http://
https://
file://
:param terse=False: Whether to return terse list of unique URL schemes
even without the "://".
"""
scm_types = {
"git": (
"git://",
"git+http://",
"git+https://",
"git+rsync://",
"http://",
"https://",
"file://",
)
}
if not terse:
return scm_types
else:
scheme_list = []
for scm_type, scm_schemes in scm_types.items():
scheme_list.extend([scheme[:-3] for scheme in scm_schemes])
return list(set(scheme_list))
class SCM(object):
"SCM abstraction class"
# Assuming git for HTTP schemas
types = scm_url_schemes()
def __init__(self, url, branch=None, allowed_scm=None, allow_local=False):
"""
Initialize the SCM object using the specified SCM URL.
If url is not in the list of allowed_scm, an error will be raised.
:param str url: The unmodified scmurl
:param str branch: The optional source control branch. This defaults to the default_branch
config option when git is used.
:param list allowed_scm: The optional list of allowed SCM URL prefixes
:param bool allow_local: Allow SCM URLs that start with "file://"
:raises: Forbidden or ValidationError
"""
if allowed_scm:
if not (
url.startswith(tuple(allowed_scm)) or (allow_local and url.startswith("file://"))
):
raise Forbidden("%s is not in the list of allowed SCMs" % url)
# If we are given the option for the git protocol or the http(s) protocol,
# then just use http(s)
if re.match(r"(git\+http(?:s)?:\/\/)", url):
url = url[4:]
url = url.rstrip("/")
self.url = url
self.sourcedir = None
# once we have more than one SCM provider, we will need some more
# sophisticated lookup logic
for scmtype, schemes in SCM.types.items():
if self.url.startswith(schemes):
self.scheme = scmtype
break
else:
raise ValidationError("Invalid SCM URL: %s" % url)
# git is the only one supported SCM provider atm
if self.scheme == "git":
match = re.search(r"^(?P<repository>.*/(?P<name>[^?]*))(\?#(?P<commit>.*))?", url)
self.repository = match.group("repository")
self.name = match.group("name")
self.repository_root = self.repository[: -len(self.name)]
if self.name.endswith(".git"):
self.name = self.name[:-4]
self.commit = match.group("commit")
self.branch = branch if branch else conf.default_branch
self.version = None
self._cloned = False
else:
raise ValidationError("Unhandled SCM scheme: %s" % self.scheme)
def verify(self):
"""
Verifies that the information provided by a user in SCM URL and branch
matches the information in SCM repository. For example verifies that
the commit hash really belongs to the provided branch.
:raises ValidationError
"""
if not self.sourcedir:
raise ProgrammingError("Do .checkout() first.")
found = False
branches = SCM._run(
["git", "branch", "-r", "--contains", self.commit], chdir=self.sourcedir
)[1]
for branch in branches.decode("utf-8").split("\n"):
branch = branch.strip()
if branch[len("origin/"):] == self.branch:
found = True
break
if not found:
raise ValidationError("Commit %s is not in branch %s." % (self.commit, self.branch))
def scm_url_from_name(self, name):
"""
Generates new SCM URL for another module defined by a name. The new URL
is based on the root of current SCM URL.
"""
if self.scheme == "git":
return self.repository_root + name + ".git"
return None
@staticmethod
def _run_without_retry(cmd, chdir=None, log_stdout=False):
proc = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE, cwd=chdir)
stdout, stderr = proc.communicate()
if log_stdout and stdout:
log.debug(stdout)
if stderr:
log.warning(stderr)
if proc.returncode != 0:
raise UnprocessableEntity(
"Failed on %r, retcode %r, out %r, err %r" % (cmd, proc.returncode, stdout, stderr)
)
return proc.returncode, stdout, stderr
@staticmethod
@retry(
timeout=conf.scm_net_timeout,
interval=conf.scm_net_retry_interval,
wait_on=UnprocessableEntity,
)
def _run(cmd, chdir=None, log_stdout=False):
return SCM._run_without_retry(cmd, chdir, log_stdout)
def clone(self, scmdir):
"""
Clone the repo from SCM.
:param str scmdir: the working directory
:raises UnprocessableEntity: if the clone fails
"""
if self._cloned:
return
if not self.scheme == "git":
raise RuntimeError("clone: Unhandled SCM scheme.")
if not self.sourcedir:
self.sourcedir = os.path.join(scmdir, self.name)
module_clone_cmd = ["git", "clone", "-q", "--no-checkout", self.repository, self.sourcedir]
SCM._run(module_clone_cmd, chdir=scmdir)
self._cloned = True
def checkout_ref(self, ref):
"""
Checkout the input reference.
:param str ref: the SCM reference (hash, branch, etc.) to check out
:raises UnprocessableEntity: if the checkout fails
"""
module_checkout_cmd = ["git", "checkout", "-q", ref]
SCM._run(module_checkout_cmd, chdir=self.sourcedir)
def checkout(self, scmdir):
"""Checkout the module from SCM.
:param str scmdir: The working directory
:returns: str -- the directory that the module was checked-out into
:raises: RuntimeError
"""
# TODO: sanity check arguments
if self.scheme == "git":
if not self._cloned:
self.clone(scmdir)
try:
self.checkout_ref(self.commit)
except UnprocessableEntity as e:
if (
str(e).endswith(' did not match any file(s) known to git.\\n"')
or "fatal: reference is not a tree: " in str(e)
):
raise UnprocessableEntity(
"checkout: The requested commit hash was not found within the repository. "
"Perhaps you forgot to push. The original message was: %s" % str(e)
)
raise
timestamp = SCM._run(["git", "show", "-s", "--format=%ct"], chdir=self.sourcedir)[1]
self.version = provide_module_stream_version_from_timestamp(timestamp=timestamp)
else:
raise RuntimeError("checkout: Unhandled SCM scheme.")
return self.sourcedir
def get_latest(self, ref=None):
""" Get the latest commit hash based on the provided git ref
:param ref: a string of a git ref (either a branch or commit hash). This defaults to
self.branch.
:returns: a string of the latest commit hash
:raises: RuntimeError
"""
if ref is None:
ref = self.branch
if self.scheme == "git":
log.debug("Getting/verifying commit hash for %s" % self.repository)
try:
# This will fail if `ref` is not a branch name, but this works for commit hashes.
# If the ref is not a branch, then fallback to `get_full_commit_hash`. We do not
# want to retry here, since if it's a commit, it would block for a very long time.
cmd = ["git", "ls-remote", "--exit-code", self.repository, "refs/heads/" + ref]
log.debug("Checking to see if the ref %s is a branch with `%s`", ref, " ".join(cmd))
_, output, _ = SCM._run_without_retry(cmd)
except UnprocessableEntity:
log.debug("The ref %s is not a branch. Checking to see if it's a commit hash", ref)
# The call below will either return the commit hash as is (if a full one was
# provided) or the full commit hash (if a short hash was provided). If ref is not
# a commit hash, then this will raise an exception.
return self.get_full_commit_hash(commit_hash=ref)
else:
# git-ls-remote prints output like this, where the first commit
# hash is what to return.
# bf028e573e7c18533d89c7873a411de92d4d913e refs/heads/master
return output.split()[0].decode("utf-8")
else:
raise RuntimeError("get_latest: Unhandled SCM scheme.")
def get_full_commit_hash(self, commit_hash=None):
"""
Takes a shortened commit hash and returns the full hash
:param commit_hash: a shortened commit hash. If not specified, the
one in the URL will be used
:return: string of the full commit hash
"""
if commit_hash:
commit_to_check = commit_hash
elif self._commit:
commit_to_check = self._commit
else:
try:
# If self._commit was None, then calling `self.commit` will resolve the ref based
# on the branch
return self.commit
except UnprocessableEntity:
# If there was an exception resolving the ref based on the branch (could be the
# default branch that doesn't exist), then there is not enough information to get
# the commit hash
raise RuntimeError('No commit hash was specified for "{0}"'.format(self.url))
if self.scheme == "git":
log.debug(
"Getting the full commit hash on %s from %s", self.repository, commit_to_check)
td = None
try:
td = tempfile.mkdtemp()
SCM._run(["git", "clone", "-q", self.repository, td, "--bare"])
cmd = ["git", "rev-parse", commit_to_check]
log.debug(
"Running `%s` to get the full commit hash for %s",
" ".join(cmd),
commit_to_check
)
output = SCM._run(cmd, chdir=td)[1]
finally:
if td and os.path.exists(td):
shutil.rmtree(td)
if output:
return str(output.decode("utf-8").strip("\n"))
raise UnprocessableEntity(
'The full commit hash of "{0}" for "{1}" could not be found'.format(
commit_hash, self.repository)
)
else:
raise RuntimeError("get_full_commit_hash: Unhandled SCM scheme.")
def get_module_yaml(self):
"""
Get full path to the module's YAML file.
:return: path as a string
:raises UnprocessableEntity
"""
if not self.sourcedir:
raise ProgrammingError("Do .checkout() first.")
path_to_yaml = os.path.join(self.sourcedir, (self.name + ".yaml"))
try:
with open(path_to_yaml):
return path_to_yaml
except IOError:
msg = (
"get_module_yaml: The SCM repository doesn't contain a modulemd file. "
"Couldn't access: %s" % path_to_yaml
)
log.error(msg)
raise UnprocessableEntity(msg)
@staticmethod
def is_full_commit_hash(scheme, commit):
"""
Determines if a commit hash is the full commit hash. For instance, if
the scheme is git, it will determine if the commit is a full SHA1 hash
:param scheme: a string containing the SCM type (e.g. git)
:param commit: a string containing the commit
:return: boolean
"""
if scheme == "git":
sha1_pattern = re.compile(r"^[0-9a-f]{40}$")
return bool(re.match(sha1_pattern, commit))
else:
raise RuntimeError("is_full_commit_hash: Unhandled SCM scheme.")
@property
def commit(self):
"""The commit ID, for example the git hash, or None."""
if not self._commit:
self._commit = self.get_latest(self.branch)
return self._commit
@commit.setter
def commit(self, s):
self._commit = str(s) if s else None