Register handlers as Celery tasks

This commit is contained in:
Qixiang Wan
2019-11-18 16:54:18 +08:00
committed by mprahl
parent 81d9554af3
commit 9049eea35c
9 changed files with 52 additions and 8 deletions

View File

@@ -21,7 +21,13 @@ class BackendConfiguration(object):
# name in MBS is CELERY_BROKER_URL.
CELERY_BROKER_URL = ""
CELERY_RESULT_BACKEND = ""
CELERY_IMPORTS = []
CELERY_IMPORTS = [
"module_build_service.scheduler.handlers.components",
"module_build_service.scheduler.handlers.modules",
"module_build_service.scheduler.handlers.repos",
"module_build_service.scheduler.handlers.tags",
"module_build_service.scheduler.handlers.greenwave",
]
class TestConfiguration(BackendConfiguration):

View File

@@ -259,7 +259,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
kwargs.pop("event")
try:
handler(**kwargs)
if conf.celery_broker_url:
# handlers are also Celery tasks, when celery_broker_url is configured,
# call "delay" method to run the handlers as Celery async tasks
func = getattr(handler, "delay")
func(**kwargs)
else:
handler(**kwargs)
except Exception as e:
log.exception("Could not process message handler.")
db_session.rollback()

View File

@@ -5,7 +5,7 @@
import logging
import koji
from module_build_service import conf, models, log
from module_build_service import celery_app, conf, models, log
from module_build_service.builder import GenericBuilder
from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder
from module_build_service.utils.general import mmd_to_str
@@ -16,6 +16,7 @@ from module_build_service.utils.batches import continue_batch_build
logging.basicConfig(level=logging.DEBUG)
@celery_app.task
@events.mbs_event_handler()
def build_task_finalize(
msg_id, build_id, task_id, build_new_state,

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
from module_build_service import conf, log
from module_build_service import celery_app, conf, log
from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder
from module_build_service.db_session import db_session
from module_build_service.models import ModuleBuild, BUILD_STATES
@@ -32,6 +32,7 @@ def get_corresponding_module_build(nvr):
return ModuleBuild.get_by_id(db_session, module_build_id)
@celery_app.task
@events.mbs_event_handler()
def decision_update(msg_id, decision_context, subject_identifier, policies_satisfied):
"""Move module build to ready or failed according to Greenwave result

View File

@@ -2,7 +2,7 @@
# SPDX-License-Identifier: MIT
""" Handlers for module change events on the message bus. """
from module_build_service import conf, models, log, build_logs
from module_build_service import celery_app, conf, models, log, build_logs
import module_build_service.resolver
import module_build_service.utils
from module_build_service.utils import (
@@ -40,6 +40,7 @@ def get_artifact_from_srpm(srpm_path):
return os.path.basename(srpm_path).replace(".src.rpm", "")
@celery_app.task
@events.mbs_event_handler()
def failed(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'failed' state.
@@ -100,6 +101,7 @@ def failed(msg_id, module_build_id, module_build_state):
GenericBuilder.clear_cache(build)
@celery_app.task
@events.mbs_event_handler()
def done(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'done' state.
@@ -138,6 +140,7 @@ def done(msg_id, module_build_id, module_build_state):
GenericBuilder.clear_cache(build)
@celery_app.task
@events.mbs_event_handler()
def init(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'init' state.
@@ -313,6 +316,7 @@ def get_content_generator_build_koji_tag(module_deps):
return conf.koji_cg_default_build_tag
@celery_app.task
@events.mbs_event_handler()
def wait(msg_id, module_build_id, module_build_state):
""" Called whenever a module enters the 'wait' state.

View File

@@ -4,7 +4,7 @@
import logging
from datetime import datetime
from module_build_service import conf, models, log
from module_build_service import celery_app, conf, models, log
from module_build_service.builder import GenericBuilder
from module_build_service.utils import start_next_batch_build
from module_build_service.db_session import db_session
@@ -13,6 +13,7 @@ from module_build_service.scheduler import events
logging.basicConfig(level=logging.DEBUG)
@celery_app.task
@events.mbs_event_handler()
def done(msg_id, repo_tag):
"""Called whenever koji rebuilds a repo, any repo.

View File

@@ -4,7 +4,7 @@
import logging
import koji
from module_build_service import conf, models, log
from module_build_service import celery_app, conf, models, log
from module_build_service.db_session import db_session
from module_build_service.builder import GenericBuilder
from module_build_service.scheduler import events
@@ -12,6 +12,7 @@ from module_build_service.scheduler import events
logging.basicConfig(level=logging.DEBUG)
@celery_app.task
@events.mbs_event_handler()
def tagged(msg_id, tag_name, build_name, build_nvr):
"""Called whenever koji tags a build to tag.

View File

@@ -389,8 +389,18 @@ class TestBuild(BaseTestBuild):
return_value=True)
self.mock_check_gating = self.p_check_gating.start()
self.patch_config_broker = patch.object(
module_build_service.config.Config,
"celery_broker_url",
create=True,
new_callable=PropertyMock,
return_value=False,
)
self.patch_config_broker.start()
def teardown_method(self, test_method):
self.p_check_gating.stop()
self.patch_config_broker.stop()
FakeModuleBuilder.reset()
cleanup_moksha()
for i in range(20):

View File

@@ -2,9 +2,10 @@
# SPDX-License-Identifier: MIT
import pytest
from mock import call, patch, Mock
from mock import call, patch, PropertyMock, Mock
from sqlalchemy import func
import module_build_service.config
from module_build_service import conf
from module_build_service.db_session import db_session
from module_build_service.models import BUILD_STATES, ModuleBuild
@@ -72,6 +73,19 @@ class TestGetCorrespondingModuleBuild:
class TestDecisionUpdateHandler:
"""Test handler decision_update"""
def setup_method(self, test_method):
self.patch_config_broker = patch.object(
module_build_service.config.Config,
"celery_broker_url",
create=True,
new_callable=PropertyMock,
return_value=False,
)
self.patch_config_broker.start()
def teardown_method(self, test_method):
self.patch_config_broker.stop()
@patch("module_build_service.scheduler.handlers.greenwave.log")
def test_decision_context_is_not_match(self, log):
decision_update(