mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-13 21:29:57 +08:00
Use a Queue-based ThreadPool instead of Python's default one. Improve locking and debugging in the Mock backend.
This commit is contained in:
@@ -1124,6 +1124,7 @@ class MockModuleBuilder(GenericBuilder):
|
||||
# Global build_id/task_id we increment when new build is executed.
|
||||
_build_id_lock = threading.Lock()
|
||||
_build_id = 1
|
||||
_config_lock = threading.Lock()
|
||||
|
||||
MOCK_CONFIG_TEMPLATE = """
|
||||
config_opts['root'] = '$root'
|
||||
@@ -1134,6 +1135,7 @@ config_opts['dist'] = ''
|
||||
config_opts['extra_chroot_dirs'] = [ '/run/lock', ]
|
||||
config_opts['releasever'] = ''
|
||||
config_opts['package_manager'] = 'dnf'
|
||||
config_opts['nosync'] = True
|
||||
|
||||
config_opts['yum.conf'] = \"\"\"
|
||||
$yum_conf
|
||||
@@ -1251,41 +1253,43 @@ mdpolicy=group:primary
|
||||
if MockModuleBuilder._build_id == 1:
|
||||
return
|
||||
|
||||
infile = os.path.join(self.configdir, "mock.cfg")
|
||||
with open(infile, 'r') as f:
|
||||
# This looks scary, but it is the way how mock itself loads the
|
||||
# config file ...
|
||||
config_opts = {}
|
||||
code = compile(f.read(), infile, 'exec')
|
||||
# pylint: disable=exec-used
|
||||
exec(code)
|
||||
with MockModuleBuilder._config_lock:
|
||||
infile = os.path.join(self.configdir, "mock.cfg")
|
||||
with open(infile, 'r') as f:
|
||||
# This looks scary, but it is the way how mock itself loads the
|
||||
# config file ...
|
||||
config_opts = {}
|
||||
code = compile(f.read(), infile, 'exec')
|
||||
# pylint: disable=exec-used
|
||||
exec(code)
|
||||
|
||||
self.groups = config_opts["chroot_setup_cmd"].split(" ")[1:]
|
||||
self.yum_conf = config_opts['yum.conf']
|
||||
self.groups = config_opts["chroot_setup_cmd"].split(" ")[1:]
|
||||
self.yum_conf = config_opts['yum.conf']
|
||||
|
||||
def _write_mock_config(self):
|
||||
"""
|
||||
Writes Mock config file to local file.
|
||||
"""
|
||||
|
||||
config = str(MockModuleBuilder.MOCK_CONFIG_TEMPLATE)
|
||||
config = config.replace("$root", "%s-%s" % (self.tag_name,
|
||||
str(threading.current_thread().name)))
|
||||
config = config.replace("$arch", self.arch)
|
||||
config = config.replace("$group", " ".join(self.groups))
|
||||
config = config.replace("$yum_conf", self.yum_conf)
|
||||
with MockModuleBuilder._config_lock:
|
||||
config = str(MockModuleBuilder.MOCK_CONFIG_TEMPLATE)
|
||||
config = config.replace("$root", "%s-%s" % (self.tag_name,
|
||||
str(threading.current_thread().name)))
|
||||
config = config.replace("$arch", self.arch)
|
||||
config = config.replace("$group", " ".join(self.groups))
|
||||
config = config.replace("$yum_conf", self.yum_conf)
|
||||
|
||||
# We write the most recent config to "mock.cfg", so thread-related
|
||||
# configs can be later (re-)generated from it using _load_mock_config.
|
||||
outfile = os.path.join(self.configdir, "mock.cfg")
|
||||
with open(outfile, 'w') as f:
|
||||
f.write(config)
|
||||
# We write the most recent config to "mock.cfg", so thread-related
|
||||
# configs can be later (re-)generated from it using _load_mock_config.
|
||||
outfile = os.path.join(self.configdir, "mock.cfg")
|
||||
with open(outfile, 'w') as f:
|
||||
f.write(config)
|
||||
|
||||
# Write the config to thread-related configuration file.
|
||||
outfile = os.path.join(self.configdir, "mock-%s.cfg" %
|
||||
str(threading.current_thread().name))
|
||||
with open(outfile, 'w') as f:
|
||||
f.write(config)
|
||||
# Write the config to thread-related configuration file.
|
||||
outfile = os.path.join(self.configdir, "mock-%s.cfg" %
|
||||
str(threading.current_thread().name))
|
||||
with open(outfile, 'w') as f:
|
||||
f.write(config)
|
||||
|
||||
def buildroot_connect(self, groups):
|
||||
self._load_mock_config()
|
||||
@@ -1525,6 +1529,7 @@ def build_from_scm(artifact_name, source, config, build_srpm,
|
||||
except Exception as e:
|
||||
log.error("Error while generating SRPM for artifact %s: %s" % (
|
||||
artifact_name, str(e)))
|
||||
ret = (0, koji.BUILD_STATES["FAILED"], "Cannot create SRPM %s" % str(e), None)
|
||||
finally:
|
||||
try:
|
||||
if td is not None:
|
||||
|
||||
@@ -69,7 +69,7 @@ def init_logging(conf):
|
||||
"""
|
||||
Initializes logging according to configuration file.
|
||||
"""
|
||||
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
log_format = '%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s'
|
||||
log_backend = conf.log_backend
|
||||
|
||||
if not log_backend or len(log_backend) == 0 or log_backend == "console":
|
||||
|
||||
@@ -42,6 +42,53 @@ from module_build_service.errors import (Unauthorized, Conflict)
|
||||
import module_build_service.messaging
|
||||
from multiprocessing.dummy import Pool as ThreadPool
|
||||
|
||||
try:
|
||||
from Queue import Queue
|
||||
except:
|
||||
from queue import Queue
|
||||
|
||||
from threading import Thread
|
||||
|
||||
class Worker(Thread):
    """Daemon thread executing tasks pulled from a shared queue.

    Each task is a ``(func, args, kwargs)`` tuple put on the queue by
    ``QueueBasedThreadPool``.  The thread starts itself on construction
    and loops forever, so it is marked daemon to avoid blocking
    interpreter shutdown.
    """

    def __init__(self, tasks):
        """
        :param tasks: Queue instance shared with the owning pool.
        """
        Thread.__init__(self)
        self.tasks = tasks
        # Daemon: lingering idle workers must not keep the process alive.
        self.daemon = True
        self.start()

    def run(self):
        """Consume and execute tasks from the queue until process exit."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception:
                # An exception happened in this thread.  Log with the full
                # traceback — logging only str(e) (as before) loses the
                # origin of worker failures.
                log.exception("Exception raised in worker thread")
            finally:
                # Mark this task as done whether an exception happened or
                # not, so Queue.join() in wait_completion() can return.
                self.tasks.task_done()
|
||||
|
||||
|
||||
class QueueBasedThreadPool:
    """Simple thread pool consuming tasks from a bounded queue.

    Construction spawns ``num_threads`` Worker threads that pull tasks
    from a shared queue.  The queue is bounded to ``num_threads``
    entries, so producers block in :meth:`add_task` once all workers are
    busy and the queue is full.
    """

    def __init__(self, num_threads):
        """
        :param num_threads: number of worker threads to spawn.
        """
        self.tasks = Queue(num_threads)
        for _unused in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Enqueue a single ``func(*args, **kargs)`` call."""
        task = (func, args, kargs)
        self.tasks.put(task)

    def map(self, func, args_list):
        """Enqueue ``func(item)`` for every item in ``args_list``."""
        for item in args_list:
            self.add_task(func, item)

    def wait_completion(self):
        """Block until every queued task has been marked done."""
        self.tasks.join()
|
||||
|
||||
def retry(timeout=conf.net_timeout, interval=conf.net_retry_interval, wait_on=Exception):
|
||||
""" A decorator that allows to retry a section of code...
|
||||
@@ -137,14 +184,14 @@ def start_build_batch(config, module, session, builder, components=None):
|
||||
def start_build_component(c):
|
||||
"""
|
||||
Submits single component build to builder. Called in thread
|
||||
by ThreadPool later.
|
||||
by QueueBasedThreadPool later.
|
||||
"""
|
||||
with models.make_session(conf) as s:
|
||||
if at_concurrent_component_threshold(config, s):
|
||||
log.info('Concurrent build threshold met')
|
||||
return
|
||||
|
||||
try:
|
||||
with models.make_session(conf) as s:
|
||||
if at_concurrent_component_threshold(config, s):
|
||||
log.info('Concurrent build threshold met')
|
||||
return
|
||||
|
||||
c.task_id, c.state, c.state_reason, c.nvr = builder.build(
|
||||
artifact_name=c.package, source=c.scmurl)
|
||||
except Exception as e:
|
||||
@@ -159,8 +206,9 @@ def start_build_batch(config, module, session, builder, components=None):
|
||||
return
|
||||
|
||||
# Start build of components in this batch.
|
||||
pool = ThreadPool(config.num_consecutive_builds)
|
||||
pool = QueueBasedThreadPool(config.num_consecutive_builds)
|
||||
pool.map(start_build_component, unbuilt_components)
|
||||
pool.wait_completion()
|
||||
|
||||
further_work = []
|
||||
|
||||
|
||||
Reference in New Issue
Block a user