mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-02-09 00:03:17 +08:00
Merge #1479 Minor fixes to consume and process_message
This commit is contained in:
@@ -158,7 +158,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
raise
|
||||
except Exception:
|
||||
monitor.messaging_rx_failed_counter.inc()
|
||||
log.exception("Failed while handling {0!r}".format(msg))
|
||||
|
||||
if self.stop_condition and self.stop_condition(message):
|
||||
self.shutdown()
|
||||
@@ -194,63 +193,83 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
raise ValueError(
|
||||
"Callback %r, state %r has argspec %r!=%r" % (callback, key, argspec, expected))
|
||||
|
||||
def process_message(self, db_session, msg):
|
||||
# set module build to None and let's populate it later
|
||||
build = None
|
||||
def _map_message(self, db_session, msg):
|
||||
"""Map message to its corresponding event handler and module build"""
|
||||
|
||||
# Choose a handler for this message
|
||||
if isinstance(msg, module_build_service.messaging.KojiBuildChange):
|
||||
handler = self.on_build_change[msg.build_new_state]
|
||||
build = models.ComponentBuild.from_component_event(db_session, msg)
|
||||
if build:
|
||||
build = build.module_build
|
||||
elif type(msg) == module_build_service.messaging.KojiRepoChange:
|
||||
handler = self.on_repo_change
|
||||
build = models.ModuleBuild.from_repo_done_event(db_session, msg)
|
||||
elif type(msg) == module_build_service.messaging.KojiTagChange:
|
||||
handler = self.on_tag_change
|
||||
build = models.ModuleBuild.from_tag_change_event(db_session, msg)
|
||||
elif type(msg) == module_build_service.messaging.MBSModule:
|
||||
handler = self.on_module_change[module_build_state_from_msg(msg)]
|
||||
build = models.ModuleBuild.from_module_event(db_session, msg)
|
||||
elif type(msg) == module_build_service.messaging.GreenwaveDecisionUpdate:
|
||||
handler = self.on_decision_update
|
||||
build = greenwave.get_corresponding_module_build(db_session, msg.subject_identifier)
|
||||
else:
|
||||
return handler, build
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.KojiRepoChange):
|
||||
return (
|
||||
self.on_repo_change,
|
||||
models.ModuleBuild.from_repo_done_event(db_session, msg)
|
||||
)
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.KojiTagChange):
|
||||
return (
|
||||
self.on_tag_change,
|
||||
models.ModuleBuild.from_tag_change_event(db_session, msg)
|
||||
)
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.MBSModule):
|
||||
return (
|
||||
self.on_module_change[module_build_state_from_msg(msg)],
|
||||
models.ModuleBuild.from_module_event(db_session, msg)
|
||||
)
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.GreenwaveDecisionUpdate):
|
||||
return (
|
||||
self.on_decision_update,
|
||||
greenwave.get_corresponding_module_build(
|
||||
db_session, msg.subject_identifier)
|
||||
)
|
||||
|
||||
return None, None
|
||||
|
||||
def process_message(self, db_session, msg):
|
||||
# Choose a handler for this message
|
||||
handler, build = self._map_message(db_session, msg)
|
||||
|
||||
if handler is None:
|
||||
log.debug("No event handler associated with msg %s", msg.msg_id)
|
||||
return
|
||||
|
||||
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id)
|
||||
|
||||
if handler is self.NO_OP:
|
||||
log.debug("Handler is NO_OP: %s", idx)
|
||||
return
|
||||
|
||||
if not build:
|
||||
log.debug("No module associated with msg {}".format(msg.msg_id))
|
||||
log.debug("No module associated with msg %s", msg.msg_id)
|
||||
return
|
||||
|
||||
MBSConsumer.current_module_build_id = build.id
|
||||
|
||||
# Execute our chosen handler
|
||||
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id)
|
||||
if handler is self.NO_OP:
|
||||
log.debug("Handler is NO_OP: %s" % idx)
|
||||
log.info("Calling %s", idx)
|
||||
|
||||
try:
|
||||
further_work = handler(conf, db_session, msg) or []
|
||||
except Exception as e:
|
||||
log.exception()
|
||||
db_session.rollback()
|
||||
db_session.refresh(build)
|
||||
build.transition(
|
||||
db_session,
|
||||
conf,
|
||||
state=models.BUILD_STATES["failed"],
|
||||
state_reason=str(e),
|
||||
failure_type="infra",
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
# Allow caller to do something when error is occurred.
|
||||
raise
|
||||
else:
|
||||
log.info("Calling %s" % idx)
|
||||
further_work = []
|
||||
try:
|
||||
further_work = handler(conf, db_session, msg) or []
|
||||
except Exception as e:
|
||||
msg = "Could not process message handler. See the traceback."
|
||||
log.exception(msg)
|
||||
db_session.rollback()
|
||||
if build:
|
||||
db_session.refresh(build)
|
||||
build.transition(
|
||||
db_session,
|
||||
conf,
|
||||
state=models.BUILD_STATES["failed"],
|
||||
state_reason=str(e),
|
||||
failure_type="infra",
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
log.debug("Done with %s" % idx)
|
||||
|
||||
# Handlers can *optionally* return a list of fake messages that
|
||||
# should be re-inserted back into the main work queue. We can use
|
||||
# this (for instance) when we submit a new component build but (for
|
||||
@@ -258,10 +277,11 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
# completion back to the scheduler so that work resumes as if it
|
||||
# was submitted for real and koji announced its completion.
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
log.info(" Scheduling faked event %r", event)
|
||||
self.incoming.put(event)
|
||||
|
||||
MBSConsumer.current_module_build_id = None
|
||||
finally:
|
||||
MBSConsumer.current_module_build_id = None
|
||||
log.debug("Done with %s", idx)
|
||||
|
||||
|
||||
def get_global_consumer():
|
||||
|
||||
Reference in New Issue
Block a user