mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-14 00:59:51 +08:00
@@ -70,7 +70,6 @@ class MessageIngest(threading.Thread):
|
||||
super(MessageIngest, self).__init__(*args, **kwargs)
|
||||
self.stop_after_build = stop_after_build
|
||||
|
||||
|
||||
def run(self):
|
||||
for msg in module_build_service.messaging.listen(conf):
|
||||
self.outgoing_work_queue.put(msg)
|
||||
@@ -103,7 +102,7 @@ class MessageWorker(threading.Thread):
|
||||
models.BUILD_STATES["wait"]: module_build_service.scheduler.handlers.modules.wait,
|
||||
models.BUILD_STATES["build"]: NO_OP,
|
||||
models.BUILD_STATES["failed"]: module_build_service.scheduler.handlers.modules.failed,
|
||||
models.BUILD_STATES["done"]: module_build_service.scheduler.handlers.modules.done, # XXX: DIRECT TRANSITION TO READY
|
||||
models.BUILD_STATES["done"]: module_build_service.scheduler.handlers.modules.done, # XXX: DIRECT TRANSITION TO READY
|
||||
models.BUILD_STATES["ready"]: NO_OP,
|
||||
}
|
||||
# Only one kind of repo change event, though...
|
||||
@@ -223,7 +222,7 @@ class Poller(threading.Thread):
|
||||
log.info("Checking status for %d tasks." % len(res))
|
||||
for component_build in res:
|
||||
log.debug(component_build.json())
|
||||
if not component_build.task_id: # Don't check tasks which has not been triggered yet
|
||||
if not component_build.task_id: # Don't check tasks which has not been triggered yet
|
||||
continue
|
||||
|
||||
log.info("Checking status of task_id=%s" % component_build.task_id)
|
||||
@@ -269,7 +268,7 @@ class Poller(threading.Thread):
|
||||
log.info(" * %r" % module_build)
|
||||
for i in range(module_build.batch):
|
||||
n = len([c for c in module_build.component_builds
|
||||
if c.batch == i ])
|
||||
if c.batch == i])
|
||||
log.info(" * %i components in batch %i" % (n, i))
|
||||
|
||||
def process_waiting_module_builds(self, session):
|
||||
@@ -307,30 +306,48 @@ class Poller(threading.Thread):
|
||||
|
||||
_work_queue = queue.Queue()
|
||||
|
||||
|
||||
def outgoing_work_queue_put(msg):
|
||||
_work_queue.put(msg)
|
||||
|
||||
def main(initial_msgs = [], return_after_build = False):
|
||||
|
||||
def graceful_stop():
|
||||
"""
|
||||
Here is the place to perform shutdown actions.
|
||||
|
||||
Do whatever is needed to do except for leaving the main thread, which
|
||||
would result in losing control of POSIX signals handling.
|
||||
"""
|
||||
log.warning("graceful_stop is not yet implemented, press Ctrl+C again...")
|
||||
while True:
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def main(initial_msgs=[], return_after_build=False):
|
||||
log.info("Starting module_build_service_daemon.")
|
||||
|
||||
for msg in initial_msgs:
|
||||
outgoing_work_queue_put(msg)
|
||||
|
||||
# This ingest thread puts work on the queue
|
||||
messaging_thread = MessageIngest(_work_queue, return_after_build)
|
||||
# This poller does other work, but also sometimes puts work in queue.
|
||||
polling_thread = Poller(_work_queue)
|
||||
# This worker takes work off the queue and handles it.
|
||||
worker_thread = MessageWorker(_work_queue, return_after_build)
|
||||
|
||||
messaging_thread.start()
|
||||
polling_thread.start()
|
||||
worker_thread.start()
|
||||
|
||||
try:
|
||||
# This ingest thread puts work on the queue
|
||||
messaging_thread = MessageIngest(_work_queue, return_after_build)
|
||||
# This poller does other work, but also sometimes puts work in queue.
|
||||
polling_thread = Poller(_work_queue)
|
||||
# This worker takes work off the queue and handles it.
|
||||
worker_thread = MessageWorker(_work_queue, return_after_build)
|
||||
|
||||
messaging_thread.start()
|
||||
polling_thread.start()
|
||||
worker_thread.start()
|
||||
|
||||
worker_thread.join()
|
||||
polling_thread.stop = True
|
||||
|
||||
while worker_thread.is_alive():
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
# FIXME: Make this less brutal
|
||||
os._exit(0)
|
||||
log.info("Stopping module_build_service_daemon. Press Ctrl+C again to force.")
|
||||
try:
|
||||
graceful_stop()
|
||||
except KeyboardInterrupt:
|
||||
os._exit(0)
|
||||
finally:
|
||||
polling_thread.stop = True
|
||||
|
||||
Reference in New Issue
Block a user