mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-13 16:59:52 +08:00
Split this up into three threads so we can spoof ourselves.
This commit is contained in:
@@ -38,6 +38,14 @@ import pprint
|
||||
import threading
|
||||
import time
|
||||
|
||||
try:
|
||||
# Py3
|
||||
import queue
|
||||
except ImportError:
|
||||
# Py2
|
||||
import Queue as queue
|
||||
|
||||
|
||||
import rida.config
|
||||
import rida.logger
|
||||
import rida.messaging
|
||||
@@ -61,16 +69,34 @@ else:
|
||||
config = rida.config.from_file()
|
||||
|
||||
|
||||
class STOP_WORK(object):
|
||||
""" A sentinel value, indicating that work should be stopped. """
|
||||
pass
|
||||
|
||||
|
||||
def module_build_state_from_msg(msg):
|
||||
state = int(msg['msg']['state'])
|
||||
# TODO better handling
|
||||
assert state in rida.BUILD_STATES.values(), "state=%s(%s) is not in %s" % (state, type(state), rida.BUILD_STATES.values())
|
||||
return state
|
||||
|
||||
class Messaging(threading.Thread):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Messaging, self).__init__(*args, **kwargs)
|
||||
class MessageIngest(threading.Thread):
|
||||
def __init__(self, outgoing_work_queue, *args, **kwargs):
|
||||
self.outgoing_work_queue = outgoing_work_queue
|
||||
super(MessageIngest, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
def run(self):
|
||||
for msg in rida.messaging.listen(backend=config.messaging):
|
||||
self.outgoing_work_queue.put(msg)
|
||||
|
||||
|
||||
class MessageWorker(threading.Thread):
|
||||
|
||||
def __init__(self, incoming_work_queue, *args, **kwargs):
|
||||
self.incoming_work_queue = incoming_work_queue
|
||||
super(MessageWorker, self).__init__(*args, **kwargs)
|
||||
|
||||
# These are our main lookup tables for figuring out what to run in response
|
||||
# to what messaging events.
|
||||
@@ -114,7 +140,13 @@ class Messaging(threading.Thread):
|
||||
def run(self):
|
||||
self.sanity_check()
|
||||
|
||||
for msg in rida.messaging.listen(backend=config.messaging):
|
||||
while True:
|
||||
msg = self.incoming_work_queue.get()
|
||||
|
||||
if msg is STOP_WORK:
|
||||
log.info("Worker thread received STOP_WORK, shutting down...")
|
||||
break
|
||||
|
||||
try:
|
||||
self.process_message(msg)
|
||||
except Exception:
|
||||
@@ -146,7 +178,11 @@ class Messaging(threading.Thread):
|
||||
log.info(" %r: %s, %s" % (handler, msg['topic'], msg['msg_id']))
|
||||
handler(config, session, msg)
|
||||
|
||||
class Polling(threading.Thread):
|
||||
class Poller(threading.Thread):
|
||||
def __init__(self, outgoing_work_queue, *args, **kwargs):
|
||||
self.outgoing_work_queue = outgoing_work_queue
|
||||
super(Poller, self).__init__(*args, **kwargs)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
with rida.database.Database(config) as session:
|
||||
@@ -172,21 +208,6 @@ class Polling(threading.Thread):
|
||||
# TODO re-use
|
||||
|
||||
if config.system == "koji":
|
||||
def send_fail_task_msg(component_build, task_info):
|
||||
log.debug("Failing task=%r" % task_info)
|
||||
|
||||
rida.messaging.publish(
|
||||
modname='rida',
|
||||
topic='component.state.change',
|
||||
msg={
|
||||
"method": "build",
|
||||
"attribute": "state",
|
||||
"new": koji.BUILD_STATES['FAILED'],
|
||||
"task_id": component_build.task_id},
|
||||
|
||||
backend=config.messaging,
|
||||
)
|
||||
|
||||
koji_session, _ = rida.builder.KojiModuleBuilder.get_session_from_config(config)
|
||||
state = koji.BUILD_STATES['BUILDING'] # Check tasks that we track as BUILDING
|
||||
log.info("Querying tasks for statuses:")
|
||||
@@ -202,14 +223,27 @@ class Polling(threading.Thread):
|
||||
log.info("Checking status of task_id=%s" % component_build.task_id)
|
||||
task_info = koji_session.getTaskInfo(component_build.task_id)
|
||||
|
||||
if task_info['state'] in (koji.TASK_STATES['CANCELED'], koji.TASK_STATES['FAILED']):
|
||||
send_fail_task_msg(component_build, task_info)
|
||||
dead_states = (
|
||||
koji.TASK_STATES['CANCELED'],
|
||||
koji.TASK_STATES['FAILED'],
|
||||
)
|
||||
if task_info['state'] in dead_states:
|
||||
# Fake a fedmsg message on our internal queue
|
||||
self.outgoing_work_queue.put({
|
||||
'topic': 'org.fedoraproject.prod.buildsys.build.state.change',
|
||||
'msg': {
|
||||
'task_id': component_build.task_id,
|
||||
'new': koji.BUILD_STATES['FAILED'],
|
||||
},
|
||||
})
|
||||
|
||||
else:
|
||||
raise NotImplementedError("Buildsystem %r is not supported." % config.system)
|
||||
|
||||
def log_summary(self, session):
|
||||
log.info("Current status:")
|
||||
backlog = self.outgoing_work_queue.qsize()
|
||||
log.info(" * internal queue backlog is %i." % backlog)
|
||||
states = sorted(rida.BUILD_STATES.items(), key=operator.itemgetter(1))
|
||||
for name, code in states:
|
||||
query = session.query(rida.database.ModuleBuild)
|
||||
@@ -247,10 +281,19 @@ def main():
|
||||
rida.logger.init_logging(config)
|
||||
log.info("Starting ridad.")
|
||||
try:
|
||||
messaging_thread = Messaging()
|
||||
polling_thread = Polling()
|
||||
work_queue = queue.Queue()
|
||||
|
||||
# This ingest thread puts work on the queue
|
||||
messaging_thread = MessageIngest(work_queue)
|
||||
# This poller does other work, but also sometimes puts work in queue.
|
||||
polling_thread = Poller(work_queue)
|
||||
# This worker takes work off the queue and handles it.
|
||||
worker_thread = MessageWorker(work_queue)
|
||||
|
||||
messaging_thread.start()
|
||||
polling_thread.start()
|
||||
worker_thread.start()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
# FIXME: Make this less brutal
|
||||
os._exit()
|
||||
|
||||
Reference in New Issue
Block a user