From 033a0da5ba94841240f2026ac52dd2dd3ac21a31 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Wed, 20 Jul 2016 10:15:28 -0400 Subject: [PATCH] Split this up into three threads so we can spoof ourselves. --- rida/scheduler/main.py | 91 +++++++++++++++++++++++++++++++----------- 1 file changed, 67 insertions(+), 24 deletions(-) diff --git a/rida/scheduler/main.py b/rida/scheduler/main.py index 3854588e..fd8a8924 100644 --- a/rida/scheduler/main.py +++ b/rida/scheduler/main.py @@ -38,6 +38,14 @@ import pprint import threading import time +try: + # Py3 + import queue +except ImportError: + # Py2 + import Queue as queue + + import rida.config import rida.logger import rida.messaging @@ -61,16 +69,34 @@ else: config = rida.config.from_file() +class STOP_WORK(object): + """ A sentinel value, indicating that work should be stopped. """ + pass + + def module_build_state_from_msg(msg): state = int(msg['msg']['state']) # TODO better handling assert state in rida.BUILD_STATES.values(), "state=%s(%s) is not in %s" % (state, type(state), rida.BUILD_STATES.values()) return state -class Messaging(threading.Thread): - def __init__(self, *args, **kwargs): - super(Messaging, self).__init__(*args, **kwargs) +class MessageIngest(threading.Thread): + def __init__(self, outgoing_work_queue, *args, **kwargs): + self.outgoing_work_queue = outgoing_work_queue + super(MessageIngest, self).__init__(*args, **kwargs) + + + def run(self): + for msg in rida.messaging.listen(backend=config.messaging): + self.outgoing_work_queue.put(msg) + + +class MessageWorker(threading.Thread): + + def __init__(self, incoming_work_queue, *args, **kwargs): + self.incoming_work_queue = incoming_work_queue + super(MessageWorker, self).__init__(*args, **kwargs) # These are our main lookup tables for figuring out what to run in response # to what messaging events. @@ -114,7 +140,13 @@ class Messaging(threading.Thread): def run(self): self.sanity_check() - for msg in rida.messaging.listen(backend=config.messaging): + while True: + msg = self.incoming_work_queue.get() + + if msg is STOP_WORK: + log.info("Worker thread received STOP_WORK, shutting down...") + break + try: self.process_message(msg) except Exception: @@ -146,7 +178,11 @@ class Messaging(threading.Thread): log.info(" %r: %s, %s" % (handler, msg['topic'], msg['msg_id'])) handler(config, session, msg) -class Polling(threading.Thread): +class Poller(threading.Thread): + def __init__(self, outgoing_work_queue, *args, **kwargs): + self.outgoing_work_queue = outgoing_work_queue + super(Poller, self).__init__(*args, **kwargs) + def run(self): while True: with rida.database.Database(config) as session: @@ -172,21 +208,6 @@ class Polling(threading.Thread): # TODO re-use if config.system == "koji": - def send_fail_task_msg(component_build, task_info): - log.debug("Failing task=%r" % task_info) - - rida.messaging.publish( - modname='rida', - topic='component.state.change', - msg={ - "method": "build", - "attribute": "state", - "new": koji.BUILD_STATES['FAILED'], - "task_id": component_build.task_id}, - - backend=config.messaging, - ) - koji_session, _ = rida.builder.KojiModuleBuilder.get_session_from_config(config) state = koji.BUILD_STATES['BUILDING'] # Check tasks that we track as BUILDING log.info("Querying tasks for statuses:") @@ -202,14 +223,27 @@ class Polling(threading.Thread): log.info("Checking status of task_id=%s" % component_build.task_id) task_info = koji_session.getTaskInfo(component_build.task_id) - if task_info['state'] in (koji.TASK_STATES['CANCELED'], koji.TASK_STATES['FAILED']): - send_fail_task_msg(component_build, task_info) + dead_states = ( + koji.TASK_STATES['CANCELED'], + koji.TASK_STATES['FAILED'], + ) + if task_info['state'] in dead_states: + # Fake a fedmsg message on our internal queue + self.outgoing_work_queue.put({ + 'topic': 'org.fedoraproject.prod.buildsys.build.state.change', + 'msg': { + 'task_id': component_build.task_id, + 'new': koji.BUILD_STATES['FAILED'], + }, + }) else: raise NotImplementedError("Buildsystem %r is not supported." % config.system) def log_summary(self, session): log.info("Current status:") + backlog = self.outgoing_work_queue.qsize() + log.info(" * internal queue backlog is %i." % backlog) states = sorted(rida.BUILD_STATES.items(), key=operator.itemgetter(1)) for name, code in states: query = session.query(rida.database.ModuleBuild) @@ -247,10 +281,19 @@ def main(): rida.logger.init_logging(config) log.info("Starting ridad.") try: - messaging_thread = Messaging() - polling_thread = Polling() + work_queue = queue.Queue() + + # This ingest thread puts work on the queue + messaging_thread = MessageIngest(work_queue) + # This poller does other work, but also sometimes puts work in queue. + polling_thread = Poller(work_queue) + # This worker takes work off the queue and handles it. + worker_thread = MessageWorker(work_queue) + messaging_thread.start() polling_thread.start() + worker_thread.start() + except KeyboardInterrupt: # FIXME: Make this less brutal os._exit()