From 3bdbdaee20615fb390ab33cb5770e95847a909af Mon Sep 17 00:00:00 2001 From: Chenxiong Qi Date: Mon, 28 Oct 2019 20:49:49 +0800 Subject: [PATCH 1/2] Remove unnecessary code from process_message It is not necessary to initialize the build variable before following if-elif-else branch. After the if-elif-else branch, if no build is found, process_message just returns immediately. So, no need to check if build is None during handling error raised from handler call. Signed-off-by: Chenxiong Qi --- module_build_service/scheduler/consumer.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index da3e1228..e0f4417d 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -195,9 +195,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): "Callback %r, state %r has argspec %r!=%r" % (callback, key, argspec, expected)) def process_message(self, db_session, msg): - # set module build to None and let's populate it later - build = None - # Choose a handler for this message if isinstance(msg, module_build_service.messaging.KojiBuildChange): handler = self.on_build_change[msg.build_new_state] @@ -238,16 +235,15 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): msg = "Could not process message handler. See the traceback." log.exception(msg) db_session.rollback() - if build: - db_session.refresh(build) - build.transition( - db_session, - conf, - state=models.BUILD_STATES["failed"], - state_reason=str(e), - failure_type="infra", - ) - db_session.commit() + db_session.refresh(build) + build.transition( + db_session, + conf, + state=models.BUILD_STATES["failed"], + state_reason=str(e), + failure_type="infra", + ) + db_session.commit() log.debug("Done with %s" % idx) From 251239621da617fa2b6d5187fe71c8be39bfd2c1 Mon Sep 17 00:00:00 2001 From: Chenxiong Qi Date: Mon, 28 Oct 2019 21:36:08 +0800 Subject: [PATCH 2/2] Refactor consume and process_message A new method _map_message is added for converting a message object to corresponding event handler and module build. This is used to shorten the process_message method. process_message is refactored so that: * when handler is NO_OP, just return as earlier as possible because setting it make no sense to set MBSConsumer.current_module_build_id back and forth. * Re-raise error thrown from handler execution so that the caller is able to get a chance to collect monitoring metric inside except clause handling Exception. Otherwise, no failed metric is collected. This is the major problem this patch is to fix. * Ensure MBSConsumer.current_module_build_id is set back to None. In consumer method, no traceback is logged inside except clause catching Exception. process_method does that. Signed-off-by: Chenxiong Qi --- module_build_service/scheduler/consumer.py | 112 +++++++++++++-------- 1 file changed, 68 insertions(+), 44 deletions(-) diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index e0f4417d..2949c7ed 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -158,7 +158,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): raise except Exception: monitor.messaging_rx_failed_counter.inc() - log.exception("Failed while handling {0!r}".format(msg)) if self.stop_condition and self.stop_condition(message): self.shutdown() @@ -194,59 +193,83 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): raise ValueError( "Callback %r, state %r has argspec %r!=%r" % (callback, key, argspec, expected)) - def process_message(self, db_session, msg): - # Choose a handler for this message + def _map_message(self, db_session, msg): + """Map message to its corresponding event handler and module build""" + if isinstance(msg, module_build_service.messaging.KojiBuildChange): handler = self.on_build_change[msg.build_new_state] build = models.ComponentBuild.from_component_event(db_session, msg) if build: build = build.module_build - elif type(msg) == module_build_service.messaging.KojiRepoChange: - handler = self.on_repo_change - build = models.ModuleBuild.from_repo_done_event(db_session, msg) - elif type(msg) == module_build_service.messaging.KojiTagChange: - handler = self.on_tag_change - build = models.ModuleBuild.from_tag_change_event(db_session, msg) - elif type(msg) == module_build_service.messaging.MBSModule: - handler = self.on_module_change[module_build_state_from_msg(msg)] - build = models.ModuleBuild.from_module_event(db_session, msg) - elif type(msg) == module_build_service.messaging.GreenwaveDecisionUpdate: - handler = self.on_decision_update - build = greenwave.get_corresponding_module_build(db_session, msg.subject_identifier) - else: + return handler, build + + if isinstance(msg, module_build_service.messaging.KojiRepoChange): + return ( + self.on_repo_change, + models.ModuleBuild.from_repo_done_event(db_session, msg) + ) + + if isinstance(msg, module_build_service.messaging.KojiTagChange): + return ( + self.on_tag_change, + models.ModuleBuild.from_tag_change_event(db_session, msg) + ) + + if isinstance(msg, module_build_service.messaging.MBSModule): + return ( + self.on_module_change[module_build_state_from_msg(msg)], + models.ModuleBuild.from_module_event(db_session, msg) + ) + + if isinstance(msg, module_build_service.messaging.GreenwaveDecisionUpdate): + return ( + self.on_decision_update, + greenwave.get_corresponding_module_build( + db_session, msg.subject_identifier) + ) + + return None, None + + def process_message(self, db_session, msg): + # Choose a handler for this message + handler, build = self._map_message(db_session, msg) + + if handler is None: + log.debug("No event handler associated with msg %s", msg.msg_id) + return + + idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id) + + if handler is self.NO_OP: + log.debug("Handler is NO_OP: %s", idx) return if not build: - log.debug("No module associated with msg {}".format(msg.msg_id)) + log.debug("No module associated with msg %s", msg.msg_id) return MBSConsumer.current_module_build_id = build.id - # Execute our chosen handler - idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id) - if handler is self.NO_OP: - log.debug("Handler is NO_OP: %s" % idx) + log.info("Calling %s", idx) + + try: + further_work = handler(conf, db_session, msg) or [] + except Exception as e: + log.exception() + db_session.rollback() + db_session.refresh(build) + build.transition( + db_session, + conf, + state=models.BUILD_STATES["failed"], + state_reason=str(e), + failure_type="infra", + ) + db_session.commit() + + # Allow caller to do something when error is occurred. + raise else: - log.info("Calling %s" % idx) - further_work = [] - try: - further_work = handler(conf, db_session, msg) or [] - except Exception as e: - msg = "Could not process message handler. See the traceback." - log.exception(msg) - db_session.rollback() - db_session.refresh(build) - build.transition( - db_session, - conf, - state=models.BUILD_STATES["failed"], - state_reason=str(e), - failure_type="infra", - ) - db_session.commit() - - log.debug("Done with %s" % idx) - # Handlers can *optionally* return a list of fake messages that # should be re-inserted back into the main work queue. We can use # this (for instance) when we submit a new component build but (for @@ -254,10 +277,11 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): # completion back to the scheduler so that work resumes as if it # was submitted for real and koji announced its completion. for event in further_work: - log.info(" Scheduling faked event %r" % event) + log.info(" Scheduling faked event %r", event) self.incoming.put(event) - - MBSConsumer.current_module_build_id = None + finally: + MBSConsumer.current_module_build_id = None + log.debug("Done with %s", idx) def get_global_consumer():