diff --git a/module_build_service/builder/MockModuleBuilder.py b/module_build_service/builder/MockModuleBuilder.py index 153aba7b..4a7810c7 100644 --- a/module_build_service/builder/MockModuleBuilder.py +++ b/module_build_service/builder/MockModuleBuilder.py @@ -161,7 +161,6 @@ def load_local_builds(local_build_nsvs): modulemd=mmd_to_str(mmd), scmurl="", username="mbs", - publish_msg=False, ) module.koji_tag = path module.state = models.BUILD_STATES["ready"] diff --git a/module_build_service/common/messaging.py b/module_build_service/common/messaging.py index 3add0396..e5f22b5c 100644 --- a/module_build_service/common/messaging.py +++ b/module_build_service/common/messaging.py @@ -5,10 +5,19 @@ from __future__ import absolute_import import pkg_resources +import six.moves.queue from module_build_service import conf, log from module_build_service.scheduler.parser import FedmsgMessageParser +# A queue containing message body that should be sent after +# ModuleBuild.transition call every time. +# The ModuleBuild.transition is responsible for generating the message body +# and put it into this queue. +# After changes to a module build made by the transition method are committed, +# the messages will be sent. +module_build_state_change_out_queue = six.moves.queue.Queue() + def publish(topic, msg, conf, service): """ @@ -114,3 +123,10 @@ if not _messaging_backends: # After loading registered messaging backends, the default messaging backend # can be determined by configured messaging backend. default_messaging_backend = _messaging_backends[conf.messaging] + + +# Helper functions to send message on specific event occurring + + +def notify_on_module_state_change(message_body): + publish("module.state.change", message_body, conf, "mbs") diff --git a/module_build_service/common/models.py b/module_build_service/common/models.py index 05947575..caf04988 100644 --- a/module_build_service/common/models.py +++ b/module_build_service/common/models.py @@ -19,7 +19,8 @@ from sqlalchemy.orm import validates, load_only from module_build_service import db, log, get_url_for, conf from module_build_service.common.errors import UnprocessableEntity -import module_build_service.common.messaging +from module_build_service.common.messaging import module_build_state_change_out_queue +from module_build_service.common.messaging import notify_on_module_state_change from module_build_service.common.utils import load_mmd from module_build_service.scheduler import events @@ -610,7 +611,6 @@ class ModuleBuild(MBSBase): rebuild_strategy=None, scratch=False, srpms=None, - publish_msg=True, **kwargs ): now = datetime.utcnow() @@ -641,15 +641,6 @@ class ModuleBuild(MBSBase): db_session.add(module) db_session.commit() - - if publish_msg: - module_build_service.common.messaging.publish( - service="mbs", - topic="module.state.change", - msg=module.json(db_session, show_tasks=False), # Note the state is "init" here... - conf=conf, - ) - return module def transition(self, db_session, conf, state, state_reason=None, failure_type="unspec"): @@ -700,12 +691,10 @@ class ModuleBuild(MBSBase): INVERSE_BUILD_STATES[old_state], new_state_name, self) if old_state != self.state: - module_build_service.common.messaging.publish( - service="mbs", - topic="module.state.change", - msg=self.json(db_session, show_tasks=False), - conf=conf, - ) + # Do not send a message now until the data changes are committed + # into database. + module_build_state_change_out_queue.put( + self.json(db_session, show_tasks=False)) @classmethod def local_modules(cls, db_session, name=None, stream=None): @@ -1319,3 +1308,15 @@ def new_and_update_module_handler(mapper, db_session, target): # Only modify time_modified if it wasn't explicitly set if not db.inspect(target).get_history("time_modified", True).has_changes(): target.time_modified = datetime.utcnow() + + +def send_message_after_module_build_state_change(db_session): + """Hook of SQLAlchemy ORM event after_commit to send messages""" + queue = module_build_state_change_out_queue + # Generally, the changes will be committed immediately after + # ModuleBuild.transition is called. In this case, queue should have only + # one message body to be sent. This while loop also ensures that messages + # are sent correctly if the commit happens after more than one call of + # ModuleBuild.transition. + while not queue.empty(): + notify_on_module_state_change(queue.get()) diff --git a/module_build_service/scheduler/db_session.py b/module_build_service/scheduler/db_session.py index 9eddaedc..1e2890f6 100644 --- a/module_build_service/scheduler/db_session.py +++ b/module_build_service/scheduler/db_session.py @@ -7,6 +7,7 @@ from sqlalchemy.pool import NullPool from sqlalchemy.orm import scoped_session, sessionmaker from module_build_service import conf +from module_build_service.common.models import send_message_after_module_build_state_change from module_build_service.common.models import session_before_commit_handlers __all__ = ("db_session",) @@ -16,8 +17,14 @@ def _setup_event_listeners(db_session): """ Starts listening for events related to the database session. """ - if not sqlalchemy.event.contains(db_session, "before_commit", session_before_commit_handlers): - sqlalchemy.event.listen(db_session, "before_commit", session_before_commit_handlers) + event_hooks = ( + ("before_commit", session_before_commit_handlers), + ("after_commit", send_message_after_module_build_state_change), + ) + + for event, handler in event_hooks: + if not sqlalchemy.event.contains(db_session, event, handler): + sqlalchemy.event.listen(db_session, event, handler) # initialize DB event listeners from the monitor module from module_build_service.common.monitor import db_hook_event_listeners diff --git a/module_build_service/web/submit.py b/module_build_service/web/submit.py index b9dedd05..96d90362 100644 --- a/module_build_service/web/submit.py +++ b/module_build_service/web/submit.py @@ -14,6 +14,7 @@ import requests from module_build_service import conf, log, Modulemd from module_build_service.common import models from module_build_service.common.errors import Conflict, Forbidden, ValidationError +from module_build_service.common.messaging import notify_on_module_state_change from module_build_service.common.submit import fetch_mmd from module_build_service.common.utils import load_mmd, mmd_to_str, to_text_type from module_build_service.web.mse import generate_expanded_mmds @@ -554,6 +555,7 @@ def submit_module_build(db_session, username, mmd, params): transition_to = models.BUILD_STATES["wait"] module.batch = 0 module.transition(db_session, conf, transition_to, "Resubmitted by %s" % username) + db_session.commit() log.info("Resumed existing module build in previous state %s" % module.state) else: # make NSVC unique for every scratch build @@ -601,10 +603,14 @@ def submit_module_build(db_session, username, mmd, params): module.build_context, module.runtime_context, module.context, \ module.build_context_no_bms = module.contexts_from_mmd(module.modulemd) module.context += context_suffix + db_session.commit() + + notify_on_module_state_change( + # Note the state is "init" here... + module.json(db_session, show_tasks=False) + ) all_modules_skipped = False - db_session.add(module) - db_session.commit() modules.append(module) log.info('The user "%s" submitted the build "%s"', username, nsvc) diff --git a/module_build_service/web/views.py b/module_build_service/web/views.py index 232c4189..75252f85 100644 --- a/module_build_service/web/views.py +++ b/module_build_service/web/views.py @@ -7,6 +7,7 @@ This is the implementation of the orchestrator's public RESTful API. from __future__ import absolute_import from io import BytesIO import json +import sqlalchemy.event from flask import request, url_for, Blueprint, Response from flask.views import MethodView @@ -18,6 +19,7 @@ from module_build_service.common import models from module_build_service.common.errors import ( ValidationError, Forbidden, NotFound, ProgrammingError ) +from module_build_service.common.models import send_message_after_module_build_state_change from module_build_service.common.monitor import registry from module_build_service.common.submit import fetch_mmd from module_build_service.common.utils import import_mmd @@ -565,3 +567,8 @@ def register_api(): register_api() + + +# Ensure the event handler is called on db.session +sqlalchemy.event.listen( + db.session, "after_commit", send_message_after_module_build_state_change) diff --git a/tests/test_scheduler/test_db_session.py b/tests/test_scheduler/test_db_session.py new file mode 100644 index 00000000..ec32debc --- /dev/null +++ b/tests/test_scheduler/test_db_session.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT + +from mock import patch + +from module_build_service import conf +from module_build_service.common import models +from module_build_service.scheduler.db_session import db_session +from tests import clean_database, make_module_in_db + + +@patch('module_build_service.common.messaging.publish') +def test_send_messages_after_several_state_transitions(mock_publish): + """ + Ensure all module build state change messages are sent after multiple + ModuleBuild.transitions are committed at once + """ + clean_database() + + build = make_module_in_db("testmodule:1:2:c3") + + build.transition(db_session, conf, models.BUILD_STATES["wait"]) + build.transition(db_session, conf, models.BUILD_STATES["done"]) + + assert 0 == mock_publish.call_count + db_session.commit() + assert 2 == mock_publish.call_count diff --git a/tests/test_scheduler/test_greenwave_handler.py b/tests/test_scheduler/test_greenwave_handler.py index e281583f..6b27b122 100644 --- a/tests/test_scheduler/test_greenwave_handler.py +++ b/tests/test_scheduler/test_greenwave_handler.py @@ -138,10 +138,10 @@ class TestDecisionUpdateHandler: # Assert this call below first_publish_call = call( - service="mbs", - topic="module.state.change", - msg=module_build.json(db_session, show_tasks=False), - conf=conf, + "module.state.change", + module_build.json(db_session, show_tasks=False), + conf, + "mbs", ) ClientSession.return_value.getBuild.return_value = { @@ -169,9 +169,9 @@ class TestDecisionUpdateHandler: publish.assert_has_calls([ first_publish_call, call( - service="mbs", - topic="module.state.change", - msg=module_build.json(db_session, show_tasks=False), - conf=conf, + "module.state.change", + module_build.json(db_session, show_tasks=False), + conf, + "mbs" ), ])