diff --git a/module_build_service/scheduler/main.py b/module_build_service/scheduler/main.py index 2cfa01da..274726e9 100644 --- a/module_build_service/scheduler/main.py +++ b/module_build_service/scheduler/main.py @@ -70,7 +70,6 @@ class MessageIngest(threading.Thread): super(MessageIngest, self).__init__(*args, **kwargs) self.stop_after_build = stop_after_build - def run(self): for msg in module_build_service.messaging.listen(conf): self.outgoing_work_queue.put(msg) @@ -103,7 +102,7 @@ class MessageWorker(threading.Thread): models.BUILD_STATES["wait"]: module_build_service.scheduler.handlers.modules.wait, models.BUILD_STATES["build"]: NO_OP, models.BUILD_STATES["failed"]: module_build_service.scheduler.handlers.modules.failed, - models.BUILD_STATES["done"]: module_build_service.scheduler.handlers.modules.done, # XXX: DIRECT TRANSITION TO READY + models.BUILD_STATES["done"]: module_build_service.scheduler.handlers.modules.done, # XXX: DIRECT TRANSITION TO READY models.BUILD_STATES["ready"]: NO_OP, } # Only one kind of repo change event, though... @@ -223,7 +222,7 @@ class Poller(threading.Thread): log.info("Checking status for %d tasks." % len(res)) for component_build in res: log.debug(component_build.json()) - if not component_build.task_id: # Don't check tasks which has not been triggered yet + if not component_build.task_id: # Don't check tasks which has not been triggered yet continue log.info("Checking status of task_id=%s" % component_build.task_id) @@ -269,7 +268,7 @@ class Poller(threading.Thread): log.info(" * %r" % module_build) for i in range(module_build.batch): n = len([c for c in module_build.component_builds - if c.batch == i ]) + if c.batch == i]) log.info(" * %i components in batch %i" % (n, i)) def process_waiting_module_builds(self, session): @@ -307,30 +306,48 @@ class Poller(threading.Thread): _work_queue = queue.Queue() + def outgoing_work_queue_put(msg): _work_queue.put(msg) -def main(initial_msgs = [], return_after_build = False): + +def graceful_stop(): + """ + Here is the place to perform shutdown actions. + + Do whatever is needed to do except for leaving the main thread, which + would result in losing control of POSIX signals handling. + """ + log.warning("graceful_stop is not yet implemented, press Ctrl+C again...") + while True: + time.sleep(30) + + +def main(initial_msgs=[], return_after_build=False): log.info("Starting module_build_service_daemon.") for msg in initial_msgs: outgoing_work_queue_put(msg) + # This ingest thread puts work on the queue + messaging_thread = MessageIngest(_work_queue, return_after_build) + # This poller does other work, but also sometimes puts work in queue. + polling_thread = Poller(_work_queue) + # This worker takes work off the queue and handles it. + worker_thread = MessageWorker(_work_queue, return_after_build) + + messaging_thread.start() + polling_thread.start() + worker_thread.start() + try: - # This ingest thread puts work on the queue - messaging_thread = MessageIngest(_work_queue, return_after_build) - # This poller does other work, but also sometimes puts work in queue. - polling_thread = Poller(_work_queue) - # This worker takes work off the queue and handles it. - worker_thread = MessageWorker(_work_queue, return_after_build) - - messaging_thread.start() - polling_thread.start() - worker_thread.start() - - worker_thread.join() - polling_thread.stop = True - + while worker_thread.is_alive(): + time.sleep(3) except KeyboardInterrupt: - # FIXME: Make this less brutal - os._exit(0) + log.info("Stopping module_build_service_daemon. Press Ctrl+C again to force.") + try: + graceful_stop() + except KeyboardInterrupt: + os._exit(0) + finally: + polling_thread.stop = True