Send module build state change message after commit to database

In MBS, there are two cases to send a message when a module build moves
to a new state. One is to create a new module build, with
ModuleBuild.create particularly, when user submit a module build.
Another one is to transition a module build to a new state with
ModuleBuild.transition. This commit handles these two cases in a little
different ways.

For the former, existing code is refactored by moving the publish call
outside ModuleBuild.create.

For the latter, message is sent in a hook of SQLAlchemy ORM event
after_commit rather than immediately inside the ModuleBuild.transition.

Both of these changes ensure the message is sent after the changes are
committed into database successfully. Then, the backend can have
confidence that the database has the module build data when receive a
message.

Signed-off-by: Chenxiong Qi <cqi@redhat.com>
This commit is contained in:
Chenxiong Qi
2020-01-16 19:51:46 +08:00
committed by mprahl
parent b21e130555
commit b70c632a70
8 changed files with 93 additions and 30 deletions

View File

@@ -161,7 +161,6 @@ def load_local_builds(local_build_nsvs):
modulemd=mmd_to_str(mmd),
scmurl="",
username="mbs",
publish_msg=False,
)
module.koji_tag = path
module.state = models.BUILD_STATES["ready"]

View File

@@ -5,10 +5,19 @@
from __future__ import absolute_import
import pkg_resources
import six.moves.queue
from module_build_service import conf, log
from module_build_service.scheduler.parser import FedmsgMessageParser
# A queue containing message body that should be sent after
# ModuleBuild.transition call every time.
# The ModuleBuild.transition is responsible for generating the message body
# and put it into this queue.
# After changes to a module build made by the transition method are committed,
# the messages will be sent.
module_build_state_change_out_queue = six.moves.queue.Queue()
def publish(topic, msg, conf, service):
"""
@@ -114,3 +123,10 @@ if not _messaging_backends:
# After loading registered messaging backends, the default messaging backend
# can be determined by configured messaging backend.
default_messaging_backend = _messaging_backends[conf.messaging]
# Helper functions to send message on specific event occurring
def notify_on_module_state_change(message_body):
publish("module.state.change", message_body, conf, "mbs")

View File

@@ -19,7 +19,8 @@ from sqlalchemy.orm import validates, load_only
from module_build_service import db, log, get_url_for, conf
from module_build_service.common.errors import UnprocessableEntity
import module_build_service.common.messaging
from module_build_service.common.messaging import module_build_state_change_out_queue
from module_build_service.common.messaging import notify_on_module_state_change
from module_build_service.common.utils import load_mmd
from module_build_service.scheduler import events
@@ -610,7 +611,6 @@ class ModuleBuild(MBSBase):
rebuild_strategy=None,
scratch=False,
srpms=None,
publish_msg=True,
**kwargs
):
now = datetime.utcnow()
@@ -641,15 +641,6 @@ class ModuleBuild(MBSBase):
db_session.add(module)
db_session.commit()
if publish_msg:
module_build_service.common.messaging.publish(
service="mbs",
topic="module.state.change",
msg=module.json(db_session, show_tasks=False), # Note the state is "init" here...
conf=conf,
)
return module
def transition(self, db_session, conf, state, state_reason=None, failure_type="unspec"):
@@ -700,12 +691,10 @@ class ModuleBuild(MBSBase):
INVERSE_BUILD_STATES[old_state], new_state_name, self)
if old_state != self.state:
module_build_service.common.messaging.publish(
service="mbs",
topic="module.state.change",
msg=self.json(db_session, show_tasks=False),
conf=conf,
)
# Do not send a message now until the data changes are committed
# into database.
module_build_state_change_out_queue.put(
self.json(db_session, show_tasks=False))
@classmethod
def local_modules(cls, db_session, name=None, stream=None):
@@ -1319,3 +1308,15 @@ def new_and_update_module_handler(mapper, db_session, target):
# Only modify time_modified if it wasn't explicitly set
if not db.inspect(target).get_history("time_modified", True).has_changes():
target.time_modified = datetime.utcnow()
def send_message_after_module_build_state_change(db_session):
"""Hook of SQLAlchemy ORM event after_commit to send messages"""
queue = module_build_state_change_out_queue
# Generally, the changes will be committed immediately after
# ModuleBuild.transition is called. In this case, queue should have only
# one message body to be sent. This while loop also ensures that messages
# are sent correctly if the commit happens after more than one call of
# ModuleBuild.transition.
while not queue.empty():
notify_on_module_state_change(queue.get())

View File

@@ -7,6 +7,7 @@ from sqlalchemy.pool import NullPool
from sqlalchemy.orm import scoped_session, sessionmaker
from module_build_service import conf
from module_build_service.common.models import send_message_after_module_build_state_change
from module_build_service.common.models import session_before_commit_handlers
__all__ = ("db_session",)
@@ -16,8 +17,14 @@ def _setup_event_listeners(db_session):
"""
Starts listening for events related to the database session.
"""
if not sqlalchemy.event.contains(db_session, "before_commit", session_before_commit_handlers):
sqlalchemy.event.listen(db_session, "before_commit", session_before_commit_handlers)
event_hooks = (
("before_commit", session_before_commit_handlers),
("after_commit", send_message_after_module_build_state_change),
)
for event, handler in event_hooks:
if not sqlalchemy.event.contains(db_session, event, handler):
sqlalchemy.event.listen(db_session, event, handler)
# initialize DB event listeners from the monitor module
from module_build_service.common.monitor import db_hook_event_listeners

View File

@@ -14,6 +14,7 @@ import requests
from module_build_service import conf, log, Modulemd
from module_build_service.common import models
from module_build_service.common.errors import Conflict, Forbidden, ValidationError
from module_build_service.common.messaging import notify_on_module_state_change
from module_build_service.common.submit import fetch_mmd
from module_build_service.common.utils import load_mmd, mmd_to_str, to_text_type
from module_build_service.web.mse import generate_expanded_mmds
@@ -554,6 +555,7 @@ def submit_module_build(db_session, username, mmd, params):
transition_to = models.BUILD_STATES["wait"]
module.batch = 0
module.transition(db_session, conf, transition_to, "Resubmitted by %s" % username)
db_session.commit()
log.info("Resumed existing module build in previous state %s" % module.state)
else:
# make NSVC unique for every scratch build
@@ -601,10 +603,14 @@ def submit_module_build(db_session, username, mmd, params):
module.build_context, module.runtime_context, module.context, \
module.build_context_no_bms = module.contexts_from_mmd(module.modulemd)
module.context += context_suffix
db_session.commit()
notify_on_module_state_change(
# Note the state is "init" here...
module.json(db_session, show_tasks=False)
)
all_modules_skipped = False
db_session.add(module)
db_session.commit()
modules.append(module)
log.info('The user "%s" submitted the build "%s"', username, nsvc)

View File

@@ -7,6 +7,7 @@ This is the implementation of the orchestrator's public RESTful API.
from __future__ import absolute_import
from io import BytesIO
import json
import sqlalchemy.event
from flask import request, url_for, Blueprint, Response
from flask.views import MethodView
@@ -18,6 +19,7 @@ from module_build_service.common import models
from module_build_service.common.errors import (
ValidationError, Forbidden, NotFound, ProgrammingError
)
from module_build_service.common.models import send_message_after_module_build_state_change
from module_build_service.common.monitor import registry
from module_build_service.common.submit import fetch_mmd
from module_build_service.common.utils import import_mmd
@@ -565,3 +567,8 @@ def register_api():
register_api()
# Ensure the event handler is called on db.session
sqlalchemy.event.listen(
db.session, "after_commit", send_message_after_module_build_state_change)

View File

@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
from mock import patch
from module_build_service import conf
from module_build_service.common import models
from module_build_service.scheduler.db_session import db_session
from tests import clean_database, make_module_in_db
@patch('module_build_service.common.messaging.publish')
def test_send_messages_after_several_state_transitions(mock_publish):
"""
Ensure all module build state change messages are sent after multiple
ModuleBuild.transitions are committed at once
"""
clean_database()
build = make_module_in_db("testmodule:1:2:c3")
build.transition(db_session, conf, models.BUILD_STATES["wait"])
build.transition(db_session, conf, models.BUILD_STATES["done"])
assert 0 == mock_publish.call_count
db_session.commit()
assert 2 == mock_publish.call_count

View File

@@ -138,10 +138,10 @@ class TestDecisionUpdateHandler:
# Assert this call below
first_publish_call = call(
service="mbs",
topic="module.state.change",
msg=module_build.json(db_session, show_tasks=False),
conf=conf,
"module.state.change",
module_build.json(db_session, show_tasks=False),
conf,
"mbs",
)
ClientSession.return_value.getBuild.return_value = {
@@ -169,9 +169,9 @@ class TestDecisionUpdateHandler:
publish.assert_has_calls([
first_publish_call,
call(
service="mbs",
topic="module.state.change",
msg=module_build.json(db_session, show_tasks=False),
conf=conf,
"module.state.change",
module_build.json(db_session, show_tasks=False),
conf,
"mbs"
),
])