diff --git a/module_build_service/builder/KojiModuleBuilder.py b/module_build_service/builder/KojiModuleBuilder.py index 7385619c..1d8eeca5 100644 --- a/module_build_service/builder/KojiModuleBuilder.py +++ b/module_build_service/builder/KojiModuleBuilder.py @@ -700,6 +700,11 @@ class KojiModuleBuilder(GenericBuilder): :param component_build: a ComponentBuild object :return: a list of msgs that MBS needs to process """ + # Imported here because of circular dependencies. + from module_build_service.scheduler.handlers.tags import tagged as tagged_handler + from module_build_service.scheduler.handlers.components import ( + build_task_finalize as build_task_finalize_handler) + opts = {"latest": True, "package": component_build.package, "inherit": False} build_tagged = self.koji_session.listTagged(self.module_build_tag["name"], **opts) dest_tagged = None @@ -727,10 +732,10 @@ class KojiModuleBuilder(GenericBuilder): nvr = "{name}-{version}-{release}".format(**untagged_build) build = self.koji_session.getBuild(nvr) break - further_work = [] - # If the build doesn't exist, then return + + # If the build doesn't exist, then return False if not build: - return further_work + return False # Start setting up MBS' database to use the existing build log.info('Skipping build of "{0}" since it already exists.'.format(build["nvr"])) @@ -741,19 +746,11 @@ class KojiModuleBuilder(GenericBuilder): component_build.state_reason = "Found existing build" nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr) # Trigger a completed build message - further_work.append({ - "msg_id": "recover_orphaned_artifact: fake message", - "event": events.KOJI_BUILD_CHANGE, - "build_id": build["build_id"], - "task_id": build["task_id"], - "build_new_state": koji.BUILD_STATES["COMPLETE"], - "build_name": component_build.package, - "build_version": nvr_dict["version"], - "build_release": nvr_dict["release"], - "module_build_id": component_build.module_build.id, - "state_reason": None - }) - + args = ( + "recover_orphaned_artifact: fake message", build["build_id"], build["task_id"], + koji.BUILD_STATES["COMPLETE"], component_build.package, nvr_dict["version"], + nvr_dict["release"], component_build.module_build.id, None) + events.scheduler.add(build_task_finalize_handler, args) component_tagged_in = [] if build_tagged: component_tagged_in.append(self.module_build_tag["name"]) @@ -772,14 +769,11 @@ class KojiModuleBuilder(GenericBuilder): 'The build being skipped isn\'t tagged in the "{0}" tag. Will send a message to ' "the tag handler".format(tag) ) - further_work.append({ - "msg_id": "recover_orphaned_artifact: fake message", - "event": events.KOJI_TAG_CHANGE, - "tag_name": tag, - "build_name": component_build.package, - "build_nvr": component_build.nvr, - }) - return further_work + args = ("recover_orphaned_artifact: fake message", tag, component_build.package, + component_build.nvr) + events.scheduler.add(tagged_handler, args) + + return True def build(self, artifact_name, source): """ diff --git a/module_build_service/builder/MockModuleBuilder.py b/module_build_service/builder/MockModuleBuilder.py index ceed2827..fdb8fdbb 100644 --- a/module_build_service/builder/MockModuleBuilder.py +++ b/module_build_service/builder/MockModuleBuilder.py @@ -318,6 +318,7 @@ class MockModuleBuilder(GenericBuilder): pass def buildroot_add_artifacts(self, artifacts, install=False): + from module_build_service.scheduler.handlers.repos import done as repos_done_handler self._createrepo() # TODO: This is just hack to install module-build-macros into the @@ -330,9 +331,7 @@ class MockModuleBuilder(GenericBuilder): self.groups.append("module-build-macros") self._write_mock_config() - from module_build_service.scheduler.consumer import fake_repo_done_message - - fake_repo_done_message(self.tag_name) + events.scheduler.add(repos_done_handler, ("fake_msg", self.tag_name + "-build")) def tag_artifacts(self, artifacts): pass @@ -394,6 +393,8 @@ class MockModuleBuilder(GenericBuilder): self._write_mock_config() def _send_build_change(self, state, source, build_id): + from module_build_service.scheduler.handlers.components import ( + build_task_finalize as build_task_finalize_handler) try: nvr = kobo.rpmlib.parse_nvr(source) except ValueError: @@ -401,18 +402,10 @@ class MockModuleBuilder(GenericBuilder): # build_id=1 and task_id=1 are OK here, because we are building just # one RPM at the time. - module_build_service.scheduler.consumer.work_queue_put({ - "msg_id": "a faked internal message", - "event": events.KOJI_BUILD_CHANGE, - "build_id": build_id, - "task_id": build_id, - "build_name": nvr["name"], - "build_new_state": state, - "build_release": nvr["release"], - "build_version": nvr["version"], - "module_build_id": None, - "state_reason": None - }) + args = ( + "a faked internal message", build_id, build_id, state, nvr["name"], nvr["version"], + nvr["release"], None, None) + events.scheduler.add(build_task_finalize_handler, args) def _save_log(self, resultsdir, log_name, artifact_name): old_log = os.path.join(resultsdir, log_name) diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index 5ff0c7cb..1f8cd416 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -259,7 +259,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): kwargs.pop("event") try: - further_work = handler(**kwargs) or [] + handler(**kwargs) except Exception as e: log.exception("Could not process message handler.") db_session.rollback() @@ -275,16 +275,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): # Allow caller to do something when error is occurred. raise - else: - # Handlers can *optionally* return a list of fake messages that - # should be re-inserted back into the main work queue. We can use - # this (for instance) when we submit a new component build but (for - # some reason) it has already been built, then it can fake its own - # completion back to the scheduler so that work resumes as if it - # was submitted for real and koji announced its completion. - for event in further_work: - log.info(" Scheduling faked event %r", event) - self.incoming.put(event) finally: MBSConsumer.current_module_build_id = None log.debug("Done with %s", idx) @@ -307,12 +297,3 @@ def work_queue_put(msg): """ Artificially put a message into the work queue of the consumer. """ consumer = get_global_consumer() consumer.incoming.put(msg) - - -def fake_repo_done_message(tag_name): - event_info = { - "msg_id": "a faked internal message", - "event": events.KOJI_REPO_CHANGE, - "repo_tag": tag_name + "-build" - } - work_queue_put(event_info) diff --git a/module_build_service/scheduler/events.py b/module_build_service/scheduler/events.py index 2a1ee401..69cada17 100644 --- a/module_build_service/scheduler/events.py +++ b/module_build_service/scheduler/events.py @@ -12,8 +12,76 @@ build is complete, Koji sends a message to topic buildsys.build.state.change, however Brew sends to topic brew.build.complete, etc. """ +import time +import sched +from functools import wraps + +from module_build_service import log + + KOJI_BUILD_CHANGE = "koji_build_change" KOJI_TAG_CHANGE = "koji_tag_change" KOJI_REPO_CHANGE = "koji_repo_change" MBS_MODULE_STATE_CHANGE = "mbs_module_state_change" GREENWAVE_DECISION_UPDATE = "greenwave_decision_update" + + +class Scheduler(sched.scheduler): + """ + Subclass of `sched.scheduler` allowing to schedule handlers calls. + + If one of the MBS handler functions need to call another handler, they need to do it in a safe + way - such another handler call should not be done in the middle of another handler's + execution. + + This class provides an solution for that. Handler can schedule run of other handler using + the `add` method. The handlers should use `mbs_event_handler` decorator which ensures that + the `run` method is called at the end of handler's execution and other scheduler handlers + are executed. + """ + + def add(self, handler, arguments=()): + """ + Schedule execution of `handler` with `arguments`. + """ + self.enter(0, 0, handler, arguments) + + def run(self): + """ + Runs scheduled handlers. + """ + log.debug("Running event scheduler with following events:") + for event in self.queue: + log.debug(" %r", event) + sched.scheduler.run(self) + + def reset(self): + """ + Resets the Scheduler to initial state. + """ + while not self.empty(): + self.cancel(self.queue[0]) + + +scheduler = Scheduler(time.time, delayfunc=lambda x: x) + + +def mbs_event_handler(): + """ + A decorator for MBS event handlers. It implements common tasks which should otherwise + be repeated in every MBS event handler, for example: + + - at the end of handler, call events.scheduler.run(). + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + finally: + scheduler.run() + + return wrapper + + return decorator diff --git a/module_build_service/scheduler/handlers/components.py b/module_build_service/scheduler/handlers/components.py index 1b14af2d..27a20c66 100644 --- a/module_build_service/scheduler/handlers/components.py +++ b/module_build_service/scheduler/handlers/components.py @@ -16,6 +16,7 @@ from module_build_service.utils.batches import continue_batch_build logging.basicConfig(level=logging.DEBUG) +@events.mbs_event_handler() def build_task_finalize( msg_id, build_id, task_id, build_new_state, build_name, build_version, build_release, @@ -97,8 +98,6 @@ def build_task_finalize( parent.modulemd = mmd_to_str(mmd) db_session.commit() - further_work = [] - parent_current_batch = parent.current_batch() # If there are no other components still building in a batch, @@ -134,11 +133,9 @@ def build_task_finalize( # The repository won't be regenerated in this case and therefore we generate fake repo # change message here. log.info("Batch done. No component to tag") - further_work += [{ - "msg_id": "components::_finalize: fake msg", - "event": events.KOJI_REPO_CHANGE, - "repo_tag": builder.module_build_tag["name"], - }] + from module_build_service.scheduler.handlers.repos import done as repos_done_handler + events.scheduler.add( + repos_done_handler, ("fake_msg", builder.module_build_tag["name"])) else: built_component_nvrs_in_batch = [c.nvr for c in built_components_in_batch] # tag && add to srpm-build group if neccessary @@ -174,6 +171,4 @@ def build_task_finalize( # build, try to call continue_batch_build again so in case we hit the # threshold previously, we will submit another build from this batch. builder = GenericBuilder.create_from_module(db_session, parent, conf) - further_work += continue_batch_build(conf, parent, builder) - - return further_work + continue_batch_build(conf, parent, builder) diff --git a/module_build_service/scheduler/handlers/greenwave.py b/module_build_service/scheduler/handlers/greenwave.py index 3ed5be9f..817b71db 100644 --- a/module_build_service/scheduler/handlers/greenwave.py +++ b/module_build_service/scheduler/handlers/greenwave.py @@ -4,6 +4,7 @@ from module_build_service import conf, log from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder from module_build_service.db_session import db_session from module_build_service.models import ModuleBuild, BUILD_STATES +from module_build_service.scheduler import events def get_corresponding_module_build(nvr): @@ -31,6 +32,7 @@ def get_corresponding_module_build(nvr): return ModuleBuild.get_by_id(db_session, module_build_id) +@events.mbs_event_handler() def decision_update(msg_id, decision_context, subject_identifier, policies_satisfied): """Move module build to ready or failed according to Greenwave result diff --git a/module_build_service/scheduler/handlers/modules.py b/module_build_service/scheduler/handlers/modules.py index 4cabfbe4..ec1665b2 100644 --- a/module_build_service/scheduler/handlers/modules.py +++ b/module_build_service/scheduler/handlers/modules.py @@ -40,6 +40,7 @@ def get_artifact_from_srpm(srpm_path): return os.path.basename(srpm_path).replace(".src.rpm", "") +@events.mbs_event_handler() def failed(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'failed' state. @@ -99,6 +100,7 @@ def failed(msg_id, module_build_id, module_build_state): GenericBuilder.clear_cache(build) +@events.mbs_event_handler() def done(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'done' state. @@ -136,6 +138,7 @@ def done(msg_id, module_build_id, module_build_state): GenericBuilder.clear_cache(build) +@events.mbs_event_handler() def init(msg_id, module_build_id, module_build_state): """Called whenever a module enters the 'init' state. @@ -310,6 +313,7 @@ def get_content_generator_build_koji_tag(module_deps): return conf.koji_cg_default_build_tag +@events.mbs_event_handler() def wait(msg_id, module_build_id, module_build_state): """ Called whenever a module enters the 'wait' state. @@ -398,11 +402,9 @@ def wait(msg_id, module_build_id, module_build_state): db_session.commit() # Return a KojiRepoChange message so that the build can be transitioned to done # in the repos handler - return [{ - "msg_id": "handlers.modules.wait: fake msg", - "event": events.KOJI_REPO_CHANGE, - "repo_tag": builder.module_build_tag["name"], - }] + from module_build_service.scheduler.handlers.repos import done as repos_done_handler + events.scheduler.add(repos_done_handler, ("fake_msg", builder.module_build_tag["name"])) + return # If all components in module build will be reused, we don't have to build # module-build-macros, because there won't be any build done. @@ -420,7 +422,6 @@ def wait(msg_id, module_build_id, module_build_state): artifact_name = "module-build-macros" component_build = models.ComponentBuild.from_component_name(db_session, artifact_name, build.id) - further_work = [] srpm = builder.get_disttag_srpm( disttag=".%s" % get_rpm_release(db_session, build), module_build=build) @@ -437,10 +438,9 @@ def wait(msg_id, module_build_id, module_build_state): # Commit and refresh so that the SQLAlchemy relationships are available db_session.commit() db_session.refresh(component_build) - msgs = builder.recover_orphaned_artifact(component_build) - if msgs: + recovered = builder.recover_orphaned_artifact(component_build) + if recovered: log.info("Found an existing module-build-macros build") - further_work += msgs # There was no existing artifact found, so lets submit the build instead else: task_id, state, reason, nvr = builder.build(artifact_name=artifact_name, source=srpm) @@ -452,10 +452,9 @@ def wait(msg_id, module_build_id, module_build_state): # It's possible that the build succeeded in the builder but some other step failed which # caused module-build-macros to be marked as failed in MBS, so check to see if it exists # first - msgs = builder.recover_orphaned_artifact(component_build) - if msgs: + recovered = builder.recover_orphaned_artifact(component_build) + if recovered: log.info("Found an existing module-build-macros build") - further_work += msgs else: task_id, state, reason, nvr = builder.build(artifact_name=artifact_name, source=srpm) component_build.task_id = task_id @@ -475,9 +474,5 @@ def wait(msg_id, module_build_id, module_build_state): build.new_repo_task_id = task_id db_session.commit() else: - further_work.append({ - "msg_id": "fake msg", - "event": events.KOJI_REPO_CHANGE, - "repo_tag": builder.module_build_tag["name"], - }) - return further_work + from module_build_service.scheduler.handlers.repos import done as repos_done_handler + events.scheduler.add(repos_done_handler, ("fake_msg", builder.module_build_tag["name"])) diff --git a/module_build_service/scheduler/handlers/repos.py b/module_build_service/scheduler/handlers/repos.py index c880874b..ac6732f4 100644 --- a/module_build_service/scheduler/handlers/repos.py +++ b/module_build_service/scheduler/handlers/repos.py @@ -8,10 +8,12 @@ from module_build_service import conf, models, log from module_build_service.builder import GenericBuilder from module_build_service.utils import start_next_batch_build from module_build_service.db_session import db_session +from module_build_service.scheduler import events logging.basicConfig(level=logging.DEBUG) +@events.mbs_event_handler() def done(msg_id, repo_tag): """Called whenever koji rebuilds a repo, any repo. @@ -103,7 +105,6 @@ def done(msg_id, repo_tag): has_unbuilt_components = any(c.is_unbuilt for c in module_build.component_builds) has_failed_components = any(c.is_unsuccessful for c in module_build.component_builds) - further_work = [] if has_unbuilt_components and not has_failed_components: # Ok, for the subset of builds that did complete successfully, check to # see if they are in the buildroot before starting new batch. @@ -114,8 +115,7 @@ def done(msg_id, repo_tag): # Try to start next batch build, because there are still unbuilt # components in a module. - further_work += start_next_batch_build(conf, module_build, builder) - + start_next_batch_build(conf, module_build, builder) else: if has_failed_components: state_reason = "Component(s) {} failed to build.".format( @@ -137,5 +137,3 @@ def done(msg_id, repo_tag): module_build.transition(db_session, conf, state=models.BUILD_STATES["done"]) db_session.commit() - - return further_work diff --git a/module_build_service/scheduler/handlers/tags.py b/module_build_service/scheduler/handlers/tags.py index 5e3c363c..7fae267e 100644 --- a/module_build_service/scheduler/handlers/tags.py +++ b/module_build_service/scheduler/handlers/tags.py @@ -12,6 +12,7 @@ from module_build_service.scheduler import events logging.basicConfig(level=logging.DEBUG) +@events.mbs_event_handler() def tagged(msg_id, tag_name, build_name, build_nvr): """Called whenever koji tags a build to tag. @@ -53,8 +54,6 @@ def tagged(msg_id, tag_name, build_name, build_nvr): ) return [] - further_work = [] - # If all components are tagged, start newRepo task. if not any(c.is_completed and not c.is_tagged for c in module_build.up_to_current_batch()): builder = GenericBuilder.create_from_module( @@ -77,15 +76,11 @@ def tagged(msg_id, tag_name, build_name, build_nvr): # would be useless to wait for a repository we will not use anyway. log.info( "All components in module tagged and built, skipping the last repo regeneration") - further_work += [{ - "msg_id": "components::_finalize: fake msg", - "event": events.KOJI_REPO_CHANGE, - "repo_tag": builder.module_build_tag["name"], - }] + from module_build_service.scheduler.handlers.repos import done as repos_done_handler + events.scheduler.add( + repos_done_handler, ("fake_msg", builder.module_build_tag["name"])) db_session.commit() - return further_work - def _is_new_repo_generating(module_build, koji_session): """ Return whether or not a new repo is already being generated. """ diff --git a/module_build_service/scheduler/producer.py b/module_build_service/scheduler/producer.py index ae0a29d0..d4d2bdce 100644 --- a/module_build_service/scheduler/producer.py +++ b/module_build_service/scheduler/producer.py @@ -24,6 +24,7 @@ from module_build_service.scheduler import events class MBSProducer(PollingProducer): frequency = timedelta(seconds=conf.polling_interval) + @events.mbs_event_handler() def poll(self): try: self.log_summary() @@ -272,11 +273,8 @@ class MBSProducer(PollingProducer): if _has_missed_new_repo_message(module_build, builder.koji_session): log.info(" Processing the paused module build %r", module_build) - further_work = module_build_service.utils.start_next_batch_build( + module_build_service.utils.start_next_batch_build( config, module_build, builder) - for event in further_work: - log.info(" Scheduling faked event %r" % event) - module_build_service.scheduler.consumer.work_queue_put(event) # Check if we have met the threshold. if module_build_service.utils.at_concurrent_component_threshold(config): diff --git a/module_build_service/utils/batches.py b/module_build_service/utils/batches.py index 6e343aae..3db48338 100644 --- a/module_build_service/utils/batches.py +++ b/module_build_service/utils/batches.py @@ -104,7 +104,6 @@ def continue_batch_build(config, module, builder, components=None): # Get the list of components to be built in this batch. We are not building # all `unbuilt_components`, because we can meet the num_concurrent_builds # threshold - further_work = [] components_to_build = [] # Sort the unbuilt_components so that the components that take the longest to build are # first @@ -115,8 +114,7 @@ def continue_batch_build(config, module, builder, components=None): # Only evaluate new components if not component.is_waiting_for_build: continue - msgs = builder.recover_orphaned_artifact(component) - further_work += msgs + builder.recover_orphaned_artifact(component) for c in unbuilt_components: # If a previous build of the component was found, then the state will be marked as @@ -149,7 +147,6 @@ def continue_batch_build(config, module, builder, components=None): future.result() db_session.commit() - return further_work def start_next_batch_build(config, module, builder, components=None): @@ -248,7 +245,6 @@ def start_next_batch_build(config, module, builder, components=None): log.info("Starting build of next batch %d, %s" % (module.batch, unbuilt_components)) # Attempt to reuse any components possible in the batch before attempting to build any - further_work = [] unbuilt_components_after_reuse = [] components_reused = False should_try_reuse = True @@ -264,7 +260,7 @@ def start_next_batch_build(config, module, builder, components=None): for c, reusable_c in zip(unbuilt_components, reusable_components): if reusable_c: components_reused = True - further_work += reuse_component(c, reusable_c) + reuse_component(c, reusable_c) else: unbuilt_components_after_reuse.append(c) # Commit the changes done by reuse_component @@ -274,12 +270,9 @@ def start_next_batch_build(config, module, builder, components=None): # If all the components were reused in the batch then make a KojiRepoChange # message and return if components_reused and not unbuilt_components_after_reuse: - further_work.append({ - "msg_id": "start_build_batch: fake msg", - "event": events.KOJI_REPO_CHANGE, - "repo_tag": builder.module_build_tag["name"], - }) - return further_work + from module_build_service.scheduler.handlers.repos import done as repos_done_handler + events.scheduler.add( + repos_done_handler, ("start_build_batch: fake_msg", builder.module_build_tag["name"])) + return - return further_work + continue_batch_build( - config, module, builder, unbuilt_components_after_reuse) + continue_batch_build(config, module, builder, unbuilt_components_after_reuse) diff --git a/module_build_service/utils/reuse.py b/module_build_service/utils/reuse.py index c1cf472a..5505c039 100644 --- a/module_build_service/utils/reuse.py +++ b/module_build_service/utils/reuse.py @@ -9,7 +9,8 @@ from module_build_service.scheduler import events from module_build_service.utils.mse import get_base_module_mmds -def reuse_component(component, previous_component_build, change_state_now=False): +def reuse_component(component, previous_component_build, change_state_now=False, + schedule_fake_events=True): """ Reuses component build `previous_component_build` instead of building component `component` @@ -18,11 +19,17 @@ def reuse_component(component, previous_component_build, change_state_now=False) This allows callers to reuse multiple component builds and commit them all at once. - Returns the list of BaseMessage instances to be handled later by the - scheduler. + :param ComponentBuild component: Component whihch will reuse previous module build. + :param ComponentBuild previous_component_build: Previous component build to reuse. + :param bool change_state_now: When True, the component.state will be set to + previous_component_build.state. Otherwise, the component.state will be set to BUILDING. + :param bool schedule_fake_events: When True, the `events.scheduler.add` will be used to + schedule handlers.component.build_task_finalize handler call. """ import koji + from module_build_service.scheduler.handlers.components import ( + build_task_finalize as build_task_finalize_handler) log.info( 'Reusing component "{0}" from a previous module ' @@ -43,20 +50,13 @@ def reuse_component(component, previous_component_build, change_state_now=False) component.state_reason = "Reused component from previous module build" component.nvr = previous_component_build.nvr nvr_dict = kobo.rpmlib.parse_nvr(component.nvr) - # Add this message to further_work so that the reused - # component will be tagged properly - return [{ - "msg_id": "reuse_component: fake msg", - "event": events.KOJI_BUILD_CHANGE, - "build_id": None, - "task_id": component.task_id, - "build_new_state": previous_component_build.state, - "build_name": nvr_dict["name"], - "build_version": nvr_dict["version"], - "build_release": nvr_dict["release"], - "module_build_id": component.module_id, - "state_reason": component.state_reason, - }] + # Add this event to scheduler so that the reused component will be tagged properly. + if schedule_fake_events: + args = ( + "reuse_component: fake msg", None, component.task_id, previous_component_build.state, + nvr_dict["name"], nvr_dict["version"], nvr_dict["release"], component.module_id, + component.state_reason) + events.scheduler.add(build_task_finalize_handler, args) def get_reusable_module(module): @@ -214,7 +214,7 @@ def attempt_to_reuse_all_components(builder, module): module.batch = c.batch # Reuse the component - reuse_component(c, component_to_reuse, True) + reuse_component(c, component_to_reuse, True, False) components_to_tag.append(c.nvr) # Tag them diff --git a/tests/test_build/test_build.py b/tests/test_build/test_build.py index 3659490d..e8bb1a0e 100644 --- a/tests/test_build/test_build.py +++ b/tests/test_build/test_build.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # SPDX-License-Identifier: MIT +import sched import koji import os import re @@ -19,6 +20,10 @@ from module_build_service import models, conf, build_logs from module_build_service.db_session import db_session from module_build_service.scheduler import events from module_build_service.scheduler.local import make_simple_stop_condition +from module_build_service.scheduler.handlers.tags import tagged as tagged_handler +from module_build_service.scheduler.handlers.components import ( + build_task_finalize as build_task_finalize_handler) +from module_build_service.scheduler.handlers.repos import done as repos_done_handler from mock import patch, PropertyMock, Mock, MagicMock from werkzeug.datastructures import FileStorage @@ -232,43 +237,20 @@ class FakeModuleBuilder(GenericBuilder): return {"name": self.tag_name + "-build"} def _send_repo_done(self): - event_info = { - "msg_id": "a faked internal message", - "event": events.KOJI_REPO_CHANGE, - "repo_tag": self.tag_name + "-build", - } - module_build_service.scheduler.consumer.work_queue_put(event_info) + events.scheduler.add(repos_done_handler, ("fake_msg", self.tag_name + "-build")) def _send_tag(self, artifact, nvr, dest_tag=True): if dest_tag: tag = self.tag_name else: tag = self.tag_name + "-build" - event_info = { - "msg_id": "a faked internal message", - "event": events.KOJI_TAG_CHANGE, - "tag_name": tag, - "build_name": artifact, - "build_nvr": nvr - } - module_build_service.scheduler.consumer.work_queue_put(event_info) + events.scheduler.add(tagged_handler, ("a faked internal message", tag, artifact, nvr)) def _send_build_change(self, state, name, build_id): # build_id=1 and task_id=1 are OK here, because we are building just # one RPM at the time. - event_info = { - "msg_id": "a faked internal message", - "event": events.KOJI_BUILD_CHANGE, - "build_id": build_id, - "task_id": build_id, - "build_name": name, - "build_new_state": state, - "build_release": "1", - "build_version": "1", - "module_build_id": None, - "state_reason": None - } - module_build_service.scheduler.consumer.work_queue_put(event_info) + args = ("a faked internal message", build_id, build_id, state, name, "1", "1", None, None) + events.scheduler.add(build_task_finalize_handler, args) def build(self, artifact_name, source): print("Starting building artifact %s: %s" % (artifact_name, source)) @@ -297,7 +279,6 @@ class FakeModuleBuilder(GenericBuilder): pass def recover_orphaned_artifact(self, component_build): - msgs = [] if self.INSTANT_COMPLETE: disttag = module_build_service.utils.get_rpm_release( self.db_session, component_build.module_build) @@ -309,27 +290,17 @@ class FakeModuleBuilder(GenericBuilder): component_build.state_reason = "Found existing build" nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr) # Send a message stating the build is complete - msgs.append({ - "msg_id": "recover_orphaned_artifact: fake message", - "event": events.KOJI_BUILD_CHANGE, - "build_id": randint(1, 9999999), - "task_id": component_build.task_id, - "build_new_state": koji.BUILD_STATES["COMPLETE"], - "build_name": component_build.package, - "build_version": nvr_dict["version"], - "build_release": nvr_dict["release"], - "module_build_id": component_build.module_build.id, - "state_reason": None - }) + args = ("recover_orphaned_artifact: fake message", randint(1, 9999999), + component_build.task_id, koji.BUILD_STATES["COMPLETE"], + component_build.package, nvr_dict["version"], nvr_dict["release"], + component_build.module_build.id, None) + events.scheduler.add(build_task_finalize_handler, args) # Send a message stating that the build was tagged in the build tag - msgs.append({ - "msg_id": "recover_orphaned_artifact: fake message", - "event": events.KOJI_TAG_CHANGE, - "tag_name": component_build.module_build.koji_tag + "-build", - "build_name": component_build.package, - "build_nvr": component_build.nvr, - }) - return msgs + args = ("recover_orphaned_artifact: fake message", + component_build.module_build.koji_tag + "-build", + component_build.package, component_build.nvr) + events.scheduler.add(tagged_handler, args) + return True def finalize(self, succeeded=None): if FakeModuleBuilder.on_finalize_cb: @@ -868,8 +839,9 @@ class TestBuild(BaseTestBuild): new_callable=PropertyMock, return_value=2, ) + @patch('module_build_service.scheduler.events.Scheduler.run', autospec=True) def test_try_to_reach_concurrent_threshold( - self, conf_num_concurrent_builds, mocked_scm, mocked_get_user, + self, scheduler_run, conf_num_concurrent_builds, mocked_scm, mocked_get_user, conf_system, dbg, hmsc ): """ @@ -896,24 +868,21 @@ class TestBuild(BaseTestBuild): # the module build. TestBuild._global_var = [] - def stop(message): + def mocked_scheduler_run(self): """ - Stop the scheduler when the module is built or when we try to build - more components than the num_concurrent_builds. + Store the number of concurrent builds between each handler call to global list so we + can examine it later. """ - main_stop = make_simple_stop_condition() num_building = ( db_session.query(models.ComponentBuild) .filter_by(state=koji.BUILD_STATES["BUILDING"]) .count() ) - over_threshold = conf.num_concurrent_builds < num_building TestBuild._global_var.append(num_building) - result = main_stop(message) or over_threshold - db_session.remove() - return result + sched.scheduler.run(self) - self.run_scheduler(stop_condition=stop) + scheduler_run.side_effect = mocked_scheduler_run + self.run_scheduler() # _global_var looks similar to this: [0, 1, 0, 0, 2, 2, 1, 0, 0, 0] # It shows the number of concurrent builds in the time. At first we diff --git a/tests/test_builder/test_koji.py b/tests/test_builder/test_koji.py index a7aebafc..67370b64 100644 --- a/tests/test_builder/test_koji.py +++ b/tests/test_builder/test_koji.py @@ -89,6 +89,7 @@ class FakeKojiModuleBuilder(KojiModuleBuilder): class TestKojiBuilder: def setup_method(self, test_method): init_data(1) + events.scheduler.reset() self.config = mock.Mock() self.config.koji_profile = conf.koji_profile self.config.koji_repository_url = conf.koji_repository_url @@ -105,6 +106,7 @@ class TestKojiBuilder: def teardown_method(self, test_method): self.p_read_config.stop() + events.scheduler.reset() def test_tag_to_repo(self): """ Test that when a repo msg hits us and we have no match, @@ -145,29 +147,24 @@ class TestKojiBuilder: component_build.state = None component_build.nvr = None - actual = builder.recover_orphaned_artifact(component_build) + recovered = builder.recover_orphaned_artifact(component_build) # recover_orphaned_artifact modifies a component build, but doesn't # commit the changes. db_session.commit() - assert len(actual) == 3 + assert recovered - assert actual[0]["event"] == events.KOJI_BUILD_CHANGE - assert actual[0]["build_id"] == 91 - assert actual[0]["task_id"] == 12345 - assert actual[0]["build_new_state"] == koji.BUILD_STATES["COMPLETE"] - assert actual[0]["build_name"] == "rubygem-rails" - assert actual[0]["build_version"] == "1.0" - assert actual[0]["build_release"] == "1.module+e0095747" - assert actual[0]["module_build_id"] == 4 + event_info = events.scheduler.queue[0][3] + assert event_info == ('recover_orphaned_artifact: fake message', 91, 12345, 1, + 'rubygem-rails', '1.0', '1.module+e0095747', 4, None) - assert actual[1]["event"] == events.KOJI_TAG_CHANGE - assert actual[1]["tag_name"] == "module-foo-build" - assert actual[1]["build_name"] == "rubygem-rails" + event_info = events.scheduler.queue[1][3] + assert event_info == ('recover_orphaned_artifact: fake message', 'module-foo-build', + 'rubygem-rails', 'foo-1.0-1.module+e0095747') - assert actual[2]["event"] == events.KOJI_TAG_CHANGE - assert actual[2]["tag_name"] == "module-foo" - assert actual[2]["build_name"] == "rubygem-rails" + event_info = events.scheduler.queue[2][3] + assert event_info == ('recover_orphaned_artifact: fake message', 'module-foo', + 'rubygem-rails', 'foo-1.0-1.module+e0095747') assert component_build.state == koji.BUILD_STATES["COMPLETE"] assert component_build.task_id == 12345 @@ -206,18 +203,14 @@ class TestKojiBuilder: component_build.state = None db_session.commit() - actual = builder.recover_orphaned_artifact(component_build) + recovered = builder.recover_orphaned_artifact(component_build) db_session.commit() - assert len(actual) == 1 - assert actual[0]["event"] == events.KOJI_BUILD_CHANGE - assert actual[0]["build_id"] == 91 - assert actual[0]["task_id"] == 12345 - assert actual[0]["build_new_state"] == koji.BUILD_STATES["COMPLETE"] - assert actual[0]["build_name"] == "rubygem-rails" - assert actual[0]["build_version"] == "1.0" - assert actual[0]["build_release"] == "1.{0}".format(dist_tag) - assert actual[0]["module_build_id"] == 4 + assert recovered + event_info = events.scheduler.queue[0][3] + assert event_info == ('recover_orphaned_artifact: fake message', 91, 12345, 1, + 'rubygem-rails', '1.0', '1.module+2+b8661ee4', 4, None) + assert component_build.state == koji.BUILD_STATES["COMPLETE"] assert component_build.task_id == 12345 assert component_build.state_reason == "Found existing build" @@ -260,18 +253,14 @@ class TestKojiBuilder: component_build.state = None db_session.commit() - actual = builder.recover_orphaned_artifact(component_build) + recovered = builder.recover_orphaned_artifact(component_build) db_session.commit() - assert len(actual) == 1 - assert actual[0]["event"] == events.KOJI_BUILD_CHANGE - assert actual[0]["build_id"] == 91 - assert actual[0]["task_id"] == 12345 - assert actual[0]["build_new_state"] == koji.BUILD_STATES["COMPLETE"] - assert actual[0]["build_name"] == "module-build-macros" - assert actual[0]["build_version"] == "1.0" - assert actual[0]["build_release"] == "1.{0}".format(dist_tag) - assert actual[0]["module_build_id"] == 4 + assert recovered + event_info = events.scheduler.queue[0][3] + assert event_info == ('recover_orphaned_artifact: fake message', 91, 12345, 1, + 'module-build-macros', '1.0', "1.{0}".format(dist_tag), 4, None) + assert component_build.state == koji.BUILD_STATES["COMPLETE"] assert component_build.task_id == 12345 assert component_build.state_reason == "Found existing build" @@ -312,10 +301,10 @@ class TestKojiBuilder: component_build.state = None db_session.commit() - actual = builder.recover_orphaned_artifact(component_build) + recovered = builder.recover_orphaned_artifact(component_build) db_session.commit() - assert actual == [] + assert not recovered # Make sure nothing erroneous gets tag assert builder.koji_session.tagBuild.call_count == 0 diff --git a/tests/test_utils/test_utils.py b/tests/test_utils/test_utils.py index e72e2701..dbbac009 100644 --- a/tests/test_utils/test_utils.py +++ b/tests/test_utils/test_utils.py @@ -1198,11 +1198,13 @@ class DummyModuleBuilder(GenericBuilder): class TestBatches: def setup_method(self, test_method): GenericBuilder.register_backend_class(DummyModuleBuilder) + events.scheduler.reset() def teardown_method(self, test_method): # clean_database() DummyModuleBuilder.TAGGED_COMPONENTS = [] GenericBuilder.register_backend_class(KojiModuleBuilder) + events.scheduler.reset() def test_start_next_batch_build_reuse(self, default_buildroot_groups): """ @@ -1218,7 +1220,8 @@ class TestBatches: module_build.batch = 1 builder = mock.MagicMock() - further_work = module_build_service.utils.start_next_batch_build( + builder.module_build_tag = {"name": "module-fedora-27-build"} + module_build_service.utils.start_next_batch_build( conf, module_build, builder) # Batch number should increase. @@ -1228,34 +1231,19 @@ class TestBatches: # build_new_state set to COMPLETE, but the current component build # state should be set to BUILDING, so KojiBuildChange message handler # handles the change properly. - for event_info in further_work: - if event_info["event"] == events.KOJI_BUILD_CHANGE: - assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"] + for event in events.scheduler.queue: + event_info = event[3] + if event_info[0].startswith("reuse_component"): + assert event_info[3] == koji.BUILD_STATES["COMPLETE"] component_build = models.ComponentBuild.from_component_event( db_session, - task_id=event_info["task_id"], - module_id=event_info["module_build_id"]) + task_id=event_info[2], + module_id=event_info[7]) assert component_build.state == koji.BUILD_STATES["BUILDING"] # When we handle these KojiBuildChange messages, MBS should tag all # the components just once. - for event_info in further_work: - if event_info["event"] == events.KOJI_BUILD_CHANGE: - module_build_service.scheduler.handlers.components.build_task_finalize( - msg_id=event_info["msg_id"], - build_id=event_info["build_id"], - task_id=event_info["task_id"], - build_new_state=event_info["build_new_state"], - build_name=event_info["build_name"], - build_version=event_info["build_version"], - build_release=event_info["build_release"], - module_build_id=event_info["module_build_id"], - state_reason=event_info["state_reason"] - ) - - # Since we have reused all the components in the batch, there should - # be fake KojiRepoChange message. - assert further_work[-1]["event"] == events.KOJI_REPO_CHANGE + events.scheduler.run() # Check that packages have been tagged just once. assert len(DummyModuleBuilder.TAGGED_COMPONENTS) == 2 @@ -1283,24 +1271,27 @@ class TestBatches: builder = mock.MagicMock() builder.recover_orphaned_artifact.return_value = [] - further_work = module_build_service.utils.start_next_batch_build( + module_build_service.utils.start_next_batch_build( conf, module_build, builder) # Batch number should increase. assert module_build.batch == 2 # Make sure we only have one message returned for the one reused component - assert len(further_work) == 1 + assert len(events.scheduler.queue) == 1 # The KojiBuildChange message in further_work should have build_new_state # set to COMPLETE, but the current component build state in the DB should be set # to BUILDING, so KojiBuildChange message handler handles the change # properly. - event_info = further_work[0] - assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"] + event_info = events.scheduler.queue[0][3] + assert event_info == ('reuse_component: fake msg', None, 90276227, 1, 'perl-Tangerine', + '0.23', '1.module+0+d027b723', 3, + 'Reused component from previous module build') component_build = models.ComponentBuild.from_component_event( db_session, - task_id=event_info["task_id"], - module_id=event_info["module_build_id"]) + task_id=event_info[2], + module_id=event_info[7], + ) assert component_build.state == koji.BUILD_STATES["BUILDING"] assert component_build.package == "perl-Tangerine" assert component_build.reused_component_id is not None @@ -1328,13 +1319,13 @@ class TestBatches: builder = mock.MagicMock() builder.recover_orphaned_artifact.return_value = [] - further_work = module_build_service.utils.start_next_batch_build( + module_build_service.utils.start_next_batch_build( conf, module_build, builder) # Batch number should increase. assert module_build.batch == 2 # No component reuse messages should be returned - assert len(further_work) == 0 + assert len(events.scheduler.queue) == 0 # Make sure that both components in the batch were submitted assert len(mock_sbc.mock_calls) == 2 @@ -1375,24 +1366,26 @@ class TestBatches: builder = mock.MagicMock() builder.recover_orphaned_artifact.return_value = [] - further_work = module_build_service.utils.start_next_batch_build( + module_build_service.utils.start_next_batch_build( conf, module_build, builder) # Batch number should increase assert module_build.batch == 2 # Make sure we only have one message returned for the one reused component - assert len(further_work) == 1 + assert len(events.scheduler.queue) == 1 # The buildsys.build.state.change message in further_work should have # build_new_state set to COMPLETE, but the current component build state # in the DB should be set to BUILDING, so the build state change handler # handles the change properly. - event_info = further_work[0] - assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"] + event_info = events.scheduler.queue[0][3] + assert event_info == ('reuse_component: fake msg', None, 90276227, 1, + 'perl-Tangerine', '0.23', '1.module+0+d027b723', 3, + 'Reused component from previous module build') component_build = models.ComponentBuild.from_component_event( db_session, - task_id=event_info["task_id"], - module_id=event_info["module_build_id"], + task_id=event_info[2], + module_id=event_info[7], ) assert component_build.state == koji.BUILD_STATES["BUILDING"] assert component_build.package == "perl-Tangerine" @@ -1409,19 +1402,23 @@ class TestBatches: db_session, "perl-Tangerine", 3) pt_component.state = koji.BUILD_STATES["COMPLETE"] + events.scheduler.reset() + # Start the next build batch - further_work = module_build_service.utils.start_next_batch_build( + module_build_service.utils.start_next_batch_build( conf, module_build, builder) # Batch number should increase assert module_build.batch == 3 # Verify that tangerine was reused even though perl-Tangerine was rebuilt in the previous # batch - event_info = further_work[0] - assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"] + event_info = events.scheduler.queue[0][3] + assert event_info == ('reuse_component: fake msg', None, 90276315, 1, 'tangerine', '0.22', + '3.module+0+d027b723', 3, + 'Reused component from previous module build') component_build = models.ComponentBuild.from_component_event( db_session, - task_id=event_info["task_id"], - module_id=event_info["module_build_id"] + task_id=event_info[2], + module_id=event_info[7], ) assert component_build.state == koji.BUILD_STATES["BUILDING"] assert component_build.package == "tangerine" @@ -1451,14 +1448,14 @@ class TestBatches: builder = mock.MagicMock() builder.recover_orphaned_artifact.return_value = [] - further_work = module_build_service.utils.start_next_batch_build( + module_build_service.utils.start_next_batch_build( conf, module_build, builder) # Batch number should increase. assert module_build.batch == 2 # Make sure we don't have any messages returned since no components should be reused - assert len(further_work) == 0 + assert len(events.scheduler.queue) == 0 # Make sure both components are set to the build state but not reused assert pt_component.state == koji.BUILD_STATES["BUILDING"] assert pt_component.reused_component_id is None @@ -1487,7 +1484,7 @@ class TestBatches: db_session.commit() builder = mock.MagicMock() - further_work = module_build_service.utils.start_next_batch_build( + module_build_service.utils.start_next_batch_build( conf, module_build, builder) # Batch number should not increase. @@ -1495,7 +1492,8 @@ class TestBatches: # Make sure start build was called for the second component which wasn't reused mock_sbc.assert_called_once() # No further work should be returned - assert len(further_work) == 0 + + assert len(events.scheduler.queue) == 0 def test_start_next_batch_build_repo_building(self, default_buildroot_groups): """ @@ -1524,9 +1522,11 @@ class TestBatches: class TestLocalBuilds: def setup_method(self): clean_database() + events.scheduler.reset() def teardown_method(self): clean_database() + events.scheduler.reset() def test_load_local_builds_name(self, conf_system, conf_resultsdir): module_build_service.utils.load_local_builds("testmodule")