diff --git a/module_build_service/builder.py b/module_build_service/builder.py index b6d9ca56..fff05c1a 100644 --- a/module_build_service/builder.py +++ b/module_build_service/builder.py @@ -46,6 +46,7 @@ import kobo.rpmlib import xmlrpclib import shutil import subprocess +import threading import munch from OpenSSL.SSL import SysCallError @@ -237,6 +238,8 @@ class GenericBuilder(six.with_metaclass(ABCMeta)): types. The actual source is usually delivered as an SCM URL from fedmsg. + Warning: This function must be thread-safe. + Example .build("bash", "git://someurl/bash#damn") #build from SCM URL .build("bash", "/path/to/srpm.src.rpm") #build from source RPM @@ -300,6 +303,7 @@ class KojiModuleBuilder(GenericBuilder): """ Koji specific builder class """ backend = "koji" + _build_lock = threading.Lock() def __init__(self, owner, module, config, tag_name): """ @@ -582,63 +586,66 @@ chmod 644 %buildroot/%_rpmconfigdir/macros.d/macros.modules :return 4-tuple of the form (koji build task id, state, reason, nvr) """ - # This code supposes that artifact_name can be built within the component - # Taken from /usr/bin/koji - def _unique_path(prefix): - """ - Create a unique path fragment by appending a path component - to prefix. The path component will consist of a string of letter and numbers - that is unlikely to be a duplicate, but is not guaranteed to be unique. - """ - # Use time() in the dirname to provide a little more information when - # browsing the filesystem. - # For some reason repr(time.time()) includes 4 or 5 - # more digits of precision than str(time.time()) - # Unnamed Engineer: Guido v. R., I am disappoint - return '%s/%r.%s' % (prefix, time.time(), - ''.join([random.choice(string.ascii_letters) for i in range(8)])) + # TODO: If we are sure that this method is thread-safe, we can just + # remove _build_lock locking. + with KojiModuleBuilder._build_lock: + # This code supposes that artifact_name can be built within the component + # Taken from /usr/bin/koji + def _unique_path(prefix): + """ + Create a unique path fragment by appending a path component + to prefix. The path component will consist of a string of letter and numbers + that is unlikely to be a duplicate, but is not guaranteed to be unique. + """ + # Use time() in the dirname to provide a little more information when + # browsing the filesystem. + # For some reason repr(time.time()) includes 4 or 5 + # more digits of precision than str(time.time()) + # Unnamed Engineer: Guido v. R., I am disappoint + return '%s/%r.%s' % (prefix, time.time(), + ''.join([random.choice(string.ascii_letters) for i in range(8)])) - if not self.__prep: - raise RuntimeError("Buildroot is not prep-ed") + if not self.__prep: + raise RuntimeError("Buildroot is not prep-ed") - # Skip existing builds - task_info = self._get_task_by_artifact(artifact_name) - if task_info: - log.info("skipping build of %s. Build already exists (task_id=%s), via %s" % ( - source, task_info['task_id'], self)) - return task_info['task_id'], koji.BUILD_STATES['COMPLETE'], 'Build already exists.', task_info['nvr'] + # Skip existing builds + task_info = self._get_task_by_artifact(artifact_name) + if task_info: + log.info("skipping build of %s. Build already exists (task_id=%s), via %s" % ( + source, task_info['task_id'], self)) + return task_info['task_id'], koji.BUILD_STATES['COMPLETE'], 'Build already exists.', task_info['nvr'] - self._koji_whitelist_packages([artifact_name,]) - if '://' not in source: - #treat source as an srpm and upload it - serverdir = _unique_path('cli-build') - callback = None - self.koji_session.uploadWrapper(source, serverdir, callback=callback) - source = "%s/%s" % (serverdir, os.path.basename(source)) + self._koji_whitelist_packages([artifact_name,]) + if '://' not in source: + #treat source as an srpm and upload it + serverdir = _unique_path('cli-build') + callback = None + self.koji_session.uploadWrapper(source, serverdir, callback=callback) + source = "%s/%s" % (serverdir, os.path.basename(source)) - # When "koji_build_macros_target" is set, we build the - # module-build-macros in this target instead of the self.module_target. - # The reason is that it is faster to build this RPM in - # already existing shared target, because Koji does not need to do - # repo-regen. - if (artifact_name == "module-build-macros" - and self.config.koji_build_macros_target): - module_target = self.config.koji_build_macros_target - else: - module_target = self.module_target['name'] + # When "koji_build_macros_target" is set, we build the + # module-build-macros in this target instead of the self.module_target. + # The reason is that it is faster to build this RPM in + # already existing shared target, because Koji does not need to do + # repo-regen. + if (artifact_name == "module-build-macros" + and self.config.koji_build_macros_target): + module_target = self.config.koji_build_macros_target + else: + module_target = self.module_target['name'] - build_opts = {"skip_tag": True} - task_id = self.koji_session.build(source, module_target, build_opts, - priority=self.build_priority) - log.info("submitted build of %s (task_id=%s), via %s" % ( - source, task_id, self)) - if task_id: - state = koji.BUILD_STATES['BUILDING'] - reason = "Submitted %s to Koji" % (artifact_name) - else: - state = koji.BUILD_STATES['FAILED'] - reason = "Failed to submit artifact %s to Koji" % (artifact_name) - return task_id, state, reason, None + build_opts = {"skip_tag": True} + task_id = self.koji_session.build(source, module_target, build_opts, + priority=self.build_priority) + log.info("submitted build of %s (task_id=%s), via %s" % ( + source, task_id, self)) + if task_id: + state = koji.BUILD_STATES['BUILDING'] + reason = "Submitted %s to Koji" % (artifact_name) + else: + state = koji.BUILD_STATES['FAILED'] + reason = "Failed to submit artifact %s to Koji" % (artifact_name) + return task_id, state, reason, None def cancel_build(self, task_id): self.koji_session.cancelTask(task_id) @@ -832,6 +839,7 @@ class CoprModuleBuilder(GenericBuilder): """ backend = "copr" + _build_lock = threading.Lock() def __init__(self, owner, module, config, tag_name): self.owner = owner @@ -945,11 +953,14 @@ class CoprModuleBuilder(GenericBuilder): """ log.info("Copr build") - # Git sources are treated specially. - if source.startswith("git://"): - return build_from_scm(artifact_name, source, self.config, self.build_srpm) - else: - return self.build_srpm(artifact_name, source) + # TODO: If we are sure that this method is thread-safe, we can just + # remove _build_lock locking. + with CoprModuleBuilder._build_lock: + # Git sources are treated specially. + if source.startswith("git://"): + return build_from_scm(artifact_name, source, self.config, self.build_srpm) + else: + return self.build_srpm(artifact_name, source) def build_srpm(self, artifact_name, source): if not self.__prep: @@ -1053,6 +1064,7 @@ class MockModuleBuilder(GenericBuilder): backend = "mock" # Global build_id/task_id we increment when new build is executed. + _build_id_lock = threading.Lock() _build_id = 1 MOCK_CONFIG_TEMPLATE = """ @@ -1109,6 +1121,14 @@ mdpolicy=group:primary if not os.path.exists(self.resultsdir): os.makedirs(self.resultsdir) + # Create "config" sub-directory. + self.configdir = os.path.join(self.tag_dir, "config") + if not os.path.exists(self.configdir): + os.makedirs(self.configdir) + + # Generate path to mock config and add local repository there. + self._add_repo("localrepo", "file://" + self.resultsdir, "metadata_expire=1\n") + # Remove old files from the previous build of this tag but only # before the first build is done, otherwise we would remove files # which we already build in this module build. @@ -1122,18 +1142,18 @@ mdpolicy=group:primary if os.path.exists(os.path.join(self.resultsdir, "repodata/repomd.xml")): os.remove(os.path.join(self.resultsdir, "repodata/repomd.xml")) - # Create "config" sub-directory. - self.configdir = os.path.join(self.tag_dir, "config") - if not os.path.exists(self.configdir): - os.makedirs(self.configdir) - - # Generate path to mock config and add local repository there. - self.mock_config = os.path.join(self.configdir, "mock.cfg") - self._add_repo("localrepo", "file://" + self.resultsdir, "metadata_expire=1\n") + # Remove old config files from config directory. + for name in os.listdir(self.configdir): + os.remove(os.path.join(self.configdir, name)) log.info("MockModuleBuilder initialized, tag_name=%s, tag_dir=%s" % (tag_name, self.tag_dir)) + @property + def module_build_tag(self): + # Workaround koji specific code in modules.py + return {"name": self.tag_name} + def _createrepo(self): """ Creates the repository using "createrepo_c" command in the resultsdir. @@ -1187,16 +1207,26 @@ mdpolicy=group:primary def _write_mock_config(self): """ - Writes Mock config file to self.configdir/mock.cfg. + Writes Mock config file to local file. """ config = str(MockModuleBuilder.MOCK_CONFIG_TEMPLATE) - config = config.replace("$root", self.tag_name) + config = config.replace("$root", "%s-%s" % (self.tag_name, + str(threading.current_thread().name))) config = config.replace("$arch", self.arch) config = config.replace("$group", " ".join(self.groups)) config = config.replace("$yum_conf", self.yum_conf) - with open(os.path.join(self.configdir, "mock.cfg"), 'w') as f: + # We write the most recent config to "mock.cfg", so thread-related + # configs can be later (re-)generated from it using _load_mock_config. + outfile = os.path.join(self.configdir, "mock.cfg") + with open(outfile, 'w') as f: + f.write(config) + + # Write the config to thread-related configuration file. + outfile = os.path.join(self.configdir, "mock-%s.cfg" % + str(threading.current_thread().name)) + with open(outfile, 'w') as f: f.write(config) def buildroot_connect(self, groups): @@ -1225,7 +1255,7 @@ mdpolicy=group:primary # right source RPM into the buildroot here, but we do not track # what RPMs are output of particular SRPM build yet. for artifact in artifacts: - if artifact.startswith("module-build-macros"): + if artifact and artifact.startswith("module-build-macros"): self._load_mock_config() self.groups.append("module-build-macros") self._write_mock_config() @@ -1267,35 +1297,57 @@ mdpolicy=group:primary ) module_build_service.scheduler.consumer.work_queue_put(msg) - def _save_log(self, log_name, artifact_name): - old_log = os.path.join(self.resultsdir, log_name) - new_log = os.path.join(self.resultsdir, artifact_name + "-" + log_name) + def _save_log(self, resultsdir, log_name, artifact_name): + old_log = os.path.join(resultsdir, log_name) + new_log = os.path.join(resultsdir, artifact_name + "-" + log_name) if os.path.exists(old_log): os.rename(old_log, new_log) - def build_srpm(self, artifact_name, source): + def build_srpm(self, artifact_name, source, build_id): """ Builds the artifact from the SRPM. """ + state = koji.BUILD_STATES['BUILDING'] + + # Use the mock config associated with this thread. + mock_config = os.path.join(self.configdir, + "mock-%s.cfg" % str(threading.current_thread().name)) + + # Clear resultsdir associated with this thread or in case it does not + # exist, create it. + resultsdir = os.path.join(self.resultsdir, + str(threading.current_thread().name)) + if os.path.exists(resultsdir): + for name in os.listdir(resultsdir): + os.remove(os.path.join(resultsdir, name)) + else: + os.makedirs(resultsdir) + + # Open the logs to which we will forward mock stdout/stderr. + mock_stdout_log = open(os.path.join(self.resultsdir, + artifact_name + "-mock-stdout.log"), "w") + mock_stderr_log = open(os.path.join(self.resultsdir, + artifact_name + "-mock-stderr.log"), "w") + try: # Initialize mock. - _execute_cmd(["mock", "-r", self.mock_config, "--init"]) + _execute_cmd(["mock", "-v", "-r", mock_config, "--init"], + stdout=mock_stdout_log, stderr=mock_stderr_log) # Start the build and store results to resultsdir - # TODO: Maybe this should not block in the future, but for local - # builds it is not a big problem. - _execute_cmd(["mock", "-r", self.mock_config, + _execute_cmd(["mock", "-v", "-r", mock_config, "--no-clean", "--rebuild", source, - "--resultdir=%s" % self.resultsdir]) + "--resultdir=%s" % resultsdir], + stdout=mock_stdout_log, stderr=mock_stderr_log) # Emit messages simulating complete build. These messages # are put in the scheduler's work queue and are handled # by MBS after the build_srpm() method returns and scope gets # back to scheduler.main.main() method. - self._send_build_change(koji.BUILD_STATES['COMPLETE'], source, - MockModuleBuilder._build_id) + state = koji.BUILD_STATES['COMPLETE'] + self._send_build_change(state, source, build_id) - with open(os.path.join(self.resultsdir, "status.log"), 'w') as f: + with open(os.path.join(resultsdir, "status.log"), 'w') as f: f.write("complete\n") except Exception as e: log.error("Error while building artifact %s: %s" % (artifact_name, @@ -1305,32 +1357,54 @@ mdpolicy=group:primary # are put in the scheduler's work queue and are handled # by MBS after the build_srpm() method returns and scope gets # back to scheduler.main.main() method. - self._send_build_change(koji.BUILD_STATES['FAILED'], source, - MockModuleBuilder._build_id) - with open(os.path.join(self.resultsdir, "status.log"), 'w') as f: + state = koji.BUILD_STATES['FAILED'] + self._send_build_change(state, source, + build_id) + with open(os.path.join(resultsdir, "status.log"), 'w') as f: f.write("failed\n") - self._save_log("state.log", artifact_name) - self._save_log("root.log", artifact_name) - self._save_log("build.log", artifact_name) - self._save_log("status.log", artifact_name) + mock_stdout_log.close() + mock_stderr_log.close() - # Return the "building" state. Real state will be taken by MBS - # from the messages emitted above. - state = koji.BUILD_STATES['BUILDING'] - reason = "Submitted %s to Koji" % (artifact_name) - return MockModuleBuilder._build_id, state, reason, None + self._save_log(resultsdir, "state.log", artifact_name) + self._save_log(resultsdir, "root.log", artifact_name) + self._save_log(resultsdir, "build.log", artifact_name) + self._save_log(resultsdir, "status.log", artifact_name) + + # Copy files from thread-related resultsdire to the main resultsdir. + for name in os.listdir(resultsdir): + shutil.copyfile(os.path.join(resultsdir, name), os.path.join(self.resultsdir, name)) + + reason = "Built %s in Mock" % (artifact_name) + return build_id, state, reason, None def build(self, artifact_name, source): log.info("Starting building artifact %s: %s" % (artifact_name, source)) - MockModuleBuilder._build_id += 1 + + # Load global mock config for this module build from mock.cfg and + # generate the thread-specific mock config by writing it to fs again. + self._load_mock_config() + self._write_mock_config() + + # Get the build-id in thread-safe manner. + build_id = None + with MockModuleBuilder._build_id_lock: + MockModuleBuilder._build_id += 1 + build_id = int(MockModuleBuilder._build_id) # Git sources are treated specially. if source.startswith("git://"): - return build_from_scm(artifact_name, source, self.config, self.build_srpm) + # Open the srpm-stdout and srpm-stderr logs and build from SCM. + srpm_stdout_fn = os.path.join(self.resultsdir, + artifact_name + "-srpm-stdout.log") + srpm_stderr_fn = os.path.join(self.resultsdir, + artifact_name + "-srpm-stderr.log") + with open(srpm_stdout_fn, "w") as srpm_stdout_log, open(srpm_stderr_fn, "w") as srpm_stderr_log: + return build_from_scm(artifact_name, source, + self.config, self.build_srpm, data=build_id, + stdout=srpm_stdout_log, stderr=srpm_stderr_log) else: - return self.build_srpm(artifact_name, source) - + return self.build_srpm(artifact_name, source, build_id) @staticmethod def get_disttag_srpm(disttag): @@ -1340,14 +1414,23 @@ mdpolicy=group:primary def cancel_build(self, task_id): pass - -def build_from_scm(artifact_name, source, config, build_srpm): +def build_from_scm(artifact_name, source, config, build_srpm, + data = None, stdout=None, stderr=None): """ Builds the artifact from the SCM based source. + + :param artifact_name: Name of the artifact. + :param source: SCM URL with artifact's sources (spec file). + :param config: Config instance. + :param build_srpm: Method to call to build the RPM from the generate SRPM. + :param data: Data to be passed to the build_srpm method. + :param stdout: Python file object to which the stdout of SRPM build + command is logged. + :param stderr: Python file object to which the stderr of SRPM build + command is logged. """ - td = None - owd = os.getcwd() ret = (0, koji.BUILD_STATES["FAILED"], "Cannot create SRPM", None) + td = None try: log.debug('Cloning source URL: %s' % source) @@ -1357,22 +1440,21 @@ def build_from_scm(artifact_name, source, config, build_srpm): cod = scm.checkout(td) # Use configured command to create SRPM out of the SCM repo. - log.debug("Creating SRPM") - os.chdir(cod) - _execute_cmd(config.mock_build_srpm_cmd.split(" ")) + log.debug("Creating SRPM in %s" % cod) + _execute_cmd(config.mock_build_srpm_cmd.split(" "), + stdout=stdout, stderr=stderr, cwd=cod) # Find out the built SRPM and build it normally. for f in os.listdir(cod): if f.endswith(".src.rpm"): log.info("Created SRPM %s" % f) source = os.path.join(cod, f) - ret = build_srpm(artifact_name, source) + ret = build_srpm(artifact_name, source, data) break except Exception as e: log.error("Error while generating SRPM for artifact %s: %s" % ( artifact_name, str(e))) finally: - os.chdir(owd) try: if td is not None: shutil.rmtree(td) @@ -1384,12 +1466,32 @@ def build_from_scm(artifact_name, source, config, build_srpm): return ret -def _execute_cmd(args): - log.debug("Executing command: %s" % args) - ret = subprocess.call(args) - if ret != 0: - raise RuntimeError("Command '%s' returned non-zero value %d" - % (args, ret)) +def _execute_cmd(args, stdout = None, stderr = None, cwd = None): + """ + Executes command defined by `args`. If `stdout` or `stderr` is set to + Python file object, the stderr/stdout output is redirecter to that file. + If `cwd` is set, current working directory is set accordingly for the + executed command. + + :param args: list defining the command to execute. + :param stdout: Python file object to redirect the stdout to. + :param stderr: Python file object to redirect the stderr to. + :param cwd: string defining the current working directory for command. + :raises RuntimeError: Raised when command exits with non-zero exit code. + """ + out_log_msg = "" + if stdout: + out_log_msg += ", stdout log: %s" % stdout.name + if stderr: + out_log_msg += ", stderr log: %s" % stderr.name + + log.info("Executing command: %s%s" % (args, out_log_msg)) + proc = subprocess.Popen(args, stdout=stdout, stderr=stderr, cwd=cwd) + proc.communicate() + + if proc.returncode != 0: + err_msg = "Command '%s' returned non-zero value %d%s" % (args, proc.returncode, out_log_msg) + raise RuntimeError(err_msg) GenericBuilder.register_backend_class(KojiModuleBuilder) diff --git a/module_build_service/utils.py b/module_build_service/utils.py index c22100ec..0c17ed69 100644 --- a/module_build_service/utils.py +++ b/module_build_service/utils.py @@ -83,7 +83,6 @@ def at_concurrent_component_threshold(config, session): return False - def start_build_batch(config, module, session, builder, components=None): """ Starts a round of the build cycle for a module. @@ -95,7 +94,14 @@ def start_build_batch(config, module, session, builder, components=None): if any([c.state == koji.BUILD_STATES['BUILDING'] for c in module.component_builds]): - raise ValueError("Cannot start a batch when another is in flight.") + err_msg = "Cannot start a batch when another is in flight." + log.error(err_msg) + unbuilt_components = [ + c for c in module.component_builds + if (c.state == koji.BUILD_STATES['BUILDING']) + ] + log.error("Components in building state: %s" % str(unbuilt_components)) + raise ValueError(err_msg) # The user can either pass in a list of components to 'seed' the batch, or # if none are provided then we just select everything that hasn't @@ -111,23 +117,33 @@ def start_build_batch(config, module, session, builder, components=None): log.info("Starting build of next batch %d, %s" % (module.batch, unbuilt_components)) - for c in unbuilt_components: - if at_concurrent_component_threshold(config, session): - log.info('Concurrent build threshold met') - break + def start_build_component(c): + """ + Submits single component build to builder. Called in thread + by ThreadPool later. + """ + with models.make_session(conf) as s: + if at_concurrent_component_threshold(config, s): + log.info('Concurrent build threshold met') + return try: c.task_id, c.state, c.state_reason, c.nvr = builder.build( artifact_name=c.package, source=c.scmurl) except Exception as e: c.state = koji.BUILD_STATES['FAILED'] - c.state_reason = "Failed to submit artifact %s to Koji: %s" % (c.package, str(e)) - continue + c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e)) + return if not c.task_id and c.state == koji.BUILD_STATES['BUILDING']: c.state = koji.BUILD_STATES['FAILED'] - c.state_reason = "Failed to submit artifact %s to Koji" % (c.package) - continue + c.state_reason = ("Failed to build artifact %s: " + "Builder did not return task ID" % (c.package)) + return + + # Start build of components in this batch. + pool = ThreadPool(config.num_consecutive_builds) + pool.map(start_build_component, unbuilt_components) further_work = []