diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index da3e1228..2949c7ed 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -158,7 +158,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): raise except Exception: monitor.messaging_rx_failed_counter.inc() - log.exception("Failed while handling {0!r}".format(msg)) if self.stop_condition and self.stop_condition(message): self.shutdown() @@ -194,63 +193,83 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): raise ValueError( "Callback %r, state %r has argspec %r!=%r" % (callback, key, argspec, expected)) - def process_message(self, db_session, msg): - # set module build to None and let's populate it later - build = None + def _map_message(self, db_session, msg): + """Map message to its corresponding event handler and module build""" - # Choose a handler for this message if isinstance(msg, module_build_service.messaging.KojiBuildChange): handler = self.on_build_change[msg.build_new_state] build = models.ComponentBuild.from_component_event(db_session, msg) if build: build = build.module_build - elif type(msg) == module_build_service.messaging.KojiRepoChange: - handler = self.on_repo_change - build = models.ModuleBuild.from_repo_done_event(db_session, msg) - elif type(msg) == module_build_service.messaging.KojiTagChange: - handler = self.on_tag_change - build = models.ModuleBuild.from_tag_change_event(db_session, msg) - elif type(msg) == module_build_service.messaging.MBSModule: - handler = self.on_module_change[module_build_state_from_msg(msg)] - build = models.ModuleBuild.from_module_event(db_session, msg) - elif type(msg) == module_build_service.messaging.GreenwaveDecisionUpdate: - handler = self.on_decision_update - build = greenwave.get_corresponding_module_build(db_session, msg.subject_identifier) - else: + return handler, build + + if isinstance(msg, module_build_service.messaging.KojiRepoChange): + return ( + self.on_repo_change, + models.ModuleBuild.from_repo_done_event(db_session, msg) + ) + + if isinstance(msg, module_build_service.messaging.KojiTagChange): + return ( + self.on_tag_change, + models.ModuleBuild.from_tag_change_event(db_session, msg) + ) + + if isinstance(msg, module_build_service.messaging.MBSModule): + return ( + self.on_module_change[module_build_state_from_msg(msg)], + models.ModuleBuild.from_module_event(db_session, msg) + ) + + if isinstance(msg, module_build_service.messaging.GreenwaveDecisionUpdate): + return ( + self.on_decision_update, + greenwave.get_corresponding_module_build( + db_session, msg.subject_identifier) + ) + + return None, None + + def process_message(self, db_session, msg): + # Choose a handler for this message + handler, build = self._map_message(db_session, msg) + + if handler is None: + log.debug("No event handler associated with msg %s", msg.msg_id) + return + + idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id) + + if handler is self.NO_OP: + log.debug("Handler is NO_OP: %s", idx) return if not build: - log.debug("No module associated with msg {}".format(msg.msg_id)) + log.debug("No module associated with msg %s", msg.msg_id) return MBSConsumer.current_module_build_id = build.id - # Execute our chosen handler - idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id) - if handler is self.NO_OP: - log.debug("Handler is NO_OP: %s" % idx) + log.info("Calling %s", idx) + + try: + further_work = handler(conf, db_session, msg) or [] + except Exception as e: + log.exception() + db_session.rollback() + db_session.refresh(build) + build.transition( + db_session, + conf, + state=models.BUILD_STATES["failed"], + state_reason=str(e), + failure_type="infra", + ) + db_session.commit() + + # Allow caller to do something when error is occurred. + raise else: - log.info("Calling %s" % idx) - further_work = [] - try: - further_work = handler(conf, db_session, msg) or [] - except Exception as e: - msg = "Could not process message handler. See the traceback." - log.exception(msg) - db_session.rollback() - if build: - db_session.refresh(build) - build.transition( - db_session, - conf, - state=models.BUILD_STATES["failed"], - state_reason=str(e), - failure_type="infra", - ) - db_session.commit() - - log.debug("Done with %s" % idx) - # Handlers can *optionally* return a list of fake messages that # should be re-inserted back into the main work queue. We can use # this (for instance) when we submit a new component build but (for @@ -258,10 +277,11 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): # completion back to the scheduler so that work resumes as if it # was submitted for real and koji announced its completion. for event in further_work: - log.info(" Scheduling faked event %r" % event) + log.info(" Scheduling faked event %r", event) self.incoming.put(event) - - MBSConsumer.current_module_build_id = None + finally: + MBSConsumer.current_module_build_id = None + log.debug("Done with %s", idx) def get_global_consumer():