From 64d96966e2ab06686a7b81e33dce4c1333826b28 Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Fri, 17 Feb 2017 12:12:07 +0100 Subject: [PATCH] Use Queue based ThreadPool instead of the default Python's one. Improve locking and debugging in the Mock backend. --- module_build_service/builder.py | 57 ++++++++++++++++-------------- module_build_service/logger.py | 2 +- module_build_service/utils.py | 62 +++++++++++++++++++++++++++++---- 3 files changed, 87 insertions(+), 34 deletions(-) diff --git a/module_build_service/builder.py b/module_build_service/builder.py index 79ee5547..fce78234 100644 --- a/module_build_service/builder.py +++ b/module_build_service/builder.py @@ -1124,6 +1124,7 @@ class MockModuleBuilder(GenericBuilder): # Global build_id/task_id we increment when new build is executed. _build_id_lock = threading.Lock() _build_id = 1 + _config_lock = threading.Lock() MOCK_CONFIG_TEMPLATE = """ config_opts['root'] = '$root' @@ -1134,6 +1135,7 @@ config_opts['dist'] = '' config_opts['extra_chroot_dirs'] = [ '/run/lock', ] config_opts['releasever'] = '' config_opts['package_manager'] = 'dnf' +config_opts['nosync'] = True config_opts['yum.conf'] = \"\"\" $yum_conf @@ -1251,41 +1253,43 @@ mdpolicy=group:primary if MockModuleBuilder._build_id == 1: return - infile = os.path.join(self.configdir, "mock.cfg") - with open(infile, 'r') as f: - # This looks scary, but it is the way how mock itself loads the - # config file ... - config_opts = {} - code = compile(f.read(), infile, 'exec') - # pylint: disable=exec-used - exec(code) + with MockModuleBuilder._config_lock: + infile = os.path.join(self.configdir, "mock.cfg") + with open(infile, 'r') as f: + # This looks scary, but it is the way how mock itself loads the + # config file ... + config_opts = {} + code = compile(f.read(), infile, 'exec') + # pylint: disable=exec-used + exec(code) - self.groups = config_opts["chroot_setup_cmd"].split(" ")[1:] - self.yum_conf = config_opts['yum.conf'] + self.groups = config_opts["chroot_setup_cmd"].split(" ")[1:] + self.yum_conf = config_opts['yum.conf'] def _write_mock_config(self): """ Writes Mock config file to local file. """ - config = str(MockModuleBuilder.MOCK_CONFIG_TEMPLATE) - config = config.replace("$root", "%s-%s" % (self.tag_name, - str(threading.current_thread().name))) - config = config.replace("$arch", self.arch) - config = config.replace("$group", " ".join(self.groups)) - config = config.replace("$yum_conf", self.yum_conf) + with MockModuleBuilder._config_lock: + config = str(MockModuleBuilder.MOCK_CONFIG_TEMPLATE) + config = config.replace("$root", "%s-%s" % (self.tag_name, + str(threading.current_thread().name))) + config = config.replace("$arch", self.arch) + config = config.replace("$group", " ".join(self.groups)) + config = config.replace("$yum_conf", self.yum_conf) - # We write the most recent config to "mock.cfg", so thread-related - # configs can be later (re-)generated from it using _load_mock_config. - outfile = os.path.join(self.configdir, "mock.cfg") - with open(outfile, 'w') as f: - f.write(config) + # We write the most recent config to "mock.cfg", so thread-related + # configs can be later (re-)generated from it using _load_mock_config. + outfile = os.path.join(self.configdir, "mock.cfg") + with open(outfile, 'w') as f: + f.write(config) - # Write the config to thread-related configuration file. - outfile = os.path.join(self.configdir, "mock-%s.cfg" % - str(threading.current_thread().name)) - with open(outfile, 'w') as f: - f.write(config) + # Write the config to thread-related configuration file. + outfile = os.path.join(self.configdir, "mock-%s.cfg" % + str(threading.current_thread().name)) + with open(outfile, 'w') as f: + f.write(config) def buildroot_connect(self, groups): self._load_mock_config() @@ -1525,6 +1529,7 @@ def build_from_scm(artifact_name, source, config, build_srpm, except Exception as e: log.error("Error while generating SRPM for artifact %s: %s" % ( artifact_name, str(e))) + ret = (0, koji.BUILD_STATES["FAILED"], "Cannot create SRPM %s" % str(e), None) finally: try: if td is not None: diff --git a/module_build_service/logger.py b/module_build_service/logger.py index eb48cf8f..399415bc 100644 --- a/module_build_service/logger.py +++ b/module_build_service/logger.py @@ -69,7 +69,7 @@ def init_logging(conf): """ Initializes logging according to configuration file. """ - log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + log_format = '%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s' log_backend = conf.log_backend if not log_backend or len(log_backend) == 0 or log_backend == "console": diff --git a/module_build_service/utils.py b/module_build_service/utils.py index a11a16ea..9043bb00 100644 --- a/module_build_service/utils.py +++ b/module_build_service/utils.py @@ -42,6 +42,53 @@ from module_build_service.errors import (Unauthorized, Conflict) import module_build_service.messaging from multiprocessing.dummy import Pool as ThreadPool +try: + from Queue import Queue +except: + from queue import Queue + +from threading import Thread + +class Worker(Thread): + """ Thread executing tasks from a given tasks queue """ + def __init__(self, tasks): + Thread.__init__(self) + self.tasks = tasks + self.daemon = True + self.start() + + def run(self): + while True: + func, args, kargs = self.tasks.get() + try: + func(*args, **kargs) + except Exception as e: + # An exception happened in this thread + log.error(str(e)) + finally: + # Mark this task as done, whether an exception happened or not + self.tasks.task_done() + + +class QueueBasedThreadPool: + """ Pool of threads consuming tasks from a queue. """ + def __init__(self, num_threads): + self.tasks = Queue(num_threads) + for _ in range(num_threads): + Worker(self.tasks) + + def add_task(self, func, *args, **kargs): + """ Add a task to the queue """ + self.tasks.put((func, args, kargs)) + + def map(self, func, args_list): + """ Add a list of tasks to the queue """ + for args in args_list: + self.add_task(func, args) + + def wait_completion(self): + """ Wait for completion of all the tasks in the queue """ + self.tasks.join() def retry(timeout=conf.net_timeout, interval=conf.net_retry_interval, wait_on=Exception): """ A decorator that allows to retry a section of code... @@ -137,14 +184,14 @@ def start_build_batch(config, module, session, builder, components=None): def start_build_component(c): """ Submits single component build to builder. Called in thread - by ThreadPool later. + by QueueBasedThreadPool later. """ - with models.make_session(conf) as s: - if at_concurrent_component_threshold(config, s): - log.info('Concurrent build threshold met') - return - try: + with models.make_session(conf) as s: + if at_concurrent_component_threshold(config, s): + log.info('Concurrent build threshold met') + return + c.task_id, c.state, c.state_reason, c.nvr = builder.build( artifact_name=c.package, source=c.scmurl) except Exception as e: @@ -159,8 +206,9 @@ def start_build_batch(config, module, session, builder, components=None): return # Start build of components in this batch. - pool = ThreadPool(config.num_consecutive_builds) + pool = QueueBasedThreadPool(config.num_consecutive_builds) pool.map(start_build_component, unbuilt_components) + pool.wait_completion() further_work = []