Mock: WIP threading support

This commit is contained in:
Jan Kaluza
2017-01-29 12:20:48 +01:00
parent c769d481ca
commit 0b307550c4
2 changed files with 240 additions and 122 deletions

View File

@@ -46,6 +46,7 @@ import kobo.rpmlib
import xmlrpclib
import shutil
import subprocess
import threading
import munch
from OpenSSL.SSL import SysCallError
@@ -237,6 +238,8 @@ class GenericBuilder(six.with_metaclass(ABCMeta)):
types. The actual source is usually delivered as an SCM URL from
fedmsg.
Warning: This function must be thread-safe.
Example
.build("bash", "git://someurl/bash#damn") #build from SCM URL
.build("bash", "/path/to/srpm.src.rpm") #build from source RPM
@@ -300,6 +303,7 @@ class KojiModuleBuilder(GenericBuilder):
""" Koji specific builder class """
backend = "koji"
_build_lock = threading.Lock()
def __init__(self, owner, module, config, tag_name):
"""
@@ -582,63 +586,66 @@ chmod 644 %buildroot/%_rpmconfigdir/macros.d/macros.modules
:return 4-tuple of the form (koji build task id, state, reason, nvr)
"""
# This code supposes that artifact_name can be built within the component
# Taken from /usr/bin/koji
def _unique_path(prefix):
"""
Create a unique path fragment by appending a path component
to prefix. The path component will consist of a string of letter and numbers
that is unlikely to be a duplicate, but is not guaranteed to be unique.
"""
# Use time() in the dirname to provide a little more information when
# browsing the filesystem.
# For some reason repr(time.time()) includes 4 or 5
# more digits of precision than str(time.time())
# Unnamed Engineer: Guido v. R., I am disappoint
return '%s/%r.%s' % (prefix, time.time(),
''.join([random.choice(string.ascii_letters) for i in range(8)]))
# TODO: If we are sure that this method is thread-safe, we can just
# remove _build_lock locking.
with KojiModuleBuilder._build_lock:
# This code supposes that artifact_name can be built within the component
# Taken from /usr/bin/koji
def _unique_path(prefix):
"""
Create a unique path fragment by appending a path component
to prefix. The path component will consist of a string of letter and numbers
that is unlikely to be a duplicate, but is not guaranteed to be unique.
"""
# Use time() in the dirname to provide a little more information when
# browsing the filesystem.
# For some reason repr(time.time()) includes 4 or 5
# more digits of precision than str(time.time())
# Unnamed Engineer: Guido v. R., I am disappoint
return '%s/%r.%s' % (prefix, time.time(),
''.join([random.choice(string.ascii_letters) for i in range(8)]))
if not self.__prep:
raise RuntimeError("Buildroot is not prep-ed")
if not self.__prep:
raise RuntimeError("Buildroot is not prep-ed")
# Skip existing builds
task_info = self._get_task_by_artifact(artifact_name)
if task_info:
log.info("skipping build of %s. Build already exists (task_id=%s), via %s" % (
source, task_info['task_id'], self))
return task_info['task_id'], koji.BUILD_STATES['COMPLETE'], 'Build already exists.', task_info['nvr']
# Skip existing builds
task_info = self._get_task_by_artifact(artifact_name)
if task_info:
log.info("skipping build of %s. Build already exists (task_id=%s), via %s" % (
source, task_info['task_id'], self))
return task_info['task_id'], koji.BUILD_STATES['COMPLETE'], 'Build already exists.', task_info['nvr']
self._koji_whitelist_packages([artifact_name,])
if '://' not in source:
#treat source as an srpm and upload it
serverdir = _unique_path('cli-build')
callback = None
self.koji_session.uploadWrapper(source, serverdir, callback=callback)
source = "%s/%s" % (serverdir, os.path.basename(source))
self._koji_whitelist_packages([artifact_name,])
if '://' not in source:
#treat source as an srpm and upload it
serverdir = _unique_path('cli-build')
callback = None
self.koji_session.uploadWrapper(source, serverdir, callback=callback)
source = "%s/%s" % (serverdir, os.path.basename(source))
# When "koji_build_macros_target" is set, we build the
# module-build-macros in this target instead of the self.module_target.
# The reason is that it is faster to build this RPM in
# already existing shared target, because Koji does not need to do
# repo-regen.
if (artifact_name == "module-build-macros"
and self.config.koji_build_macros_target):
module_target = self.config.koji_build_macros_target
else:
module_target = self.module_target['name']
# When "koji_build_macros_target" is set, we build the
# module-build-macros in this target instead of the self.module_target.
# The reason is that it is faster to build this RPM in
# already existing shared target, because Koji does not need to do
# repo-regen.
if (artifact_name == "module-build-macros"
and self.config.koji_build_macros_target):
module_target = self.config.koji_build_macros_target
else:
module_target = self.module_target['name']
build_opts = {"skip_tag": True}
task_id = self.koji_session.build(source, module_target, build_opts,
priority=self.build_priority)
log.info("submitted build of %s (task_id=%s), via %s" % (
source, task_id, self))
if task_id:
state = koji.BUILD_STATES['BUILDING']
reason = "Submitted %s to Koji" % (artifact_name)
else:
state = koji.BUILD_STATES['FAILED']
reason = "Failed to submit artifact %s to Koji" % (artifact_name)
return task_id, state, reason, None
build_opts = {"skip_tag": True}
task_id = self.koji_session.build(source, module_target, build_opts,
priority=self.build_priority)
log.info("submitted build of %s (task_id=%s), via %s" % (
source, task_id, self))
if task_id:
state = koji.BUILD_STATES['BUILDING']
reason = "Submitted %s to Koji" % (artifact_name)
else:
state = koji.BUILD_STATES['FAILED']
reason = "Failed to submit artifact %s to Koji" % (artifact_name)
return task_id, state, reason, None
def cancel_build(self, task_id):
self.koji_session.cancelTask(task_id)
@@ -832,6 +839,7 @@ class CoprModuleBuilder(GenericBuilder):
"""
backend = "copr"
_build_lock = threading.Lock()
def __init__(self, owner, module, config, tag_name):
self.owner = owner
@@ -945,11 +953,14 @@ class CoprModuleBuilder(GenericBuilder):
"""
log.info("Copr build")
# Git sources are treated specially.
if source.startswith("git://"):
return build_from_scm(artifact_name, source, self.config, self.build_srpm)
else:
return self.build_srpm(artifact_name, source)
# TODO: If we are sure that this method is thread-safe, we can just
# remove _build_lock locking.
with CoprModuleBuilder._build_lock:
# Git sources are treated specially.
if source.startswith("git://"):
return build_from_scm(artifact_name, source, self.config, self.build_srpm)
else:
return self.build_srpm(artifact_name, source)
def build_srpm(self, artifact_name, source):
if not self.__prep:
@@ -1053,6 +1064,7 @@ class MockModuleBuilder(GenericBuilder):
backend = "mock"
# Global build_id/task_id we increment when new build is executed.
_build_id_lock = threading.Lock()
_build_id = 1
MOCK_CONFIG_TEMPLATE = """
@@ -1109,6 +1121,14 @@ mdpolicy=group:primary
if not os.path.exists(self.resultsdir):
os.makedirs(self.resultsdir)
# Create "config" sub-directory.
self.configdir = os.path.join(self.tag_dir, "config")
if not os.path.exists(self.configdir):
os.makedirs(self.configdir)
# Generate path to mock config and add local repository there.
self._add_repo("localrepo", "file://" + self.resultsdir, "metadata_expire=1\n")
# Remove old files from the previous build of this tag but only
# before the first build is done, otherwise we would remove files
# which we already build in this module build.
@@ -1122,18 +1142,18 @@ mdpolicy=group:primary
if os.path.exists(os.path.join(self.resultsdir, "repodata/repomd.xml")):
os.remove(os.path.join(self.resultsdir, "repodata/repomd.xml"))
# Create "config" sub-directory.
self.configdir = os.path.join(self.tag_dir, "config")
if not os.path.exists(self.configdir):
os.makedirs(self.configdir)
# Generate path to mock config and add local repository there.
self.mock_config = os.path.join(self.configdir, "mock.cfg")
self._add_repo("localrepo", "file://" + self.resultsdir, "metadata_expire=1\n")
# Remove old config files from config directory.
for name in os.listdir(self.configdir):
os.remove(os.path.join(self.configdir, name))
log.info("MockModuleBuilder initialized, tag_name=%s, tag_dir=%s" %
(tag_name, self.tag_dir))
@property
def module_build_tag(self):
# Workaround koji specific code in modules.py
return {"name": self.tag_name}
def _createrepo(self):
"""
Creates the repository using "createrepo_c" command in the resultsdir.
@@ -1187,16 +1207,26 @@ mdpolicy=group:primary
def _write_mock_config(self):
"""
Writes Mock config file to self.configdir/mock.cfg.
Writes Mock config file to local file.
"""
config = str(MockModuleBuilder.MOCK_CONFIG_TEMPLATE)
config = config.replace("$root", self.tag_name)
config = config.replace("$root", "%s-%s" % (self.tag_name,
str(threading.current_thread().name)))
config = config.replace("$arch", self.arch)
config = config.replace("$group", " ".join(self.groups))
config = config.replace("$yum_conf", self.yum_conf)
with open(os.path.join(self.configdir, "mock.cfg"), 'w') as f:
# We write the most recent config to "mock.cfg", so thread-related
# configs can be later (re-)generated from it using _load_mock_config.
outfile = os.path.join(self.configdir, "mock.cfg")
with open(outfile, 'w') as f:
f.write(config)
# Write the config to thread-related configuration file.
outfile = os.path.join(self.configdir, "mock-%s.cfg" %
str(threading.current_thread().name))
with open(outfile, 'w') as f:
f.write(config)
def buildroot_connect(self, groups):
@@ -1225,7 +1255,7 @@ mdpolicy=group:primary
# right source RPM into the buildroot here, but we do not track
# what RPMs are output of particular SRPM build yet.
for artifact in artifacts:
if artifact.startswith("module-build-macros"):
if artifact and artifact.startswith("module-build-macros"):
self._load_mock_config()
self.groups.append("module-build-macros")
self._write_mock_config()
@@ -1267,35 +1297,57 @@ mdpolicy=group:primary
)
module_build_service.scheduler.consumer.work_queue_put(msg)
def _save_log(self, log_name, artifact_name):
old_log = os.path.join(self.resultsdir, log_name)
new_log = os.path.join(self.resultsdir, artifact_name + "-" + log_name)
def _save_log(self, resultsdir, log_name, artifact_name):
old_log = os.path.join(resultsdir, log_name)
new_log = os.path.join(resultsdir, artifact_name + "-" + log_name)
if os.path.exists(old_log):
os.rename(old_log, new_log)
def build_srpm(self, artifact_name, source):
def build_srpm(self, artifact_name, source, build_id):
"""
Builds the artifact from the SRPM.
"""
state = koji.BUILD_STATES['BUILDING']
# Use the mock config associated with this thread.
mock_config = os.path.join(self.configdir,
"mock-%s.cfg" % str(threading.current_thread().name))
# Clear resultsdir associated with this thread or in case it does not
# exist, create it.
resultsdir = os.path.join(self.resultsdir,
str(threading.current_thread().name))
if os.path.exists(resultsdir):
for name in os.listdir(resultsdir):
os.remove(os.path.join(resultsdir, name))
else:
os.makedirs(resultsdir)
# Open the logs to which we will forward mock stdout/stderr.
mock_stdout_log = open(os.path.join(self.resultsdir,
artifact_name + "-mock-stdout.log"), "w")
mock_stderr_log = open(os.path.join(self.resultsdir,
artifact_name + "-mock-stderr.log"), "w")
try:
# Initialize mock.
_execute_cmd(["mock", "-r", self.mock_config, "--init"])
_execute_cmd(["mock", "-v", "-r", mock_config, "--init"],
stdout=mock_stdout_log, stderr=mock_stderr_log)
# Start the build and store results to resultsdir
# TODO: Maybe this should not block in the future, but for local
# builds it is not a big problem.
_execute_cmd(["mock", "-r", self.mock_config,
_execute_cmd(["mock", "-v", "-r", mock_config,
"--no-clean", "--rebuild", source,
"--resultdir=%s" % self.resultsdir])
"--resultdir=%s" % resultsdir],
stdout=mock_stdout_log, stderr=mock_stderr_log)
# Emit messages simulating complete build. These messages
# are put in the scheduler's work queue and are handled
# by MBS after the build_srpm() method returns and scope gets
# back to scheduler.main.main() method.
self._send_build_change(koji.BUILD_STATES['COMPLETE'], source,
MockModuleBuilder._build_id)
state = koji.BUILD_STATES['COMPLETE']
self._send_build_change(state, source, build_id)
with open(os.path.join(self.resultsdir, "status.log"), 'w') as f:
with open(os.path.join(resultsdir, "status.log"), 'w') as f:
f.write("complete\n")
except Exception as e:
log.error("Error while building artifact %s: %s" % (artifact_name,
@@ -1305,32 +1357,54 @@ mdpolicy=group:primary
# are put in the scheduler's work queue and are handled
# by MBS after the build_srpm() method returns and scope gets
# back to scheduler.main.main() method.
self._send_build_change(koji.BUILD_STATES['FAILED'], source,
MockModuleBuilder._build_id)
with open(os.path.join(self.resultsdir, "status.log"), 'w') as f:
state = koji.BUILD_STATES['FAILED']
self._send_build_change(state, source,
build_id)
with open(os.path.join(resultsdir, "status.log"), 'w') as f:
f.write("failed\n")
self._save_log("state.log", artifact_name)
self._save_log("root.log", artifact_name)
self._save_log("build.log", artifact_name)
self._save_log("status.log", artifact_name)
mock_stdout_log.close()
mock_stderr_log.close()
# Return the "building" state. Real state will be taken by MBS
# from the messages emitted above.
state = koji.BUILD_STATES['BUILDING']
reason = "Submitted %s to Koji" % (artifact_name)
return MockModuleBuilder._build_id, state, reason, None
self._save_log(resultsdir, "state.log", artifact_name)
self._save_log(resultsdir, "root.log", artifact_name)
self._save_log(resultsdir, "build.log", artifact_name)
self._save_log(resultsdir, "status.log", artifact_name)
# Copy files from thread-related resultsdire to the main resultsdir.
for name in os.listdir(resultsdir):
shutil.copyfile(os.path.join(resultsdir, name), os.path.join(self.resultsdir, name))
reason = "Built %s in Mock" % (artifact_name)
return build_id, state, reason, None
def build(self, artifact_name, source):
log.info("Starting building artifact %s: %s" % (artifact_name, source))
MockModuleBuilder._build_id += 1
# Load global mock config for this module build from mock.cfg and
# generate the thread-specific mock config by writing it to fs again.
self._load_mock_config()
self._write_mock_config()
# Get the build-id in thread-safe manner.
build_id = None
with MockModuleBuilder._build_id_lock:
MockModuleBuilder._build_id += 1
build_id = int(MockModuleBuilder._build_id)
# Git sources are treated specially.
if source.startswith("git://"):
return build_from_scm(artifact_name, source, self.config, self.build_srpm)
# Open the srpm-stdout and srpm-stderr logs and build from SCM.
srpm_stdout_fn = os.path.join(self.resultsdir,
artifact_name + "-srpm-stdout.log")
srpm_stderr_fn = os.path.join(self.resultsdir,
artifact_name + "-srpm-stderr.log")
with open(srpm_stdout_fn, "w") as srpm_stdout_log, open(srpm_stderr_fn, "w") as srpm_stderr_log:
return build_from_scm(artifact_name, source,
self.config, self.build_srpm, data=build_id,
stdout=srpm_stdout_log, stderr=srpm_stderr_log)
else:
return self.build_srpm(artifact_name, source)
return self.build_srpm(artifact_name, source, build_id)
@staticmethod
def get_disttag_srpm(disttag):
@@ -1340,14 +1414,23 @@ mdpolicy=group:primary
def cancel_build(self, task_id):
pass
def build_from_scm(artifact_name, source, config, build_srpm):
def build_from_scm(artifact_name, source, config, build_srpm,
data = None, stdout=None, stderr=None):
"""
Builds the artifact from the SCM based source.
:param artifact_name: Name of the artifact.
:param source: SCM URL with artifact's sources (spec file).
:param config: Config instance.
:param build_srpm: Method to call to build the RPM from the generate SRPM.
:param data: Data to be passed to the build_srpm method.
:param stdout: Python file object to which the stdout of SRPM build
command is logged.
:param stderr: Python file object to which the stderr of SRPM build
command is logged.
"""
td = None
owd = os.getcwd()
ret = (0, koji.BUILD_STATES["FAILED"], "Cannot create SRPM", None)
td = None
try:
log.debug('Cloning source URL: %s' % source)
@@ -1357,22 +1440,21 @@ def build_from_scm(artifact_name, source, config, build_srpm):
cod = scm.checkout(td)
# Use configured command to create SRPM out of the SCM repo.
log.debug("Creating SRPM")
os.chdir(cod)
_execute_cmd(config.mock_build_srpm_cmd.split(" "))
log.debug("Creating SRPM in %s" % cod)
_execute_cmd(config.mock_build_srpm_cmd.split(" "),
stdout=stdout, stderr=stderr, cwd=cod)
# Find out the built SRPM and build it normally.
for f in os.listdir(cod):
if f.endswith(".src.rpm"):
log.info("Created SRPM %s" % f)
source = os.path.join(cod, f)
ret = build_srpm(artifact_name, source)
ret = build_srpm(artifact_name, source, data)
break
except Exception as e:
log.error("Error while generating SRPM for artifact %s: %s" % (
artifact_name, str(e)))
finally:
os.chdir(owd)
try:
if td is not None:
shutil.rmtree(td)
@@ -1384,12 +1466,32 @@ def build_from_scm(artifact_name, source, config, build_srpm):
return ret
def _execute_cmd(args):
log.debug("Executing command: %s" % args)
ret = subprocess.call(args)
if ret != 0:
raise RuntimeError("Command '%s' returned non-zero value %d"
% (args, ret))
def _execute_cmd(args, stdout = None, stderr = None, cwd = None):
"""
Executes command defined by `args`. If `stdout` or `stderr` is set to
Python file object, the stderr/stdout output is redirecter to that file.
If `cwd` is set, current working directory is set accordingly for the
executed command.
:param args: list defining the command to execute.
:param stdout: Python file object to redirect the stdout to.
:param stderr: Python file object to redirect the stderr to.
:param cwd: string defining the current working directory for command.
:raises RuntimeError: Raised when command exits with non-zero exit code.
"""
out_log_msg = ""
if stdout:
out_log_msg += ", stdout log: %s" % stdout.name
if stderr:
out_log_msg += ", stderr log: %s" % stderr.name
log.info("Executing command: %s%s" % (args, out_log_msg))
proc = subprocess.Popen(args, stdout=stdout, stderr=stderr, cwd=cwd)
proc.communicate()
if proc.returncode != 0:
err_msg = "Command '%s' returned non-zero value %d%s" % (args, proc.returncode, out_log_msg)
raise RuntimeError(err_msg)
GenericBuilder.register_backend_class(KojiModuleBuilder)

View File

@@ -83,7 +83,6 @@ def at_concurrent_component_threshold(config, session):
return False
def start_build_batch(config, module, session, builder, components=None):
"""
Starts a round of the build cycle for a module.
@@ -95,7 +94,14 @@ def start_build_batch(config, module, session, builder, components=None):
if any([c.state == koji.BUILD_STATES['BUILDING']
for c in module.component_builds]):
raise ValueError("Cannot start a batch when another is in flight.")
err_msg = "Cannot start a batch when another is in flight."
log.error(err_msg)
unbuilt_components = [
c for c in module.component_builds
if (c.state == koji.BUILD_STATES['BUILDING'])
]
log.error("Components in building state: %s" % str(unbuilt_components))
raise ValueError(err_msg)
# The user can either pass in a list of components to 'seed' the batch, or
# if none are provided then we just select everything that hasn't
@@ -111,23 +117,33 @@ def start_build_batch(config, module, session, builder, components=None):
log.info("Starting build of next batch %d, %s" % (module.batch,
unbuilt_components))
for c in unbuilt_components:
if at_concurrent_component_threshold(config, session):
log.info('Concurrent build threshold met')
break
def start_build_component(c):
"""
Submits single component build to builder. Called in thread
by ThreadPool later.
"""
with models.make_session(conf) as s:
if at_concurrent_component_threshold(config, s):
log.info('Concurrent build threshold met')
return
try:
c.task_id, c.state, c.state_reason, c.nvr = builder.build(
artifact_name=c.package, source=c.scmurl)
except Exception as e:
c.state = koji.BUILD_STATES['FAILED']
c.state_reason = "Failed to submit artifact %s to Koji: %s" % (c.package, str(e))
continue
c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e))
return
if not c.task_id and c.state == koji.BUILD_STATES['BUILDING']:
c.state = koji.BUILD_STATES['FAILED']
c.state_reason = "Failed to submit artifact %s to Koji" % (c.package)
continue
c.state_reason = ("Failed to build artifact %s: "
"Builder did not return task ID" % (c.package))
return
# Start build of components in this batch.
pool = ThreadPool(config.num_consecutive_builds)
pool.map(start_build_component, unbuilt_components)
further_work = []