diff --git a/conf/backend_config.py b/conf/backend_config.py index 0491d435..5de87f8a 100644 --- a/conf/backend_config.py +++ b/conf/backend_config.py @@ -21,7 +21,13 @@ class BackendConfiguration(object): # name in MBS is CELERY_BROKER_URL. CELERY_BROKER_URL = "" CELERY_RESULT_BACKEND = "" - CELERY_IMPORTS = [] + CELERY_IMPORTS = [ + "module_build_service.scheduler.handlers.components", + "module_build_service.scheduler.handlers.modules", + "module_build_service.scheduler.handlers.repos", + "module_build_service.scheduler.handlers.tags", + "module_build_service.scheduler.handlers.greenwave", + ] class TestConfiguration(BackendConfiguration): diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index 1f8cd416..c617d88f 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -259,7 +259,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): kwargs.pop("event") try: - handler(**kwargs) + if conf.celery_broker_url: + # handlers are also Celery tasks, when celery_broker_url is configured, + # call "delay" method to run the handlers as Celery async tasks + func = getattr(handler, "delay") + func(**kwargs) + else: + handler(**kwargs) except Exception as e: log.exception("Could not process message handler.") db_session.rollback() diff --git a/module_build_service/scheduler/handlers/components.py b/module_build_service/scheduler/handlers/components.py index 27a20c66..4b46960a 100644 --- a/module_build_service/scheduler/handlers/components.py +++ b/module_build_service/scheduler/handlers/components.py @@ -5,7 +5,7 @@ import logging import koji -from module_build_service import conf, models, log +from module_build_service import celery_app, conf, models, log from module_build_service.builder import GenericBuilder from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder from module_build_service.utils.general import mmd_to_str @@ -16,6 +16,7 @@ from module_build_service.utils.batches import continue_batch_build logging.basicConfig(level=logging.DEBUG) +@celery_app.task @events.mbs_event_handler() def build_task_finalize( msg_id, build_id, task_id, build_new_state, diff --git a/module_build_service/scheduler/handlers/greenwave.py b/module_build_service/scheduler/handlers/greenwave.py index 817b71db..b62a13c1 100644 --- a/module_build_service/scheduler/handlers/greenwave.py +++ b/module_build_service/scheduler/handlers/greenwave.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # SPDX-License-Identifier: MIT -from module_build_service import conf, log +from module_build_service import celery_app, conf, log from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder from module_build_service.db_session import db_session from module_build_service.models import ModuleBuild, BUILD_STATES @@ -32,6 +32,7 @@ def get_corresponding_module_build(nvr): return ModuleBuild.get_by_id(db_session, module_build_id) +@celery_app.task @events.mbs_event_handler() def decision_update(msg_id, decision_context, subject_identifier, policies_satisfied): """Move module build to ready or failed according to Greenwave result diff --git a/module_build_service/scheduler/handlers/modules.py b/module_build_service/scheduler/handlers/modules.py index ec1665b2..d303cd2c 100644 --- a/module_build_service/scheduler/handlers/modules.py +++ b/module_build_service/scheduler/handlers/modules.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: MIT """ Handlers for module change events on the message bus. """ -from module_build_service import conf, models, log, build_logs +from module_build_service import celery_app, conf, models, log, build_logs import module_build_service.resolver import module_build_service.utils from module_build_service.utils import ( @@ -40,6 +40,7 @@ def get_artifact_from_srpm(srpm_path): return os.path.basename(srpm_path).replace(".src.rpm", "") +@celery_app.task @events.mbs_event_handler() def failed(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'failed' state. @@ -100,6 +101,7 @@ def failed(msg_id, module_build_id, module_build_state): GenericBuilder.clear_cache(build) +@celery_app.task @events.mbs_event_handler() def done(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'done' state. @@ -138,6 +140,7 @@ def done(msg_id, module_build_id, module_build_state): GenericBuilder.clear_cache(build) +@celery_app.task @events.mbs_event_handler() def init(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'init' state. @@ -313,6 +316,7 @@ def get_content_generator_build_koji_tag(module_deps): return conf.koji_cg_default_build_tag +@celery_app.task @events.mbs_event_handler() def wait(msg_id, module_build_id, module_build_state): """ Called whenever a module enters the 'wait' state. diff --git a/module_build_service/scheduler/handlers/repos.py b/module_build_service/scheduler/handlers/repos.py index ac6732f4..3771056f 100644 --- a/module_build_service/scheduler/handlers/repos.py +++ b/module_build_service/scheduler/handlers/repos.py @@ -4,7 +4,7 @@ import logging from datetime import datetime -from module_build_service import conf, models, log +from module_build_service import celery_app, conf, models, log from module_build_service.builder import GenericBuilder from module_build_service.utils import start_next_batch_build from module_build_service.db_session import db_session @@ -13,6 +13,7 @@ from module_build_service.scheduler import events logging.basicConfig(level=logging.DEBUG) +@celery_app.task @events.mbs_event_handler() def done(msg_id, repo_tag): """Called whenever koji rebuilds a repo, any repo. diff --git a/module_build_service/scheduler/handlers/tags.py b/module_build_service/scheduler/handlers/tags.py index 7fae267e..ac38ca11 100644 --- a/module_build_service/scheduler/handlers/tags.py +++ b/module_build_service/scheduler/handlers/tags.py @@ -4,7 +4,7 @@ import logging import koji -from module_build_service import conf, models, log +from module_build_service import celery_app, conf, models, log from module_build_service.db_session import db_session from module_build_service.builder import GenericBuilder from module_build_service.scheduler import events @@ -12,6 +12,7 @@ from module_build_service.scheduler import events logging.basicConfig(level=logging.DEBUG) +@celery_app.task @events.mbs_event_handler() def tagged(msg_id, tag_name, build_name, build_nvr): """Called whenever koji tags a build to tag. diff --git a/tests/test_build/test_build.py b/tests/test_build/test_build.py index e8bb1a0e..5b22891c 100644 --- a/tests/test_build/test_build.py +++ b/tests/test_build/test_build.py @@ -389,8 +389,18 @@ class TestBuild(BaseTestBuild): return_value=True) self.mock_check_gating = self.p_check_gating.start() + self.patch_config_broker = patch.object( + module_build_service.config.Config, + "celery_broker_url", + create=True, + new_callable=PropertyMock, + return_value=False, + ) + self.patch_config_broker.start() + def teardown_method(self, test_method): self.p_check_gating.stop() + self.patch_config_broker.stop() FakeModuleBuilder.reset() cleanup_moksha() for i in range(20): diff --git a/tests/test_scheduler/test_greenwave.py b/tests/test_scheduler/test_greenwave.py index 997e1adb..2a1915b8 100644 --- a/tests/test_scheduler/test_greenwave.py +++ b/tests/test_scheduler/test_greenwave.py @@ -2,9 +2,10 @@ # SPDX-License-Identifier: MIT import pytest -from mock import call, patch, Mock +from mock import call, patch, PropertyMock, Mock from sqlalchemy import func +import module_build_service.config from module_build_service import conf from module_build_service.db_session import db_session from module_build_service.models import BUILD_STATES, ModuleBuild @@ -72,6 +73,19 @@ class TestGetCorrespondingModuleBuild: class TestDecisionUpdateHandler: """Test handler decision_update""" + def setup_method(self, test_method): + self.patch_config_broker = patch.object( + module_build_service.config.Config, + "celery_broker_url", + create=True, + new_callable=PropertyMock, + return_value=False, + ) + self.patch_config_broker.start() + + def teardown_method(self, test_method): + self.patch_config_broker.stop() + @patch("module_build_service.scheduler.handlers.greenwave.log") def test_decision_context_is_not_match(self, log): decision_update(