diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index c617d88f..f863f54c 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -20,11 +20,6 @@ import moksha.hub import sqlalchemy.exc import module_build_service.messaging -import module_build_service.scheduler.handlers.repos -import module_build_service.scheduler.handlers.components -import module_build_service.scheduler.handlers.modules -import module_build_service.scheduler.handlers.tags -import module_build_service.scheduler.handlers.greenwave import module_build_service.monitor as monitor from module_build_service import models, log, conf @@ -32,9 +27,42 @@ from module_build_service.db_session import db_session from module_build_service.errors import IgnoreMessage from module_build_service.messaging import default_messaging_backend from module_build_service.scheduler import events +from module_build_service.scheduler.handlers import components +from module_build_service.scheduler.handlers import repos +from module_build_service.scheduler.handlers import modules +from module_build_service.scheduler.handlers import tags from module_build_service.scheduler.handlers import greenwave +def no_op_handler(*args, **kwargs): + return True + + +ON_BUILD_CHANGE_HANDLERS = { + koji.BUILD_STATES["BUILDING"]: no_op_handler, + koji.BUILD_STATES["COMPLETE"]: components.build_task_finalize, + koji.BUILD_STATES["FAILED"]: components.build_task_finalize, + koji.BUILD_STATES["CANCELED"]: components.build_task_finalize, + koji.BUILD_STATES["DELETED"]: no_op_handler, +} + +ON_MODULE_CHANGE_HANDLERS = { + models.BUILD_STATES["init"]: modules.init, + models.BUILD_STATES["wait"]: modules.wait, + models.BUILD_STATES["build"]: no_op_handler, + models.BUILD_STATES["failed"]: modules.failed, + models.BUILD_STATES["done"]: modules.done, + # XXX: DIRECT TRANSITION TO READY + models.BUILD_STATES["ready"]: no_op_handler, + models.BUILD_STATES["garbage"]: no_op_handler, +} + +# Only one kind of repo change event, though... +ON_REPO_CHANGE_HANDLER = repos.done +ON_TAG_CHANGE_HANDLER = tags.tagged +ON_DECISION_UPDATE_HANDLER = greenwave.decision_update + + class MBSConsumer(fedmsg.consumers.FedmsgConsumer): """ This is triggered by running fedmsg-hub. This class is responsible for ingesting and processing messages from the message bus. @@ -83,32 +111,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): msg = module_build_service.messaging._initial_messages.pop(0) self.incoming.put(msg) - from module_build_service.scheduler import handlers - - # These are our main lookup tables for figuring out what to run in - # response to what messaging events. 
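The per-instance lookup tables removed here are superseded by the module-level ON_BUILD_CHANGE_HANDLERS / ON_MODULE_CHANGE_HANDLERS dicts added above, which other modules (notably the producer) can now import directly. A minimal sketch of the shared dispatch-table idiom, with hypothetical states and handlers rather than MBS code:

```python
def no_op(*args, **kwargs):
    return True

STATES = {"init": 0, "build": 1, "done": 2}

# One handler per state; states that need no work map to the no-op.
HANDLERS = {
    STATES["init"]: lambda build_id: print("initializing %s" % build_id),
    STATES["build"]: no_op,
    STATES["done"]: lambda build_id: print("finalizing %s" % build_id),
}

def sanity_check():
    # Fail at startup rather than with a KeyError mid-message.
    missing = [name for name, code in STATES.items() if code not in HANDLERS]
    if missing:
        raise KeyError("states %r not handled" % missing)

sanity_check()
HANDLERS[STATES["done"]](42)  # dispatch on the state code
```
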
- self.NO_OP = NO_OP = lambda *args, **kwargs: True - self.on_build_change = { - koji.BUILD_STATES["BUILDING"]: NO_OP, - koji.BUILD_STATES["COMPLETE"]: handlers.components.build_task_finalize, - koji.BUILD_STATES["FAILED"]: handlers.components.build_task_finalize, - koji.BUILD_STATES["CANCELED"]: handlers.components.build_task_finalize, - koji.BUILD_STATES["DELETED"]: NO_OP, - } - self.on_module_change = { - models.BUILD_STATES["init"]: handlers.modules.init, - models.BUILD_STATES["wait"]: handlers.modules.wait, - models.BUILD_STATES["build"]: NO_OP, - models.BUILD_STATES["failed"]: handlers.modules.failed, - models.BUILD_STATES["done"]: handlers.modules.done, - # XXX: DIRECT TRANSITION TO READY - models.BUILD_STATES["ready"]: NO_OP, - models.BUILD_STATES["garbage"]: NO_OP, - } - # Only one kind of repo change event, though... - self.on_repo_change = handlers.repos.done - self.on_tag_change = handlers.tags.tagged - self.on_decision_update = handlers.greenwave.decision_update self.sanity_check() def shutdown(self): @@ -180,10 +182,10 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): """ On startup, make sure our implementation is sane. """ # Ensure we have every state covered for state in models.BUILD_STATES: - if models.BUILD_STATES[state] not in self.on_module_change: + if models.BUILD_STATES[state] not in ON_MODULE_CHANGE_HANDLERS: raise KeyError("Module build states %r not handled." % state) for state in koji.BUILD_STATES: - if koji.BUILD_STATES[state] not in self.on_build_change: + if koji.BUILD_STATES[state] not in ON_BUILD_CHANGE_HANDLERS: raise KeyError("Koji build states %r not handled." % state) def _map_message(self, db_session, event_info): @@ -192,7 +194,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): event = event_info["event"] if event == events.KOJI_BUILD_CHANGE: - handler = self.on_build_change[event_info["build_new_state"]] + handler = ON_BUILD_CHANGE_HANDLERS[event_info["build_new_state"]] build = models.ComponentBuild.from_component_event( db_session, event_info["task_id"], event_info["module_build_id"]) if build: @@ -201,13 +203,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): if event == events.KOJI_REPO_CHANGE: return ( - self.on_repo_change, + ON_REPO_CHANGE_HANDLER, models.ModuleBuild.get_by_tag(db_session, event_info["repo_tag"]) ) if event == events.KOJI_TAG_CHANGE: return ( - self.on_tag_change, + ON_TAG_CHANGE_HANDLER, models.ModuleBuild.get_by_tag(db_session, event_info["tag_name"]) ) @@ -219,14 +221,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): state, type(state), valid_module_build_states )) return ( - self.on_module_change[state], - models.ModuleBuild.get_by_id( - db_session, event_info["module_build_id"]) + ON_MODULE_CHANGE_HANDLERS[state], + models.ModuleBuild.get_by_id(db_session, event_info["module_build_id"]) ) if event == events.GREENWAVE_DECISION_UPDATE: return ( - self.on_decision_update, + ON_DECISION_UPDATE_HANDLER, greenwave.get_corresponding_module_build(event_info["subject_identifier"]) ) @@ -243,7 +244,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): idx = "%s: %s, %s" % ( handler.__name__, event_info["event"], event_info["msg_id"]) - if handler is self.NO_OP: + if handler is no_op_handler: log.debug("Handler is NO_OP: %s", idx) return diff --git a/module_build_service/scheduler/producer.py b/module_build_service/scheduler/producer.py index d4d2bdce..da9e2931 100644 --- a/module_build_service/scheduler/producer.py +++ b/module_build_service/scheduler/producer.py @@ -1,501 +1,470 @@ # -*- coding: utf-8 
-*- # SPDX-License-Identifier: MIT -""" The PollingProducer class that acts as a producer entry point for -fedmsg-hub. This class polls the database for tasks to do. -""" import koji import operator from datetime import timedelta, datetime -from sqlalchemy.orm import lazyload -from moksha.hub.api.producer import PollingProducer +from sqlalchemy.orm import lazyload, load_only import module_build_service.messaging import module_build_service.scheduler import module_build_service.scheduler.consumer -from module_build_service import conf, models, log +from module_build_service import celery_app, conf, models, log from module_build_service.builder import GenericBuilder from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder from module_build_service.utils.greenwave import greenwave from module_build_service.db_session import db_session -from module_build_service.scheduler import events +from module_build_service.scheduler.consumer import ON_MODULE_CHANGE_HANDLERS +from module_build_service.scheduler.handlers.components import build_task_finalize +from module_build_service.scheduler.handlers.tags import tagged -class MBSProducer(PollingProducer): - frequency = timedelta(seconds=conf.polling_interval) +@celery_app.on_after_configure.connect +def setup_periodic_tasks(sender, **kwargs): + tasks = ( + (log_summary, "Log summary of module builds and component builds"), + (process_waiting_module_builds, "Process waiting module builds"), + (fail_lost_builds, "Fail lost builds"), + (process_paused_module_builds, "Process paused module builds"), + (delete_old_koji_targets, "Delete old koji targets"), + (cleanup_stale_failed_builds, "Cleanup stale failed builds"), + (cancel_stuck_module_builds, "Cancel stuck module builds"), + (sync_koji_build_tags, "Sync Koji build tags"), + (poll_greenwave, "Gating module build to ready state"), + ) - @events.mbs_event_handler() - def poll(self): - try: - self.log_summary() - self.process_waiting_module_builds() - self.process_open_component_builds() - self.fail_lost_builds() - self.process_paused_module_builds(conf) - self.retrigger_new_repo_on_failure(conf) - self.delete_old_koji_targets(conf) - self.cleanup_stale_failed_builds(conf) - self.sync_koji_build_tags(conf) - self.poll_greenwave(conf) - except Exception: - msg = "Error in poller execution:" - log.exception(msg) + for task, name in tasks: + sender.add_periodic_task(conf.polling_interval, task.s(), name=name) - # Poller runs in its own thread. Database session can be removed safely. - db_session.remove() - log.info('Poller will now sleep for "{}" seconds'.format(conf.polling_interval)) +@celery_app.task +def log_summary(): + states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1)) + for name, code in states: + query = db_session.query(models.ModuleBuild).filter_by(state=code) + count = query.count() + if count: + log.info(" * %s module builds in the %s state", count, name) + if name == "build": + for module_build in query.all(): + log.info(" * %r", module_build) + # First batch is number '1'. + for i in range(1, module_build.batch + 1): + n = len([c for c in module_build.component_builds if c.batch == i]) + log.info(" * %s components in batch %s", n, i) - def fail_lost_builds(self): - # This function is supposed to be handling only the part which can't be - # updated through messaging (e.g. srpm-build failures). Please keep it - # fit `n` slim. 
We do want rest to be processed elsewhere - # TODO re-use - if conf.system == "koji": - # We don't do this on behalf of users - koji_session = KojiModuleBuilder.get_session(conf, login=False) - log.info("Querying tasks for statuses:") - res = ( - db_session.query(models.ComponentBuild) - .filter_by(state=koji.BUILD_STATES["BUILDING"]) - .options(lazyload("module_build")) - .all() - ) +@celery_app.task +def process_waiting_module_builds(): + for state in ["init", "wait"]: + nudge_module_builds_in_state(state, 10) - log.info("Checking status for {0} tasks".format(len(res))) - for component_build in res: - log.debug(component_build.json(db_session)) - # Don't check tasks which haven't been triggered yet - if not component_build.task_id: - continue - # Don't check tasks for components which have been reused, - # they may have BUILDING state temporarily before we tag them - # to new module tag. Checking them would be waste of resources. - if component_build.reused_component_id: - log.debug( - 'Skipping check for task "{0}", ' - 'the component has been reused ("{1}").'.format( - component_build.task_id, component_build.reused_component_id) - ) - continue +def nudge_module_builds_in_state(state_name, older_than_minutes): + """ + Finds all the module builds in the `state` with `time_modified` older + than `older_than_minutes` and adds fake MBSModule message to the + work queue. + """ + log.info("Looking for module builds stuck in the %s state", state_name) + builds = models.ModuleBuild.by_state(db_session, state_name) + log.info(" %r module builds in the %s state...", len(builds), state_name) + now = datetime.utcnow() + time_modified_threshold = timedelta(minutes=older_than_minutes) + for build in builds: - task_id = component_build.task_id - - log.info('Checking status of task_id "{0}"'.format(task_id)) - task_info = koji_session.getTaskInfo(task_id) - - state_mapping = { - # Cancelled and failed builds should be marked as failed. - koji.TASK_STATES["CANCELED"]: koji.BUILD_STATES["FAILED"], - koji.TASK_STATES["FAILED"]: koji.BUILD_STATES["FAILED"], - # Completed tasks should be marked as complete. - koji.TASK_STATES["CLOSED"]: koji.BUILD_STATES["COMPLETE"], - } - - # If it is a closed/completed task, then we can extract the NVR - build_version, build_release = None, None # defaults - if task_info["state"] == koji.TASK_STATES["CLOSED"]: - builds = koji_session.listBuilds(taskID=task_id) - if not builds: - log.warning( - "Task ID %r is closed, but we found no builds in koji." % task_id) - elif len(builds) > 1: - log.warning( - "Task ID %r is closed, but more than one build is present!" 
% task_id) - else: - build_version = builds[0]["version"] - build_release = builds[0]["release"] - - log.info(" task {0!r} is in state {1!r}".format(task_id, task_info["state"])) - if task_info["state"] in state_mapping: - # Fake a fedmsg message on our internal queue - msg = { - "msg_id": "producer::fail_lost_builds fake msg", - "event": events.KOJI_BUILD_CHANGE, - "build_id": component_build.task_id, - "task_id": component_build.task_id, - "build_new_state": state_mapping[task_info["state"]], - "build_name": component_build.package, - "build_release": build_release, - "build_version": build_version, - "module_build_id": None, - "state_reason": None - } - module_build_service.scheduler.consumer.work_queue_put(msg) - - elif conf.system == "mock": - pass - - def cleanup_stale_failed_builds(self, conf): - """ Does various clean up tasks on stale failed module builds - :param conf: the MBS configuration object - :param db_session: a SQLAlchemy database session - """ - if conf.system == "koji": - stale_date = datetime.utcnow() - timedelta(days=conf.cleanup_failed_builds_time) - stale_module_builds = ( - db_session.query(models.ModuleBuild) - .filter( - models.ModuleBuild.state == models.BUILD_STATES["failed"], - models.ModuleBuild.time_modified <= stale_date, - ) - .all() - ) - if stale_module_builds: - log.info( - "{0} stale failed module build(s) will be cleaned up".format( - len(stale_module_builds)) - ) - for module in stale_module_builds: - log.info("{0!r} is stale and is being cleaned up".format(module)) - # Find completed artifacts in the stale build - artifacts = [c for c in module.component_builds if c.is_completed] - # If there are no completed artifacts, then there is nothing to tag - if artifacts: - # Set buildroot_connect=False so it doesn't recreate the Koji target and etc. - builder = GenericBuilder.create_from_module( - db_session, module, conf, buildroot_connect=False - ) - builder.untag_artifacts([c.nvr for c in artifacts]) - # Mark the artifacts as untagged in the database - for c in artifacts: - c.tagged = False - c.tagged_in_final = False - db_session.add(c) - state_reason = ( - "The module was garbage collected since it has failed over {0}" - " day(s) ago".format(conf.cleanup_failed_builds_time) - ) - module.transition( - db_session, - conf, - models.BUILD_STATES["garbage"], - state_reason=state_reason, - failure_type="user", - ) - db_session.add(module) - db_session.commit() - - def log_summary(self): - log.info("Current status:") - consumer = module_build_service.scheduler.consumer.get_global_consumer() - backlog = consumer.incoming.qsize() - log.info(" * internal queue backlog is {0}".format(backlog)) - states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1)) - for name, code in states: - query = db_session.query(models.ModuleBuild).filter_by(state=code) - count = query.count() - if count: - log.info(" * {0} module builds in the {1} state".format(count, name)) - if name == "build": - for module_build in query.all(): - log.info(" * {0!r}".format(module_build)) - # First batch is number '1'. - for i in range(1, module_build.batch + 1): - n = len([c for c in module_build.component_builds if c.batch == i]) - log.info(" * {0} components in batch {1}".format(n, i)) - - def _nudge_module_builds_in_state(self, state_name, older_than_minutes): - """ - Finds all the module builds in the `state` with `time_modified` older - than `older_than_minutes` and adds fake MBSModule message to the - work queue. 
- """ - log.info("Looking for module builds stuck in the %s state", state_name) - builds = models.ModuleBuild.by_state(db_session, state_name) - log.info(" %r module builds in the %s state...", len(builds), state_name) - now = datetime.utcnow() - time_modified_threshold = timedelta(minutes=older_than_minutes) - for build in builds: - - # Only give builds a nudge if stuck for more than ten minutes - if (now - build.time_modified) < time_modified_threshold: - continue - - # Pretend the build is modified, so we don't tight spin. - build.time_modified = now - - # Fake a message to kickstart the build anew in the consumer - state = module_build_service.models.BUILD_STATES[state_name] - msg = { - "msg_id": "nudge_module_builds_fake_message", - "event": events.MBS_MODULE_STATE_CHANGE, - "module_build_id": build.id, - "module_build_state": state, - } - log.info(" Scheduling faked event %r", msg) - module_build_service.scheduler.consumer.work_queue_put(msg) + # Only give builds a nudge if stuck for more than ten minutes + if (now - build.time_modified) < time_modified_threshold: + continue + # Pretend the build is modified, so we don't tight spin. + build.time_modified = now db_session.commit() - def process_waiting_module_builds(self): - for state in ["init", "wait"]: - self._nudge_module_builds_in_state(state, 10) + # Fake a message to kickstart the build anew in the consumer + state = module_build_service.models.BUILD_STATES[state_name] + handler = ON_MODULE_CHANGE_HANDLERS[state] + handler.delay("internal:mbs.module.state.change", build.id, state) - def process_open_component_builds(self): - log.warning("process_open_component_builds is not yet implemented...") - def process_paused_module_builds(self, config): - log.info("Looking for paused module builds in the build state") - if module_build_service.utils.at_concurrent_component_threshold(config): - log.debug( - "Will not attempt to start paused module builds due to " - "the concurrent build threshold being met" - ) - return +def process_open_component_builds(): + log.warning("process_open_component_builds is not yet implemented...") - ten_minutes = timedelta(minutes=10) - # Check for module builds that are in the build state but don't have any active component - # builds. Exclude module builds in batch 0. This is likely a build of a module without - # components. - module_builds = ( - db_session.query(models.ModuleBuild) - .filter( - models.ModuleBuild.state == models.BUILD_STATES["build"], - models.ModuleBuild.batch > 0, - ) - .all() - ) - for module_build in module_builds: - now = datetime.utcnow() - # Only give builds a nudge if stuck for more than ten minutes - if (now - module_build.time_modified) < ten_minutes: - continue - # If there are no components in the build state on the module build, - # then no possible event will start off new component builds. - # But do not try to start new builds when we are waiting for the - # repo-regen. - if not module_build.current_batch(koji.BUILD_STATES["BUILDING"]): - # Initialize the builder... - builder = GenericBuilder.create_from_module(db_session, module_build, config) - if _has_missed_new_repo_message(module_build, builder.koji_session): - log.info(" Processing the paused module build %r", module_build) - module_build_service.utils.start_next_batch_build( - config, module_build, builder) - - # Check if we have met the threshold. 
- if module_build_service.utils.at_concurrent_component_threshold(config): - break - - def retrigger_new_repo_on_failure(self, config): - """ - Retrigger failed new repo tasks for module builds in the build state. - - The newRepo task may fail for various reasons outside the scope of MBS. - This method will detect this scenario and retrigger the newRepo task - if needed to avoid the module build from being stuck in the "build" state. - """ - if config.system != "koji": - return - - koji_session = module_build_service.builder.KojiModuleBuilder.KojiModuleBuilder.get_session( - config) - - for module_build in ( - db_session.query(models.ModuleBuild).filter_by(state=models.BUILD_STATES["build"]).all() - ): - if not module_build.new_repo_task_id: - continue - - task_info = koji_session.getTaskInfo(module_build.new_repo_task_id) - if task_info["state"] in [koji.TASK_STATES["CANCELED"], koji.TASK_STATES["FAILED"]]: - log.info( - "newRepo task %s for %r failed, starting another one", - str(module_build.new_repo_task_id), module_build, - ) - taginfo = koji_session.getTag(module_build.koji_tag + "-build") - module_build.new_repo_task_id = koji_session.newRepo(taginfo["name"]) - - db_session.commit() - - def delete_old_koji_targets(self, config): - """ - Deletes targets older than `config.koji_target_delete_time` seconds - from Koji to cleanup after the module builds. - """ - if config.system != "koji": - return - - log.info("Looking for module builds which Koji target can be removed") - - now = datetime.utcnow() - - koji_session = KojiModuleBuilder.get_session(config) - for target in koji_session.getBuildTargets(): - koji_tag = target["dest_tag_name"] - module = db_session.query(models.ModuleBuild).filter_by(koji_tag=koji_tag).first() - if ( - not module - or module.name in conf.base_module_names - or module.state in [ - models.BUILD_STATES["init"], - models.BUILD_STATES["wait"], - models.BUILD_STATES["build"], - ] - ): - continue - - # Double-check that the target we are going to remove is prefixed - # by our prefix, so we won't remove f26 when there is some garbage - # in DB or Koji. - for allowed_prefix in config.koji_tag_prefixes: - if target["name"].startswith(allowed_prefix + "-"): - break - else: - log.error("Module %r has Koji target with not allowed prefix.", module) - continue - - delta = now - module.time_completed - if delta.total_seconds() > config.koji_target_delete_time: - log.info("Removing target of module %r", module) - koji_session.deleteBuildTarget(target["id"]) - - def cancel_stuck_module_builds(self, config): - """ - Method transitions builds which are stuck in one state too long to the "failed" state. - The states are defined with the "cleanup_stuck_builds_states" config option and the - time is defined by the "cleanup_stuck_builds_time" config option. 
- """ - log.info( - 'Looking for module builds stuck in the states "{states}" more than {days} days' - .format( - states=" and ".join(config.cleanup_stuck_builds_states), - days=config.cleanup_stuck_builds_time, - ) - ) - - delta = timedelta(days=config.cleanup_stuck_builds_time) - now = datetime.utcnow() - threshold = now - delta - states = [ - module_build_service.models.BUILD_STATES[state] - for state in config.cleanup_stuck_builds_states - ] - - module_builds = ( - db_session.query(models.ModuleBuild) - .filter( - models.ModuleBuild.state.in_(states), models.ModuleBuild.time_modified < threshold - ) - .all() - ) - - log.info(" {0!r} module builds are stuck...".format(len(module_builds))) - - for build in module_builds: - nsvc = ":".join([build.name, build.stream, build.version, build.context]) - log.info('Transitioning build "{nsvc}" to "Failed" state.'.format(nsvc=nsvc)) - - state_reason = "The module was in {state} for more than {days} days".format( - state=build.state, days=config.cleanup_stuck_builds_time - ) - build.transition( - db_session, - config, - state=models.BUILD_STATES["failed"], - state_reason=state_reason, - failure_type="user", - ) - db_session.commit() - - def sync_koji_build_tags(self, config): - """ - Method checking the "tagged" and "tagged_in_final" attributes of - "complete" ComponentBuilds in the current batch of module builds - in "building" state against the Koji. - - In case the Koji shows the build as tagged/tagged_in_final, - fake "tagged" message is added to work queue. - """ - if conf.system != "koji": - return +@celery_app.task +def fail_lost_builds(): + # This function is supposed to be handling only the part which can't be + # updated through messaging (e.g. srpm-build failures). Please keep it + # fit `n` slim. We do want rest to be processed elsewhere + # TODO re-use + if conf.system == "koji": + # We don't do this on behalf of users koji_session = KojiModuleBuilder.get_session(conf, login=False) + log.info("Querying tasks for statuses:") + res = db_session.query(models.ComponentBuild).filter_by( + state=koji.BUILD_STATES["BUILDING"] + ).options(lazyload("module_build")).all() - threshold = datetime.utcnow() - timedelta(minutes=10) - module_builds = db_session.query(models.ModuleBuild).filter( - models.ModuleBuild.time_modified < threshold, - models.ModuleBuild.state == models.BUILD_STATES["build"] - ).all() - for module_build in module_builds: - complete_components = module_build.current_batch(koji.BUILD_STATES["COMPLETE"]) - for c in complete_components: - # In case the component is tagged in the build tag and - # also tagged in the final tag (or it is build_time_only - # and therefore should not be tagged in final tag), skip it. - if c.tagged and (c.tagged_in_final or c.build_time_only): - continue + log.info("Checking status for %s tasks", len(res)) + for component_build in res: + log.debug(component_build.json(db_session)) + # Don't check tasks which haven't been triggered yet + if not component_build.task_id: + continue - log.info( - "%r: Component %r is complete, but not tagged in the " - "final and/or build tags.", - module_build, c, + # Don't check tasks for components which have been reused, + # they may have BUILDING state temporarily before we tag them + # to new module tag. Checking them would be waste of resources. 
+ if component_build.reused_component_id: + log.debug( + 'Skipping check for task "%s", the component has been reused ("%s").', + component_build.task_id, component_build.reused_component_id + ) + continue + + task_id = component_build.task_id + + log.info('Checking status of task_id "%s"', task_id) + task_info = koji_session.getTaskInfo(task_id) + + state_mapping = { + # Cancelled and failed builds should be marked as failed. + koji.TASK_STATES["CANCELED"]: koji.BUILD_STATES["FAILED"], + koji.TASK_STATES["FAILED"]: koji.BUILD_STATES["FAILED"], + # Completed tasks should be marked as complete. + koji.TASK_STATES["CLOSED"]: koji.BUILD_STATES["COMPLETE"], + } + + # If it is a closed/completed task, then we can extract the NVR + build_version, build_release = None, None # defaults + if task_info["state"] == koji.TASK_STATES["CLOSED"]: + builds = koji_session.listBuilds(taskID=task_id) + if not builds: + log.warning( + "Task ID %r is closed, but we found no builds in koji.", task_id) + elif len(builds) > 1: + log.warning( + "Task ID %r is closed, but more than one build is present!", task_id) + else: + build_version = builds[0]["version"] + build_release = builds[0]["release"] + + log.info(" task %r is in state %r", task_id, task_info["state"]) + if task_info["state"] in state_mapping: + build_task_finalize.delay( + msg_id="producer::fail_lost_builds fake msg", + build_id=component_build.task_id, + task_id=component_build.task_id, + build_new_state=state_mapping[task_info["state"]], + build_name=component_build.package, + build_release=build_release, + build_version=build_version, ) - # Check in which tags the component is tagged. - tag_dicts = koji_session.listTags(c.nvr) - tags = [tag_dict["name"] for tag_dict in tag_dicts] + elif conf.system == "mock": + pass - # If it is tagged in final tag, but MBS does not think so, - # schedule fake message. - if not c.tagged_in_final and module_build.koji_tag in tags: - msg = { - "msg_id": "sync_koji_build_tags_fake_message", - "event": events.KOJI_TAG_CHANGE, - "tag_name": module_build.koji_tag, - "build_name": c.package, - "build_nvr": c.nvr, - } - log.info(" Scheduling faked event %r", msg) - module_build_service.scheduler.consumer.work_queue_put(msg) - # If it is tagged in the build tag, but MBS does not think so, - # schedule fake message. - build_tag = module_build.koji_tag + "-build" - if not c.tagged and build_tag in tags: - msg = { - "msg_id": "sync_koji_build_tags_fake_message", - "event": events.KOJI_TAG_CHANGE, - "tag_name": build_tag, - "build_name": c.package, - "build_nvr": c.nvr, - } - log.info(" Scheduling faked event %r", msg) - module_build_service.scheduler.consumer.work_queue_put(msg) - - def poll_greenwave(self, config): - """ - Polls Greenwave for all builds in done state - :param db_session: SQLAlchemy DB session - :return: None - """ - if greenwave is None: - return - - module_builds = ( - db_session.query(models.ModuleBuild) - .filter_by(state=models.BUILD_STATES["done"], scratch=False).all() +@celery_app.task +def process_paused_module_builds(): + log.info("Looking for paused module builds in the build state") + if module_build_service.utils.at_concurrent_component_threshold(conf): + log.debug( + "Will not attempt to start paused module builds due to " + "the concurrent build threshold being met" ) + return - log.info("Checking Greenwave for %d builds", len(module_builds)) + ten_minutes = timedelta(minutes=10) + # Check for module builds that are in the build state but don't have any active component + # builds. 
Exclude module builds in batch 0. This is likely a build of a module without + # components. + module_builds = db_session.query(models.ModuleBuild).filter( + models.ModuleBuild.state == models.BUILD_STATES["build"], + models.ModuleBuild.batch > 0, + ).all() + for module_build in module_builds: + now = datetime.utcnow() + # Only give builds a nudge if stuck for more than ten minutes + if (now - module_build.time_modified) < ten_minutes: + continue + # If there are no components in the build state on the module build, + # then no possible event will start off new component builds. + # But do not try to start new builds when we are waiting for the + # repo-regen. + if not module_build.current_batch(koji.BUILD_STATES["BUILDING"]): + # Initialize the builder... + builder = GenericBuilder.create_from_module( + db_session, module_build, conf) - for build in module_builds: - if greenwave.check_gating(build): - build.transition(db_session, config, state=models.BUILD_STATES["ready"]) - else: - build.state_reason = "Gating failed (MBS will retry in {0} seconds)".format( - conf.polling_interval - ) - if greenwave.error_occurred: - build.state_reason += " (Error occured while querying Greenwave)" - build.time_modified = datetime.utcnow() - db_session.commit() + if has_missed_new_repo_message(module_build, builder.koji_session): + log.info(" Processing the paused module build %r", module_build) + module_build_service.utils.start_next_batch_build( + conf, module_build, builder) + + # Check if we have met the threshold. + if module_build_service.utils.at_concurrent_component_threshold(conf): + break -def _has_missed_new_repo_message(module_build, koji_session): +@celery_app.task +def retrigger_new_repo_on_failure(): + """ + Retrigger failed new repo tasks for module builds in the build state. + + The newRepo task may fail for various reasons outside the scope of MBS. + This method will detect this scenario and retrigger the newRepo task + if needed to avoid the module build from being stuck in the "build" state. + """ + if conf.system != "koji": + return + + koji_session = KojiModuleBuilder.get_session(conf) + module_builds = db_session.query(models.ModuleBuild).filter( + models.ModuleBuild.state == models.BUILD_STATES["build"], + models.ModuleBuild.new_repo_task_id.isnot(None), + ).all() + + for module_build in module_builds: + task_info = koji_session.getTaskInfo(module_build.new_repo_task_id) + if task_info["state"] in [koji.TASK_STATES["CANCELED"], koji.TASK_STATES["FAILED"]]: + log.info( + "newRepo task %s for %r failed, starting another one", + str(module_build.new_repo_task_id), module_build, + ) + taginfo = koji_session.getTag(module_build.koji_tag + "-build") + module_build.new_repo_task_id = koji_session.newRepo(taginfo["name"]) + + db_session.commit() + + +@celery_app.task +def delete_old_koji_targets(): + """ + Deletes targets older than `config.koji_target_delete_time` seconds + from Koji to cleanup after the module builds. 
+ """ + if conf.system != "koji": + return + + log.info("Looking for module builds which Koji target can be removed") + + now = datetime.utcnow() + + koji_session = KojiModuleBuilder.get_session(conf) + for target in koji_session.getBuildTargets(): + module = db_session.query(models.ModuleBuild).filter( + models.ModuleBuild.koji_tag == target["dest_tag_name"], + models.ModuleBuild.name.notin_(conf.base_module_names), + models.ModuleBuild.state.notin_([ + models.BUILD_STATES["init"], + models.BUILD_STATES["wait"], + models.BUILD_STATES["build"], + ]), + ).options( + load_only("time_completed"), + ).first() + + if module is None: + continue + + # Double-check that the target we are going to remove is prefixed + # by our prefix, so we won't remove f26 when there is some garbage + # in DB or Koji. + for allowed_prefix in conf.koji_tag_prefixes: + if target["name"].startswith(allowed_prefix + "-"): + break + else: + log.error("Module %r has Koji target with not allowed prefix.", module) + continue + + delta = now - module.time_completed + if delta.total_seconds() > conf.koji_target_delete_time: + log.info("Removing target of module %r", module) + koji_session.deleteBuildTarget(target["id"]) + + +@celery_app.task +def cleanup_stale_failed_builds(): + """Does various clean up tasks on stale failed module builds""" + + if conf.system != "koji": + return + + stale_date = datetime.utcnow() - timedelta(days=conf.cleanup_failed_builds_time) + stale_module_builds = db_session.query(models.ModuleBuild).filter( + models.ModuleBuild.state == models.BUILD_STATES["failed"], + models.ModuleBuild.time_modified <= stale_date, + ).all() + if stale_module_builds: + log.info( + "%s stale failed module build(s) will be cleaned up", + len(stale_module_builds) + ) + for module in stale_module_builds: + log.info("%r is stale and is being cleaned up", module) + # Find completed artifacts in the stale build + artifacts = [c for c in module.component_builds if c.is_completed] + # If there are no completed artifacts, then there is nothing to tag + if artifacts: + # Set buildroot_connect=False so it doesn't recreate the Koji target and etc. + builder = GenericBuilder.create_from_module( + db_session, module, conf, buildroot_connect=False + ) + builder.untag_artifacts([c.nvr for c in artifacts]) + # Mark the artifacts as untagged in the database + for c in artifacts: + c.tagged = False + c.tagged_in_final = False + db_session.add(c) + state_reason = ( + "The module was garbage collected since it has failed over {0}" + " day(s) ago".format(conf.cleanup_failed_builds_time) + ) + module.transition( + db_session, + conf, + models.BUILD_STATES["garbage"], + state_reason=state_reason, + failure_type="user", + ) + db_session.add(module) + db_session.commit() + + +@celery_app.task +def cancel_stuck_module_builds(): + """ + Method transitions builds which are stuck in one state too long to the "failed" state. + The states are defined with the "cleanup_stuck_builds_states" config option and the + time is defined by the "cleanup_stuck_builds_time" config option. 
+ """ + log.info( + 'Looking for module builds stuck in the states "%s" more than %s days', + " and ".join(conf.cleanup_stuck_builds_states), + conf.cleanup_stuck_builds_time, + ) + + threshold = datetime.utcnow() - timedelta(days=conf.cleanup_stuck_builds_time) + states = [ + module_build_service.models.BUILD_STATES[state] + for state in conf.cleanup_stuck_builds_states + ] + + module_builds = db_session.query(models.ModuleBuild).filter( + models.ModuleBuild.state.in_(states), + models.ModuleBuild.time_modified < threshold + ).all() + + log.info(" %s module builds are stuck...", len(module_builds)) + + for build in module_builds: + log.info( + 'Transitioning build "%s:%s:%s:%s" to "Failed" state.', + build.name, build.stream, build.version, build.context + ) + state_reason = "The module was in {} for more than {} days".format( + build.state, conf.cleanup_stuck_builds_time + ) + build.transition( + db_session, + conf, + state=models.BUILD_STATES["failed"], + state_reason=state_reason, + failure_type="user", + ) + db_session.commit() + + +@celery_app.task +def sync_koji_build_tags(): + """ + Method checking the "tagged" and "tagged_in_final" attributes of + "complete" ComponentBuilds in the current batch of module builds + in "building" state against the Koji. + + In case the Koji shows the build as tagged/tagged_in_final, + fake "tagged" message is added to work queue. + """ + if conf.system != "koji": + return + + koji_session = KojiModuleBuilder.get_session(conf, login=False) + + threshold = datetime.utcnow() - timedelta(minutes=10) + module_builds = db_session.query(models.ModuleBuild).filter( + models.ModuleBuild.time_modified < threshold, + models.ModuleBuild.state == models.BUILD_STATES["build"] + ).all() + for module_build in module_builds: + complete_components = module_build.current_batch(koji.BUILD_STATES["COMPLETE"]) + for c in complete_components: + # In case the component is tagged in the build tag and + # also tagged in the final tag (or it is build_time_only + # and therefore should not be tagged in final tag), skip it. + if c.tagged and (c.tagged_in_final or c.build_time_only): + continue + + log.info( + "%r: Component %r is complete, but not tagged in the " + "final and/or build tags.", + module_build, c, + ) + + # Check in which tags the component is tagged. + tag_dicts = koji_session.listTags(c.nvr) + tags = [tag_dict["name"] for tag_dict in tag_dicts] + + # If it is tagged in final tag, but MBS does not think so, + # schedule fake message. + if not c.tagged_in_final and module_build.koji_tag in tags: + log.info( + "Apply tag %s to module build %r", + module_build.koji_tag, module_build) + tagged.delay( + "internal:sync_koji_build_tags", + module_build.koji_tag, c.package, c.nvr) + + # If it is tagged in the build tag, but MBS does not think so, + # schedule fake message. 
+ build_tag = module_build.koji_tag + "-build" + if not c.tagged and build_tag in tags: + log.info( + "Apply build tag %s to module build %r", + build_tag, module_build) + tagged.delay( + "internal:sync_koji_build_tags", + build_tag, c.package, c.nvr) + + +@celery_app.task +def poll_greenwave(): + """Polls Greenwave for all builds in done state""" + if greenwave is None: + return + + module_builds = db_session.query(models.ModuleBuild).filter_by( + state=models.BUILD_STATES["done"], + scratch=False + ).all() + + log.info("Checking Greenwave for %d builds", len(module_builds)) + + for build in module_builds: + if greenwave.check_gating(build): + build.transition(db_session, conf, state=models.BUILD_STATES["ready"]) + else: + build.state_reason = "Gating failed (MBS will retry in {0} seconds)".format( + conf.polling_interval + ) + if greenwave.error_occurred: + build.state_reason += " (Error occured while querying Greenwave)" + build.time_modified = datetime.utcnow() + db_session.commit() + + +def has_missed_new_repo_message(module_build, koji_session): """ Returns whether or not a new repo message has probably been missed. """ @@ -504,7 +473,8 @@ def _has_missed_new_repo_message(module_build, koji_session): # message so module build can recover. return True log.debug( - 'Checking status of newRepo task "%d" for %s', module_build.new_repo_task_id, module_build) + 'Checking status of newRepo task "%d" for %s', + module_build.new_repo_task_id, module_build) task_info = koji_session.getTaskInfo(module_build.new_repo_task_id) # Other final states, FAILED and CANCELED, are handled by retrigger_new_repo_on_failure return task_info["state"] == koji.TASK_STATES["CLOSED"] diff --git a/tests/test_content_generator.py b/tests/test_content_generator.py index d08efed3..6a2feeb4 100644 --- a/tests/test_content_generator.py +++ b/tests/test_content_generator.py @@ -8,7 +8,6 @@ import os from os import path import module_build_service.messaging -import module_build_service.scheduler.handlers.repos # noqa from module_build_service import models, conf, build_logs, Modulemd from module_build_service.db_session import db_session from module_build_service.utils.general import mmd_to_str diff --git a/tests/test_scheduler/test_poller.py b/tests/test_scheduler/test_poller.py index b3380d4f..197dee45 100644 --- a/tests/test_scheduler/test_poller.py +++ b/tests/test_scheduler/test_poller.py @@ -2,15 +2,13 @@ # SPDX-License-Identifier: MIT import re import pytest -from mock import patch +from mock import call, patch from module_build_service import models, conf from tests import clean_database, make_module_in_db import mock import koji from module_build_service.db_session import db_session -from module_build_service.scheduler import events -from module_build_service.scheduler.producer import MBSProducer -import six.moves.queue as queue +from module_build_service.scheduler import producer from datetime import datetime, timedelta @@ -19,7 +17,6 @@ from datetime import datetime, timedelta "module_build_service.builder.GenericBuilder.default_buildroot_groups", return_value={"build": [], "srpm-build": []}, ) -@patch("module_build_service.scheduler.consumer.get_global_consumer") @patch("module_build_service.builder.GenericBuilder.create_from_module") class TestPoller: def setup_method(self, test_method): @@ -40,15 +37,11 @@ class TestPoller: @pytest.mark.parametrize("fresh", [True, False]) @patch("module_build_service.utils.batches.start_build_component") def test_process_paused_module_builds( - self, start_build_component, 
create_builder, global_consumer, dbg, fresh + self, start_build_component, create_builder, dbg, fresh ): """ Tests general use-case of process_paused_module_builds. """ - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - builder = mock.MagicMock() create_builder.return_value = builder @@ -64,9 +57,7 @@ class TestPoller: db_session.commit() # Poll :) - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.poll() + producer.process_paused_module_builds() module_build = models.ModuleBuild.get_by_id(db_session, 3) @@ -92,16 +83,12 @@ class TestPoller: )) @patch("module_build_service.utils.batches.start_build_component") def test_process_paused_module_builds_with_new_repo_task( - self, start_build_component, create_builder, global_consumer, dbg, task_state, + self, start_build_component, create_builder, dbg, task_state, expect_start_build_component ): """ Tests general use-case of process_paused_module_builds. """ - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - builder = mock.MagicMock() create_builder.return_value = builder @@ -118,9 +105,7 @@ class TestPoller: db_session.commit() # Poll :) - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.poll() + producer.process_paused_module_builds() module_build = models.ModuleBuild.get_by_id(db_session, 3) @@ -139,16 +124,10 @@ class TestPoller: @patch.dict("sys.modules", krbV=mock.MagicMock()) @patch("koji.ClientSession") - def test_retrigger_new_repo_on_failure( - self, ClientSession, create_builder, global_consumer, dbg - ): + def test_retrigger_new_repo_on_failure(self, ClientSession, create_builder, dbg): """ Tests that we call koji_sesion.newRepo when newRepo task failed. """ - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - koji_session = ClientSession.return_value koji_session.getTag = lambda tag_name: {"name": tag_name} koji_session.getTaskInfo.return_value = {"state": koji.TASK_STATES["FAILED"]} @@ -165,26 +144,18 @@ class TestPoller: module_build.new_repo_task_id = 123456 db_session.commit() - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.poll() + producer.retrigger_new_repo_on_failure() koji_session.newRepo.assert_called_once_with( "module-testmodule-master-20170219191323-c40c156c-build") @patch.dict("sys.modules", krbV=mock.MagicMock()) @patch("koji.ClientSession") - def test_trigger_new_repo_when_succeeded( - self, ClientSession, create_builder, global_consumer, dbg - ): + def test_trigger_new_repo_when_succeeded(self, ClientSession, create_builder, dbg): """ Tests that we do not call koji_sesion.newRepo when newRepo task succeeded. 
""" - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - koji_session = ClientSession.return_value koji_session.getTag = lambda tag_name: {"name": tag_name} koji_session.getTaskInfo.return_value = {"state": koji.TASK_STATES["CLOSED"]} @@ -201,26 +172,18 @@ class TestPoller: module_build.new_repo_task_id = 123456 db_session.commit() - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.poll() + producer.retrigger_new_repo_on_failure() module_build = models.ModuleBuild.get_by_id(db_session, 3) assert not koji_session.newRepo.called assert module_build.new_repo_task_id == 123456 - def test_process_paused_module_builds_waiting_for_repo( - self, create_builder, global_consumer, dbg - ): + def test_process_paused_module_builds_waiting_for_repo(self, create_builder, dbg): """ Tests that process_paused_module_builds does not start new batch when we are waiting for repo. """ - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - builder = mock.MagicMock() create_builder.return_value = builder @@ -232,9 +195,7 @@ class TestPoller: db_session.commit() # Poll :) - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.poll() + producer.process_paused_module_builds() module_build = models.ModuleBuild.get_by_id(db_session, 3) @@ -246,12 +207,8 @@ class TestPoller: @patch.dict("sys.modules", krbV=mock.MagicMock()) @patch("koji.ClientSession") def test_old_build_targets_are_not_associated_with_any_module_builds( - self, ClientSession, create_builder, global_consumer, dbg + self, ClientSession, create_builder, dbg ): - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - koji_session = ClientSession.return_value # No created module build has any of these tags. koji_session.getBuildTargets.return_value = [ @@ -259,16 +216,14 @@ class TestPoller: {"dest_tag_name": "module-yyy-2"}, ] - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.delete_old_koji_targets(conf) + producer.delete_old_koji_targets() koji_session.deleteBuildTarget.assert_not_called() @patch.dict("sys.modules", krbV=mock.MagicMock()) @patch("koji.ClientSession") def test_dont_delete_base_module_build_target( - self, ClientSession, create_builder, global_consumer, dbg + self, ClientSession, create_builder, dbg ): module_build = models.ModuleBuild.get_by_id(db_session, 3) @@ -276,24 +231,16 @@ class TestPoller: # No created module build has any of these tags. koji_session.getBuildTargets.return_value = [{"dest_tag_name": module_build.koji_tag}] - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - # If module build's name is one of base module names, build target # should not be deleted. with patch.object(conf, "base_module_names", new=[module_build.name]): - - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.delete_old_koji_targets(conf) - + producer.delete_old_koji_targets() koji_session.deleteBuildTarget.assert_not_called() @patch.dict("sys.modules", krbV=mock.MagicMock()) @patch("koji.ClientSession") def test_dont_delete_build_target_for_unfinished_module_builds( - self, ClientSession, create_builder, global_consumer, dbg + self, ClientSession, create_builder, dbg ): module_build = models.ModuleBuild.get_by_id(db_session, 3) @@ -301,26 +248,20 @@ class TestPoller: # No created module build has any of these tags. 
koji_session.getBuildTargets.return_value = [{"dest_tag_name": module_build.koji_tag}] - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - # Each time when a module build is in one of these state, build target # should not be deleted. for state in ["init", "wait", "build"]: module_build.state = state db_session.commit() - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.delete_old_koji_targets(conf) + producer.delete_old_koji_targets() koji_session.deleteBuildTarget.assert_not_called() @patch.dict("sys.modules", krbV=mock.MagicMock()) @patch("koji.ClientSession") def test_only_delete_build_target_with_allowed_koji_tag_prefix( - self, ClientSession, create_builder, global_consumer, dbg + self, ClientSession, create_builder, dbg ): module_build_2 = models.ModuleBuild.get_by_id(db_session, 2) # Only module build 1's build target should be deleted. @@ -343,15 +284,9 @@ class TestPoller: {"id": 2, "dest_tag_name": module_build_3.koji_tag, "name": module_build_3.koji_tag}, ] - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - with patch.object(conf, "koji_tag_prefixes", new=["module", "another-prefix"]): with patch.object(conf, "koji_target_delete_time", new=60): - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.delete_old_koji_targets(conf) + producer.delete_old_koji_targets() koji_session.deleteBuildTarget.assert_called_once_with(1) koji_session.krb_login.assert_called_once() @@ -359,7 +294,7 @@ class TestPoller: @patch.dict("sys.modules", krbV=mock.MagicMock()) @patch("koji.ClientSession") def test_cant_delete_build_target_if_not_reach_delete_time( - self, ClientSession, create_builder, global_consumer, dbg + self, ClientSession, create_builder, dbg ): module_build_2 = models.ModuleBuild.get_by_id(db_session, 2) # Only module build 1's build target should be deleted. @@ -377,31 +312,22 @@ class TestPoller: {"id": 1, "dest_tag_name": module_build_2.koji_tag, "name": module_build_2.koji_tag} ] - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - with patch.object(conf, "koji_tag_prefixes", new=["module"]): # Use default koji_target_delete_time in config. That time is long # enough for test. - hub = mock.MagicMock() - poller = MBSProducer(hub) - poller.delete_old_koji_targets(conf) + producer.delete_old_koji_targets() koji_session.deleteBuildTarget.assert_not_called() @pytest.mark.parametrize("state", ["init", "wait"]) - def test_process_waiting_module_build( - self, create_builder, global_consumer, dbg, state - ): + @patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={ + models.BUILD_STATES["init"]: mock.Mock(), + models.BUILD_STATES["wait"]: mock.Mock(), + }) + def test_process_waiting_module_build(self, create_builder, dbg, state): """ Test that processing old waiting module builds works. """ - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - - hub = mock.MagicMock() - poller = MBSProducer(hub) + handler = producer.ON_MODULE_CHANGE_HANDLERS[models.BUILD_STATES[state]] # Change the batch to 2, so the module build is in state where # it is not building anything, but the state is "build". @@ -413,32 +339,32 @@ class TestPoller: db_session.commit() db_session.refresh(module_build) - # Ensure the queue is empty before we start. 
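The queue-size assertions give way to assertions on the mocked handler table: patch.dict (applied by the decorator above) swaps ON_MODULE_CHANGE_HANDLERS for Mock objects, and the test then checks handler.delay. The mechanism in isolation:

```python
from unittest import mock

HANDLERS = {0: "real-init-handler", 1: "real-wait-handler"}

with mock.patch.dict(HANDLERS, clear=True,
                     values={0: mock.Mock(), 1: mock.Mock()}):
    # Code under test looks the handler up and schedules it via .delay().
    HANDLERS[0].delay("internal:mbs.module.state.change", 3, 0)
    HANDLERS[0].delay.assert_called_once_with(
        "internal:mbs.module.state.change", 3, 0)

# The original table is restored once the patch exits.
assert HANDLERS[0] == "real-init-handler"
```
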
- assert consumer.incoming.qsize() == 0 - # Poll :) - poller.process_waiting_module_builds() + producer.process_waiting_module_builds() - assert consumer.incoming.qsize() == 1 + handler.delay.assert_called_once_with( + "internal:mbs.module.state.change", + module_build.id, + module_build.state + ) db_session.refresh(module_build) # ensure the time_modified was changed. assert module_build.time_modified > original @pytest.mark.parametrize("state", ["init", "wait"]) + @patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={ + models.BUILD_STATES["init"]: mock.Mock(), + models.BUILD_STATES["wait"]: mock.Mock(), + }) def test_process_waiting_module_build_not_old_enough( - self, create_builder, global_consumer, dbg, state + self, create_builder, dbg, state ): """ Test that we do not process young waiting builds. """ - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer + handler = producer.ON_MODULE_CHANGE_HANDLERS[models.BUILD_STATES[state]] - hub = mock.MagicMock() - poller = MBSProducer(hub) - - # Change the batch to 2, so the module build is in state where + # Change the batch to build, so the module build is in state where # it is not building anything, but the state is "build". module_build = models.ModuleBuild.get_by_id(db_session, 3) module_build.state = models.BUILD_STATES[state] @@ -448,37 +374,25 @@ class TestPoller: db_session.commit() db_session.refresh(module_build) - # Ensure the queue is empty before we start. - assert consumer.incoming.qsize() == 0 - # Poll :) - poller.process_waiting_module_builds() + producer.process_waiting_module_builds() - # Ensure we did *not* process the 9 minute-old build. - assert consumer.incoming.qsize() == 0 + handler.assert_not_called() - def test_process_waiting_module_build_none_found( - self, create_builder, global_consumer, dbg - ): + @patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={ + models.BUILD_STATES["init"]: mock.Mock(), + models.BUILD_STATES["wait"]: mock.Mock(), + }) + def test_process_waiting_module_build_none_found(self, create_builder, dbg): """ Test nothing happens when no module builds are waiting. """ - - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - - hub = mock.MagicMock() - poller = MBSProducer(hub) - - # Ensure the queue is empty before we start. - assert consumer.incoming.qsize() == 0 - # Poll :) - poller.process_waiting_module_builds() + producer.process_waiting_module_builds() # Ensure we did *not* process any of the non-waiting builds. - assert consumer.incoming.qsize() == 0 + for handler in producer.ON_MODULE_CHANGE_HANDLERS.values(): + handler.assert_not_called() - def test_cleanup_stale_failed_builds(self, create_builder, global_consumer, dbg): + def test_cleanup_stale_failed_builds(self, create_builder, dbg): """ Test that one of the two module builds gets to the garbage state when running cleanup_stale_failed_builds. 
""" @@ -502,15 +416,8 @@ class TestPoller: db_session.commit() - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - hub = mock.MagicMock() - poller = MBSProducer(hub) + producer.cleanup_stale_failed_builds() - # Ensure the queue is empty before we start - assert consumer.incoming.qsize() == 0 - poller.cleanup_stale_failed_builds(conf) db_session.refresh(module_build_two) # Make sure module_build_one was transitioned to garbage assert module_build_one.state == models.BUILD_STATES["garbage"] @@ -533,9 +440,7 @@ class TestPoller: "module-build-macros-0.1-1.module+0+d027b723", ]) - def test_cleanup_stale_failed_builds_no_components( - self, create_builder, global_consumer, dbg - ): + def test_cleanup_stale_failed_builds_no_components(self, create_builder, dbg): """ Test that a module build without any components built gets to the garbage state when running cleanup_stale_failed_builds. """ @@ -555,15 +460,8 @@ class TestPoller: db_session.commit() - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - hub = mock.MagicMock() - poller = MBSProducer(hub) + producer.cleanup_stale_failed_builds() - # Ensure the queue is empty before we start - assert consumer.incoming.qsize() == 0 - poller.cleanup_stale_failed_builds(conf) db_session.refresh(module_build_two) # Make sure module_build_two was transitioned to garbage assert module_build_two.state == models.BUILD_STATES["garbage"] @@ -580,9 +478,7 @@ class TestPoller: @pytest.mark.parametrize( "test_state", [models.BUILD_STATES[state] for state in conf.cleanup_stuck_builds_states] ) - def test_cancel_stuck_module_builds( - self, create_builder, global_consumer, dbg, test_state - ): + def test_cancel_stuck_module_builds(self, create_builder, dbg, test_state): module_build1 = models.ModuleBuild.get_by_id(db_session, 1) module_build1.state = test_state @@ -601,17 +497,9 @@ class TestPoller: db_session.commit() - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - hub = mock.MagicMock() - poller = MBSProducer(hub) + producer.cancel_stuck_module_builds() - assert consumer.incoming.qsize() == 0 - - poller.cancel_stuck_module_builds(conf) - - module = db_session.query(models.ModuleBuild).filter_by(state=4).all() + module = models.ModuleBuild.by_state(db_session, "failed") assert len(module) == 1 assert module[0].id == 2 @@ -619,8 +507,10 @@ class TestPoller: @pytest.mark.parametrize("tagged_in_final", (True, False)) @pytest.mark.parametrize("btime", (True, False)) @patch("koji.ClientSession") + @patch("module_build_service.scheduler.producer.tagged") def test_sync_koji_build_tags( - self, ClientSession, create_builder, global_consumer, dbg, tagged, tagged_in_final, btime + self, tagged_handler, ClientSession, create_builder, dbg, + tagged, tagged_in_final, btime ): module_build_2 = models.ModuleBuild.get_by_id(db_session, 2) # Only module build 1's build target should be deleted. @@ -639,48 +529,35 @@ class TestPoller: koji_session = ClientSession.return_value # No created module build has any of these tags. 
- ret = [] + + listtags_return_value = [] + expected_tagged_calls = [] if btime: if tagged: - ret.append({"id": 1, "name": module_build_2.koji_tag + "-build"}) + listtags_return_value.append( + {"id": 1, "name": module_build_2.koji_tag + "-build"}) + expected_tagged_calls.append(call( + "internal:sync_koji_build_tags", + module_build_2.koji_tag + "-build", c.package, c.nvr + )) if tagged_in_final: - ret.append({"id": 2, "name": module_build_2.koji_tag}) - koji_session.listTags.return_value = ret + listtags_return_value.append( + {"id": 2, "name": module_build_2.koji_tag}) + expected_tagged_calls.append(call( + "internal:sync_koji_build_tags", + module_build_2.koji_tag, c.package, c.nvr + )) + koji_session.listTags.return_value = listtags_return_value - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - hub = mock.MagicMock() - poller = MBSProducer(hub) + producer.sync_koji_build_tags() - assert consumer.incoming.qsize() == 0 - - poller.sync_koji_build_tags(conf) - - assert consumer.incoming.qsize() == len(ret) - - expected_msg_tags = [] - if btime: - if tagged: - expected_msg_tags.append(module_build_2.koji_tag + "-build") - if tagged_in_final: - expected_msg_tags.append(module_build_2.koji_tag) - - assert len(expected_msg_tags) == consumer.incoming.qsize() - - for i in range(consumer.incoming.qsize()): - msg = consumer.incoming.get() - assert events.KOJI_TAG_CHANGE == msg["event"] - assert c.package == msg["build_name"] - assert c.nvr == msg["build_nvr"] - assert msg["tag_name"] in expected_msg_tags + tagged_handler.delay.assert_has_calls( + expected_tagged_calls, any_order=True) @pytest.mark.parametrize("greenwave_result", [True, False]) @patch("module_build_service.utils.greenwave.Greenwave.check_gating") - def test_poll_greenwave( - self, mock_gw, create_builder, global_consumer, dbg, greenwave_result - ): + def test_poll_greenwave(self, mock_gw, create_builder, dbg, greenwave_result): module_build1 = models.ModuleBuild.get_by_id(db_session, 1) module_build1.state = models.BUILD_STATES["ready"] @@ -697,17 +574,9 @@ class TestPoller: db_session.commit() - consumer = mock.MagicMock() - consumer.incoming = queue.Queue() - global_consumer.return_value = consumer - hub = mock.MagicMock() - poller = MBSProducer(hub) - - assert consumer.incoming.qsize() == 0 - mock_gw.return_value = greenwave_result - poller.poll_greenwave(conf) + producer.poll_greenwave() mock_gw.assert_called_once() modules = models.ModuleBuild.by_state(db_session, "ready") diff --git a/tests/test_views/test_views.py b/tests/test_views/test_views.py index 1febc169..921073d4 100644 --- a/tests/test_views/test_views.py +++ b/tests/test_views/test_views.py @@ -26,7 +26,6 @@ from module_build_service.errors import UnprocessableEntity from module_build_service.models import ModuleBuild, BUILD_STATES, ComponentBuild from module_build_service import version import module_build_service.config as mbs_config -import module_build_service.scheduler.handlers.modules import module_build_service.utils.submit from module_build_service.utils.general import ( import_mmd, mmd_to_str, load_mmd,
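
Taken together, the producer half of this change reduces to one idiom: each former poll step becomes a Celery task registered with beat on the MBS polling interval. A minimal sketch of that idiom, with an illustrative broker, interval, and task body:

```python
from celery import Celery

app = Celery("mbs", broker="memory://")

@app.task
def log_summary():
    print("summary of module builds")

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Plays the role MBSProducer.frequency used to: run every N seconds.
    sender.add_periodic_task(30.0, log_summary.s(), name="Log summary")
```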