Convert the Poller to be Celery periodic tasks

Poller methods within original class MBSProducer become module level
functions and are registered as Celery periodic tasks.

Code logging the size of fedmsg-hub queue are removed from log_summary.

process_open_component_builds is still kept there and not converted to a
periodic task.

There are some small refactor:

* do not format string in logging method call.
* reformat some lines of code doing SQLAlchemy database query to make
  them more readable.

Signed-off-by: Chenxiong Qi <cqi@redhat.com>
This commit is contained in:
Chenxiong Qi
2019-11-29 16:02:35 +08:00
committed by mprahl
parent e673f690a9
commit 0607f2079c
5 changed files with 549 additions and 711 deletions

View File

@@ -20,11 +20,6 @@ import moksha.hub
import sqlalchemy.exc
import module_build_service.messaging
import module_build_service.scheduler.handlers.repos
import module_build_service.scheduler.handlers.components
import module_build_service.scheduler.handlers.modules
import module_build_service.scheduler.handlers.tags
import module_build_service.scheduler.handlers.greenwave
import module_build_service.monitor as monitor
from module_build_service import models, log, conf
@@ -32,9 +27,42 @@ from module_build_service.db_session import db_session
from module_build_service.errors import IgnoreMessage
from module_build_service.messaging import default_messaging_backend
from module_build_service.scheduler import events
from module_build_service.scheduler.handlers import components
from module_build_service.scheduler.handlers import repos
from module_build_service.scheduler.handlers import modules
from module_build_service.scheduler.handlers import tags
from module_build_service.scheduler.handlers import greenwave
def no_op_handler(*args, **kwargs):
return True
ON_BUILD_CHANGE_HANDLERS = {
koji.BUILD_STATES["BUILDING"]: no_op_handler,
koji.BUILD_STATES["COMPLETE"]: components.build_task_finalize,
koji.BUILD_STATES["FAILED"]: components.build_task_finalize,
koji.BUILD_STATES["CANCELED"]: components.build_task_finalize,
koji.BUILD_STATES["DELETED"]: no_op_handler,
}
ON_MODULE_CHANGE_HANDLERS = {
models.BUILD_STATES["init"]: modules.init,
models.BUILD_STATES["wait"]: modules.wait,
models.BUILD_STATES["build"]: no_op_handler,
models.BUILD_STATES["failed"]: modules.failed,
models.BUILD_STATES["done"]: modules.done,
# XXX: DIRECT TRANSITION TO READY
models.BUILD_STATES["ready"]: no_op_handler,
models.BUILD_STATES["garbage"]: no_op_handler,
}
# Only one kind of repo change event, though...
ON_REPO_CHANGE_HANDLER = repos.done
ON_TAG_CHANGE_HANDLER = tags.tagged
ON_DECISION_UPDATE_HANDLER = greenwave.decision_update
class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
""" This is triggered by running fedmsg-hub. This class is responsible for
ingesting and processing messages from the message bus.
@@ -83,32 +111,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
msg = module_build_service.messaging._initial_messages.pop(0)
self.incoming.put(msg)
from module_build_service.scheduler import handlers
# These are our main lookup tables for figuring out what to run in
# response to what messaging events.
self.NO_OP = NO_OP = lambda *args, **kwargs: True
self.on_build_change = {
koji.BUILD_STATES["BUILDING"]: NO_OP,
koji.BUILD_STATES["COMPLETE"]: handlers.components.build_task_finalize,
koji.BUILD_STATES["FAILED"]: handlers.components.build_task_finalize,
koji.BUILD_STATES["CANCELED"]: handlers.components.build_task_finalize,
koji.BUILD_STATES["DELETED"]: NO_OP,
}
self.on_module_change = {
models.BUILD_STATES["init"]: handlers.modules.init,
models.BUILD_STATES["wait"]: handlers.modules.wait,
models.BUILD_STATES["build"]: NO_OP,
models.BUILD_STATES["failed"]: handlers.modules.failed,
models.BUILD_STATES["done"]: handlers.modules.done,
# XXX: DIRECT TRANSITION TO READY
models.BUILD_STATES["ready"]: NO_OP,
models.BUILD_STATES["garbage"]: NO_OP,
}
# Only one kind of repo change event, though...
self.on_repo_change = handlers.repos.done
self.on_tag_change = handlers.tags.tagged
self.on_decision_update = handlers.greenwave.decision_update
self.sanity_check()
def shutdown(self):
@@ -180,10 +182,10 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
""" On startup, make sure our implementation is sane. """
# Ensure we have every state covered
for state in models.BUILD_STATES:
if models.BUILD_STATES[state] not in self.on_module_change:
if models.BUILD_STATES[state] not in ON_MODULE_CHANGE_HANDLERS:
raise KeyError("Module build states %r not handled." % state)
for state in koji.BUILD_STATES:
if koji.BUILD_STATES[state] not in self.on_build_change:
if koji.BUILD_STATES[state] not in ON_BUILD_CHANGE_HANDLERS:
raise KeyError("Koji build states %r not handled." % state)
def _map_message(self, db_session, event_info):
@@ -192,7 +194,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
event = event_info["event"]
if event == events.KOJI_BUILD_CHANGE:
handler = self.on_build_change[event_info["build_new_state"]]
handler = ON_BUILD_CHANGE_HANDLERS[event_info["build_new_state"]]
build = models.ComponentBuild.from_component_event(
db_session, event_info["task_id"], event_info["module_build_id"])
if build:
@@ -201,13 +203,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
if event == events.KOJI_REPO_CHANGE:
return (
self.on_repo_change,
ON_REPO_CHANGE_HANDLER,
models.ModuleBuild.get_by_tag(db_session, event_info["repo_tag"])
)
if event == events.KOJI_TAG_CHANGE:
return (
self.on_tag_change,
ON_TAG_CHANGE_HANDLER,
models.ModuleBuild.get_by_tag(db_session, event_info["tag_name"])
)
@@ -219,14 +221,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
state, type(state), valid_module_build_states
))
return (
self.on_module_change[state],
models.ModuleBuild.get_by_id(
db_session, event_info["module_build_id"])
ON_MODULE_CHANGE_HANDLERS[state],
models.ModuleBuild.get_by_id(db_session, event_info["module_build_id"])
)
if event == events.GREENWAVE_DECISION_UPDATE:
return (
self.on_decision_update,
ON_DECISION_UPDATE_HANDLER,
greenwave.get_corresponding_module_build(event_info["subject_identifier"])
)
@@ -243,7 +244,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
idx = "%s: %s, %s" % (
handler.__name__, event_info["event"], event_info["msg_id"])
if handler is self.NO_OP:
if handler is no_op_handler:
log.debug("Handler is NO_OP: %s", idx)
return

View File

@@ -1,501 +1,470 @@
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
""" The PollingProducer class that acts as a producer entry point for
fedmsg-hub. This class polls the database for tasks to do.
"""
import koji
import operator
from datetime import timedelta, datetime
from sqlalchemy.orm import lazyload
from moksha.hub.api.producer import PollingProducer
from sqlalchemy.orm import lazyload, load_only
import module_build_service.messaging
import module_build_service.scheduler
import module_build_service.scheduler.consumer
from module_build_service import conf, models, log
from module_build_service import celery_app, conf, models, log
from module_build_service.builder import GenericBuilder
from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder
from module_build_service.utils.greenwave import greenwave
from module_build_service.db_session import db_session
from module_build_service.scheduler import events
from module_build_service.scheduler.consumer import ON_MODULE_CHANGE_HANDLERS
from module_build_service.scheduler.handlers.components import build_task_finalize
from module_build_service.scheduler.handlers.tags import tagged
class MBSProducer(PollingProducer):
frequency = timedelta(seconds=conf.polling_interval)
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
tasks = (
(log_summary, "Log summary of module builds and component builds"),
(process_waiting_module_builds, "Process waiting module builds"),
(fail_lost_builds, "Fail lost builds"),
(process_paused_module_builds, "Process paused module builds"),
(delete_old_koji_targets, "Delete old koji targets"),
(cleanup_stale_failed_builds, "Cleanup stale failed builds"),
(cancel_stuck_module_builds, "Cancel stuck module builds"),
(sync_koji_build_tags, "Sync Koji build tags"),
(poll_greenwave, "Gating module build to ready state"),
)
@events.mbs_event_handler()
def poll(self):
try:
self.log_summary()
self.process_waiting_module_builds()
self.process_open_component_builds()
self.fail_lost_builds()
self.process_paused_module_builds(conf)
self.retrigger_new_repo_on_failure(conf)
self.delete_old_koji_targets(conf)
self.cleanup_stale_failed_builds(conf)
self.sync_koji_build_tags(conf)
self.poll_greenwave(conf)
except Exception:
msg = "Error in poller execution:"
log.exception(msg)
for task, name in tasks:
sender.add_periodic_task(conf.polling_interval, task.s(), name=name)
# Poller runs in its own thread. Database session can be removed safely.
db_session.remove()
log.info('Poller will now sleep for "{}" seconds'.format(conf.polling_interval))
@celery_app.task
def log_summary():
states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))
for name, code in states:
query = db_session.query(models.ModuleBuild).filter_by(state=code)
count = query.count()
if count:
log.info(" * %s module builds in the %s state", count, name)
if name == "build":
for module_build in query.all():
log.info(" * %r", module_build)
# First batch is number '1'.
for i in range(1, module_build.batch + 1):
n = len([c for c in module_build.component_builds if c.batch == i])
log.info(" * %s components in batch %s", n, i)
def fail_lost_builds(self):
# This function is supposed to be handling only the part which can't be
# updated through messaging (e.g. srpm-build failures). Please keep it
# fit `n` slim. We do want rest to be processed elsewhere
# TODO re-use
if conf.system == "koji":
# We don't do this on behalf of users
koji_session = KojiModuleBuilder.get_session(conf, login=False)
log.info("Querying tasks for statuses:")
res = (
db_session.query(models.ComponentBuild)
.filter_by(state=koji.BUILD_STATES["BUILDING"])
.options(lazyload("module_build"))
.all()
)
@celery_app.task
def process_waiting_module_builds():
for state in ["init", "wait"]:
nudge_module_builds_in_state(state, 10)
log.info("Checking status for {0} tasks".format(len(res)))
for component_build in res:
log.debug(component_build.json(db_session))
# Don't check tasks which haven't been triggered yet
if not component_build.task_id:
continue
# Don't check tasks for components which have been reused,
# they may have BUILDING state temporarily before we tag them
# to new module tag. Checking them would be waste of resources.
if component_build.reused_component_id:
log.debug(
'Skipping check for task "{0}", '
'the component has been reused ("{1}").'.format(
component_build.task_id, component_build.reused_component_id)
)
continue
def nudge_module_builds_in_state(state_name, older_than_minutes):
"""
Finds all the module builds in the `state` with `time_modified` older
than `older_than_minutes` and adds fake MBSModule message to the
work queue.
"""
log.info("Looking for module builds stuck in the %s state", state_name)
builds = models.ModuleBuild.by_state(db_session, state_name)
log.info(" %r module builds in the %s state...", len(builds), state_name)
now = datetime.utcnow()
time_modified_threshold = timedelta(minutes=older_than_minutes)
for build in builds:
task_id = component_build.task_id
log.info('Checking status of task_id "{0}"'.format(task_id))
task_info = koji_session.getTaskInfo(task_id)
state_mapping = {
# Cancelled and failed builds should be marked as failed.
koji.TASK_STATES["CANCELED"]: koji.BUILD_STATES["FAILED"],
koji.TASK_STATES["FAILED"]: koji.BUILD_STATES["FAILED"],
# Completed tasks should be marked as complete.
koji.TASK_STATES["CLOSED"]: koji.BUILD_STATES["COMPLETE"],
}
# If it is a closed/completed task, then we can extract the NVR
build_version, build_release = None, None # defaults
if task_info["state"] == koji.TASK_STATES["CLOSED"]:
builds = koji_session.listBuilds(taskID=task_id)
if not builds:
log.warning(
"Task ID %r is closed, but we found no builds in koji." % task_id)
elif len(builds) > 1:
log.warning(
"Task ID %r is closed, but more than one build is present!" % task_id)
else:
build_version = builds[0]["version"]
build_release = builds[0]["release"]
log.info(" task {0!r} is in state {1!r}".format(task_id, task_info["state"]))
if task_info["state"] in state_mapping:
# Fake a fedmsg message on our internal queue
msg = {
"msg_id": "producer::fail_lost_builds fake msg",
"event": events.KOJI_BUILD_CHANGE,
"build_id": component_build.task_id,
"task_id": component_build.task_id,
"build_new_state": state_mapping[task_info["state"]],
"build_name": component_build.package,
"build_release": build_release,
"build_version": build_version,
"module_build_id": None,
"state_reason": None
}
module_build_service.scheduler.consumer.work_queue_put(msg)
elif conf.system == "mock":
pass
def cleanup_stale_failed_builds(self, conf):
""" Does various clean up tasks on stale failed module builds
:param conf: the MBS configuration object
:param db_session: a SQLAlchemy database session
"""
if conf.system == "koji":
stale_date = datetime.utcnow() - timedelta(days=conf.cleanup_failed_builds_time)
stale_module_builds = (
db_session.query(models.ModuleBuild)
.filter(
models.ModuleBuild.state == models.BUILD_STATES["failed"],
models.ModuleBuild.time_modified <= stale_date,
)
.all()
)
if stale_module_builds:
log.info(
"{0} stale failed module build(s) will be cleaned up".format(
len(stale_module_builds))
)
for module in stale_module_builds:
log.info("{0!r} is stale and is being cleaned up".format(module))
# Find completed artifacts in the stale build
artifacts = [c for c in module.component_builds if c.is_completed]
# If there are no completed artifacts, then there is nothing to tag
if artifacts:
# Set buildroot_connect=False so it doesn't recreate the Koji target and etc.
builder = GenericBuilder.create_from_module(
db_session, module, conf, buildroot_connect=False
)
builder.untag_artifacts([c.nvr for c in artifacts])
# Mark the artifacts as untagged in the database
for c in artifacts:
c.tagged = False
c.tagged_in_final = False
db_session.add(c)
state_reason = (
"The module was garbage collected since it has failed over {0}"
" day(s) ago".format(conf.cleanup_failed_builds_time)
)
module.transition(
db_session,
conf,
models.BUILD_STATES["garbage"],
state_reason=state_reason,
failure_type="user",
)
db_session.add(module)
db_session.commit()
def log_summary(self):
log.info("Current status:")
consumer = module_build_service.scheduler.consumer.get_global_consumer()
backlog = consumer.incoming.qsize()
log.info(" * internal queue backlog is {0}".format(backlog))
states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))
for name, code in states:
query = db_session.query(models.ModuleBuild).filter_by(state=code)
count = query.count()
if count:
log.info(" * {0} module builds in the {1} state".format(count, name))
if name == "build":
for module_build in query.all():
log.info(" * {0!r}".format(module_build))
# First batch is number '1'.
for i in range(1, module_build.batch + 1):
n = len([c for c in module_build.component_builds if c.batch == i])
log.info(" * {0} components in batch {1}".format(n, i))
def _nudge_module_builds_in_state(self, state_name, older_than_minutes):
"""
Finds all the module builds in the `state` with `time_modified` older
than `older_than_minutes` and adds fake MBSModule message to the
work queue.
"""
log.info("Looking for module builds stuck in the %s state", state_name)
builds = models.ModuleBuild.by_state(db_session, state_name)
log.info(" %r module builds in the %s state...", len(builds), state_name)
now = datetime.utcnow()
time_modified_threshold = timedelta(minutes=older_than_minutes)
for build in builds:
# Only give builds a nudge if stuck for more than ten minutes
if (now - build.time_modified) < time_modified_threshold:
continue
# Pretend the build is modified, so we don't tight spin.
build.time_modified = now
# Fake a message to kickstart the build anew in the consumer
state = module_build_service.models.BUILD_STATES[state_name]
msg = {
"msg_id": "nudge_module_builds_fake_message",
"event": events.MBS_MODULE_STATE_CHANGE,
"module_build_id": build.id,
"module_build_state": state,
}
log.info(" Scheduling faked event %r", msg)
module_build_service.scheduler.consumer.work_queue_put(msg)
# Only give builds a nudge if stuck for more than ten minutes
if (now - build.time_modified) < time_modified_threshold:
continue
# Pretend the build is modified, so we don't tight spin.
build.time_modified = now
db_session.commit()
def process_waiting_module_builds(self):
for state in ["init", "wait"]:
self._nudge_module_builds_in_state(state, 10)
# Fake a message to kickstart the build anew in the consumer
state = module_build_service.models.BUILD_STATES[state_name]
handler = ON_MODULE_CHANGE_HANDLERS[state]
handler.delay("internal:mbs.module.state.change", build.id, state)
def process_open_component_builds(self):
log.warning("process_open_component_builds is not yet implemented...")
def process_paused_module_builds(self, config):
log.info("Looking for paused module builds in the build state")
if module_build_service.utils.at_concurrent_component_threshold(config):
log.debug(
"Will not attempt to start paused module builds due to "
"the concurrent build threshold being met"
)
return
def process_open_component_builds():
log.warning("process_open_component_builds is not yet implemented...")
ten_minutes = timedelta(minutes=10)
# Check for module builds that are in the build state but don't have any active component
# builds. Exclude module builds in batch 0. This is likely a build of a module without
# components.
module_builds = (
db_session.query(models.ModuleBuild)
.filter(
models.ModuleBuild.state == models.BUILD_STATES["build"],
models.ModuleBuild.batch > 0,
)
.all()
)
for module_build in module_builds:
now = datetime.utcnow()
# Only give builds a nudge if stuck for more than ten minutes
if (now - module_build.time_modified) < ten_minutes:
continue
# If there are no components in the build state on the module build,
# then no possible event will start off new component builds.
# But do not try to start new builds when we are waiting for the
# repo-regen.
if not module_build.current_batch(koji.BUILD_STATES["BUILDING"]):
# Initialize the builder...
builder = GenericBuilder.create_from_module(db_session, module_build, config)
if _has_missed_new_repo_message(module_build, builder.koji_session):
log.info(" Processing the paused module build %r", module_build)
module_build_service.utils.start_next_batch_build(
config, module_build, builder)
# Check if we have met the threshold.
if module_build_service.utils.at_concurrent_component_threshold(config):
break
def retrigger_new_repo_on_failure(self, config):
"""
Retrigger failed new repo tasks for module builds in the build state.
The newRepo task may fail for various reasons outside the scope of MBS.
This method will detect this scenario and retrigger the newRepo task
if needed to avoid the module build from being stuck in the "build" state.
"""
if config.system != "koji":
return
koji_session = module_build_service.builder.KojiModuleBuilder.KojiModuleBuilder.get_session(
config)
for module_build in (
db_session.query(models.ModuleBuild).filter_by(state=models.BUILD_STATES["build"]).all()
):
if not module_build.new_repo_task_id:
continue
task_info = koji_session.getTaskInfo(module_build.new_repo_task_id)
if task_info["state"] in [koji.TASK_STATES["CANCELED"], koji.TASK_STATES["FAILED"]]:
log.info(
"newRepo task %s for %r failed, starting another one",
str(module_build.new_repo_task_id), module_build,
)
taginfo = koji_session.getTag(module_build.koji_tag + "-build")
module_build.new_repo_task_id = koji_session.newRepo(taginfo["name"])
db_session.commit()
def delete_old_koji_targets(self, config):
"""
Deletes targets older than `config.koji_target_delete_time` seconds
from Koji to cleanup after the module builds.
"""
if config.system != "koji":
return
log.info("Looking for module builds which Koji target can be removed")
now = datetime.utcnow()
koji_session = KojiModuleBuilder.get_session(config)
for target in koji_session.getBuildTargets():
koji_tag = target["dest_tag_name"]
module = db_session.query(models.ModuleBuild).filter_by(koji_tag=koji_tag).first()
if (
not module
or module.name in conf.base_module_names
or module.state in [
models.BUILD_STATES["init"],
models.BUILD_STATES["wait"],
models.BUILD_STATES["build"],
]
):
continue
# Double-check that the target we are going to remove is prefixed
# by our prefix, so we won't remove f26 when there is some garbage
# in DB or Koji.
for allowed_prefix in config.koji_tag_prefixes:
if target["name"].startswith(allowed_prefix + "-"):
break
else:
log.error("Module %r has Koji target with not allowed prefix.", module)
continue
delta = now - module.time_completed
if delta.total_seconds() > config.koji_target_delete_time:
log.info("Removing target of module %r", module)
koji_session.deleteBuildTarget(target["id"])
def cancel_stuck_module_builds(self, config):
"""
Method transitions builds which are stuck in one state too long to the "failed" state.
The states are defined with the "cleanup_stuck_builds_states" config option and the
time is defined by the "cleanup_stuck_builds_time" config option.
"""
log.info(
'Looking for module builds stuck in the states "{states}" more than {days} days'
.format(
states=" and ".join(config.cleanup_stuck_builds_states),
days=config.cleanup_stuck_builds_time,
)
)
delta = timedelta(days=config.cleanup_stuck_builds_time)
now = datetime.utcnow()
threshold = now - delta
states = [
module_build_service.models.BUILD_STATES[state]
for state in config.cleanup_stuck_builds_states
]
module_builds = (
db_session.query(models.ModuleBuild)
.filter(
models.ModuleBuild.state.in_(states), models.ModuleBuild.time_modified < threshold
)
.all()
)
log.info(" {0!r} module builds are stuck...".format(len(module_builds)))
for build in module_builds:
nsvc = ":".join([build.name, build.stream, build.version, build.context])
log.info('Transitioning build "{nsvc}" to "Failed" state.'.format(nsvc=nsvc))
state_reason = "The module was in {state} for more than {days} days".format(
state=build.state, days=config.cleanup_stuck_builds_time
)
build.transition(
db_session,
config,
state=models.BUILD_STATES["failed"],
state_reason=state_reason,
failure_type="user",
)
db_session.commit()
def sync_koji_build_tags(self, config):
"""
Method checking the "tagged" and "tagged_in_final" attributes of
"complete" ComponentBuilds in the current batch of module builds
in "building" state against the Koji.
In case the Koji shows the build as tagged/tagged_in_final,
fake "tagged" message is added to work queue.
"""
if conf.system != "koji":
return
@celery_app.task
def fail_lost_builds():
# This function is supposed to be handling only the part which can't be
# updated through messaging (e.g. srpm-build failures). Please keep it
# fit `n` slim. We do want rest to be processed elsewhere
# TODO re-use
if conf.system == "koji":
# We don't do this on behalf of users
koji_session = KojiModuleBuilder.get_session(conf, login=False)
log.info("Querying tasks for statuses:")
res = db_session.query(models.ComponentBuild).filter_by(
state=koji.BUILD_STATES["BUILDING"]
).options(lazyload("module_build")).all()
threshold = datetime.utcnow() - timedelta(minutes=10)
module_builds = db_session.query(models.ModuleBuild).filter(
models.ModuleBuild.time_modified < threshold,
models.ModuleBuild.state == models.BUILD_STATES["build"]
).all()
for module_build in module_builds:
complete_components = module_build.current_batch(koji.BUILD_STATES["COMPLETE"])
for c in complete_components:
# In case the component is tagged in the build tag and
# also tagged in the final tag (or it is build_time_only
# and therefore should not be tagged in final tag), skip it.
if c.tagged and (c.tagged_in_final or c.build_time_only):
continue
log.info("Checking status for %s tasks", len(res))
for component_build in res:
log.debug(component_build.json(db_session))
# Don't check tasks which haven't been triggered yet
if not component_build.task_id:
continue
log.info(
"%r: Component %r is complete, but not tagged in the "
"final and/or build tags.",
module_build, c,
# Don't check tasks for components which have been reused,
# they may have BUILDING state temporarily before we tag them
# to new module tag. Checking them would be waste of resources.
if component_build.reused_component_id:
log.debug(
'Skipping check for task "%s", the component has been reused ("%s").',
component_build.task_id, component_build.reused_component_id
)
continue
task_id = component_build.task_id
log.info('Checking status of task_id "%s"', task_id)
task_info = koji_session.getTaskInfo(task_id)
state_mapping = {
# Cancelled and failed builds should be marked as failed.
koji.TASK_STATES["CANCELED"]: koji.BUILD_STATES["FAILED"],
koji.TASK_STATES["FAILED"]: koji.BUILD_STATES["FAILED"],
# Completed tasks should be marked as complete.
koji.TASK_STATES["CLOSED"]: koji.BUILD_STATES["COMPLETE"],
}
# If it is a closed/completed task, then we can extract the NVR
build_version, build_release = None, None # defaults
if task_info["state"] == koji.TASK_STATES["CLOSED"]:
builds = koji_session.listBuilds(taskID=task_id)
if not builds:
log.warning(
"Task ID %r is closed, but we found no builds in koji.", task_id)
elif len(builds) > 1:
log.warning(
"Task ID %r is closed, but more than one build is present!", task_id)
else:
build_version = builds[0]["version"]
build_release = builds[0]["release"]
log.info(" task %r is in state %r", task_id, task_info["state"])
if task_info["state"] in state_mapping:
build_task_finalize.delay(
msg_id="producer::fail_lost_builds fake msg",
build_id=component_build.task_id,
task_id=component_build.task_id,
build_new_state=state_mapping[task_info["state"]],
build_name=component_build.package,
build_release=build_release,
build_version=build_version,
)
# Check in which tags the component is tagged.
tag_dicts = koji_session.listTags(c.nvr)
tags = [tag_dict["name"] for tag_dict in tag_dicts]
elif conf.system == "mock":
pass
# If it is tagged in final tag, but MBS does not think so,
# schedule fake message.
if not c.tagged_in_final and module_build.koji_tag in tags:
msg = {
"msg_id": "sync_koji_build_tags_fake_message",
"event": events.KOJI_TAG_CHANGE,
"tag_name": module_build.koji_tag,
"build_name": c.package,
"build_nvr": c.nvr,
}
log.info(" Scheduling faked event %r", msg)
module_build_service.scheduler.consumer.work_queue_put(msg)
# If it is tagged in the build tag, but MBS does not think so,
# schedule fake message.
build_tag = module_build.koji_tag + "-build"
if not c.tagged and build_tag in tags:
msg = {
"msg_id": "sync_koji_build_tags_fake_message",
"event": events.KOJI_TAG_CHANGE,
"tag_name": build_tag,
"build_name": c.package,
"build_nvr": c.nvr,
}
log.info(" Scheduling faked event %r", msg)
module_build_service.scheduler.consumer.work_queue_put(msg)
def poll_greenwave(self, config):
"""
Polls Greenwave for all builds in done state
:param db_session: SQLAlchemy DB session
:return: None
"""
if greenwave is None:
return
module_builds = (
db_session.query(models.ModuleBuild)
.filter_by(state=models.BUILD_STATES["done"], scratch=False).all()
@celery_app.task
def process_paused_module_builds():
log.info("Looking for paused module builds in the build state")
if module_build_service.utils.at_concurrent_component_threshold(conf):
log.debug(
"Will not attempt to start paused module builds due to "
"the concurrent build threshold being met"
)
return
log.info("Checking Greenwave for %d builds", len(module_builds))
ten_minutes = timedelta(minutes=10)
# Check for module builds that are in the build state but don't have any active component
# builds. Exclude module builds in batch 0. This is likely a build of a module without
# components.
module_builds = db_session.query(models.ModuleBuild).filter(
models.ModuleBuild.state == models.BUILD_STATES["build"],
models.ModuleBuild.batch > 0,
).all()
for module_build in module_builds:
now = datetime.utcnow()
# Only give builds a nudge if stuck for more than ten minutes
if (now - module_build.time_modified) < ten_minutes:
continue
# If there are no components in the build state on the module build,
# then no possible event will start off new component builds.
# But do not try to start new builds when we are waiting for the
# repo-regen.
if not module_build.current_batch(koji.BUILD_STATES["BUILDING"]):
# Initialize the builder...
builder = GenericBuilder.create_from_module(
db_session, module_build, conf)
for build in module_builds:
if greenwave.check_gating(build):
build.transition(db_session, config, state=models.BUILD_STATES["ready"])
else:
build.state_reason = "Gating failed (MBS will retry in {0} seconds)".format(
conf.polling_interval
)
if greenwave.error_occurred:
build.state_reason += " (Error occured while querying Greenwave)"
build.time_modified = datetime.utcnow()
db_session.commit()
if has_missed_new_repo_message(module_build, builder.koji_session):
log.info(" Processing the paused module build %r", module_build)
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Check if we have met the threshold.
if module_build_service.utils.at_concurrent_component_threshold(conf):
break
def _has_missed_new_repo_message(module_build, koji_session):
@celery_app.task
def retrigger_new_repo_on_failure():
"""
Retrigger failed new repo tasks for module builds in the build state.
The newRepo task may fail for various reasons outside the scope of MBS.
This method will detect this scenario and retrigger the newRepo task
if needed to avoid the module build from being stuck in the "build" state.
"""
if conf.system != "koji":
return
koji_session = KojiModuleBuilder.get_session(conf)
module_builds = db_session.query(models.ModuleBuild).filter(
models.ModuleBuild.state == models.BUILD_STATES["build"],
models.ModuleBuild.new_repo_task_id.isnot(None),
).all()
for module_build in module_builds:
task_info = koji_session.getTaskInfo(module_build.new_repo_task_id)
if task_info["state"] in [koji.TASK_STATES["CANCELED"], koji.TASK_STATES["FAILED"]]:
log.info(
"newRepo task %s for %r failed, starting another one",
str(module_build.new_repo_task_id), module_build,
)
taginfo = koji_session.getTag(module_build.koji_tag + "-build")
module_build.new_repo_task_id = koji_session.newRepo(taginfo["name"])
db_session.commit()
@celery_app.task
def delete_old_koji_targets():
"""
Deletes targets older than `config.koji_target_delete_time` seconds
from Koji to cleanup after the module builds.
"""
if conf.system != "koji":
return
log.info("Looking for module builds which Koji target can be removed")
now = datetime.utcnow()
koji_session = KojiModuleBuilder.get_session(conf)
for target in koji_session.getBuildTargets():
module = db_session.query(models.ModuleBuild).filter(
models.ModuleBuild.koji_tag == target["dest_tag_name"],
models.ModuleBuild.name.notin_(conf.base_module_names),
models.ModuleBuild.state.notin_([
models.BUILD_STATES["init"],
models.BUILD_STATES["wait"],
models.BUILD_STATES["build"],
]),
).options(
load_only("time_completed"),
).first()
if module is None:
continue
# Double-check that the target we are going to remove is prefixed
# by our prefix, so we won't remove f26 when there is some garbage
# in DB or Koji.
for allowed_prefix in conf.koji_tag_prefixes:
if target["name"].startswith(allowed_prefix + "-"):
break
else:
log.error("Module %r has Koji target with not allowed prefix.", module)
continue
delta = now - module.time_completed
if delta.total_seconds() > conf.koji_target_delete_time:
log.info("Removing target of module %r", module)
koji_session.deleteBuildTarget(target["id"])
@celery_app.task
def cleanup_stale_failed_builds():
"""Does various clean up tasks on stale failed module builds"""
if conf.system != "koji":
return
stale_date = datetime.utcnow() - timedelta(days=conf.cleanup_failed_builds_time)
stale_module_builds = db_session.query(models.ModuleBuild).filter(
models.ModuleBuild.state == models.BUILD_STATES["failed"],
models.ModuleBuild.time_modified <= stale_date,
).all()
if stale_module_builds:
log.info(
"%s stale failed module build(s) will be cleaned up",
len(stale_module_builds)
)
for module in stale_module_builds:
log.info("%r is stale and is being cleaned up", module)
# Find completed artifacts in the stale build
artifacts = [c for c in module.component_builds if c.is_completed]
# If there are no completed artifacts, then there is nothing to tag
if artifacts:
# Set buildroot_connect=False so it doesn't recreate the Koji target and etc.
builder = GenericBuilder.create_from_module(
db_session, module, conf, buildroot_connect=False
)
builder.untag_artifacts([c.nvr for c in artifacts])
# Mark the artifacts as untagged in the database
for c in artifacts:
c.tagged = False
c.tagged_in_final = False
db_session.add(c)
state_reason = (
"The module was garbage collected since it has failed over {0}"
" day(s) ago".format(conf.cleanup_failed_builds_time)
)
module.transition(
db_session,
conf,
models.BUILD_STATES["garbage"],
state_reason=state_reason,
failure_type="user",
)
db_session.add(module)
db_session.commit()
@celery_app.task
def cancel_stuck_module_builds():
"""
Method transitions builds which are stuck in one state too long to the "failed" state.
The states are defined with the "cleanup_stuck_builds_states" config option and the
time is defined by the "cleanup_stuck_builds_time" config option.
"""
log.info(
'Looking for module builds stuck in the states "%s" more than %s days',
" and ".join(conf.cleanup_stuck_builds_states),
conf.cleanup_stuck_builds_time,
)
threshold = datetime.utcnow() - timedelta(days=conf.cleanup_stuck_builds_time)
states = [
module_build_service.models.BUILD_STATES[state]
for state in conf.cleanup_stuck_builds_states
]
module_builds = db_session.query(models.ModuleBuild).filter(
models.ModuleBuild.state.in_(states),
models.ModuleBuild.time_modified < threshold
).all()
log.info(" %s module builds are stuck...", len(module_builds))
for build in module_builds:
log.info(
'Transitioning build "%s:%s:%s:%s" to "Failed" state.',
build.name, build.stream, build.version, build.context
)
state_reason = "The module was in {} for more than {} days".format(
build.state, conf.cleanup_stuck_builds_time
)
build.transition(
db_session,
conf,
state=models.BUILD_STATES["failed"],
state_reason=state_reason,
failure_type="user",
)
db_session.commit()
@celery_app.task
def sync_koji_build_tags():
"""
Method checking the "tagged" and "tagged_in_final" attributes of
"complete" ComponentBuilds in the current batch of module builds
in "building" state against the Koji.
In case the Koji shows the build as tagged/tagged_in_final,
fake "tagged" message is added to work queue.
"""
if conf.system != "koji":
return
koji_session = KojiModuleBuilder.get_session(conf, login=False)
threshold = datetime.utcnow() - timedelta(minutes=10)
module_builds = db_session.query(models.ModuleBuild).filter(
models.ModuleBuild.time_modified < threshold,
models.ModuleBuild.state == models.BUILD_STATES["build"]
).all()
for module_build in module_builds:
complete_components = module_build.current_batch(koji.BUILD_STATES["COMPLETE"])
for c in complete_components:
# In case the component is tagged in the build tag and
# also tagged in the final tag (or it is build_time_only
# and therefore should not be tagged in final tag), skip it.
if c.tagged and (c.tagged_in_final or c.build_time_only):
continue
log.info(
"%r: Component %r is complete, but not tagged in the "
"final and/or build tags.",
module_build, c,
)
# Check in which tags the component is tagged.
tag_dicts = koji_session.listTags(c.nvr)
tags = [tag_dict["name"] for tag_dict in tag_dicts]
# If it is tagged in final tag, but MBS does not think so,
# schedule fake message.
if not c.tagged_in_final and module_build.koji_tag in tags:
log.info(
"Apply tag %s to module build %r",
module_build.koji_tag, module_build)
tagged.delay(
"internal:sync_koji_build_tags",
module_build.koji_tag, c.package, c.nvr)
# If it is tagged in the build tag, but MBS does not think so,
# schedule fake message.
build_tag = module_build.koji_tag + "-build"
if not c.tagged and build_tag in tags:
log.info(
"Apply build tag %s to module build %r",
build_tag, module_build)
tagged.delay(
"internal:sync_koji_build_tags",
build_tag, c.package, c.nvr)
@celery_app.task
def poll_greenwave():
"""Polls Greenwave for all builds in done state"""
if greenwave is None:
return
module_builds = db_session.query(models.ModuleBuild).filter_by(
state=models.BUILD_STATES["done"],
scratch=False
).all()
log.info("Checking Greenwave for %d builds", len(module_builds))
for build in module_builds:
if greenwave.check_gating(build):
build.transition(db_session, conf, state=models.BUILD_STATES["ready"])
else:
build.state_reason = "Gating failed (MBS will retry in {0} seconds)".format(
conf.polling_interval
)
if greenwave.error_occurred:
build.state_reason += " (Error occured while querying Greenwave)"
build.time_modified = datetime.utcnow()
db_session.commit()
def has_missed_new_repo_message(module_build, koji_session):
"""
Returns whether or not a new repo message has probably been missed.
"""
@@ -504,7 +473,8 @@ def _has_missed_new_repo_message(module_build, koji_session):
# message so module build can recover.
return True
log.debug(
'Checking status of newRepo task "%d" for %s', module_build.new_repo_task_id, module_build)
'Checking status of newRepo task "%d" for %s',
module_build.new_repo_task_id, module_build)
task_info = koji_session.getTaskInfo(module_build.new_repo_task_id)
# Other final states, FAILED and CANCELED, are handled by retrigger_new_repo_on_failure
return task_info["state"] == koji.TASK_STATES["CLOSED"]

View File

@@ -8,7 +8,6 @@ import os
from os import path
import module_build_service.messaging
import module_build_service.scheduler.handlers.repos # noqa
from module_build_service import models, conf, build_logs, Modulemd
from module_build_service.db_session import db_session
from module_build_service.utils.general import mmd_to_str

View File

@@ -2,15 +2,13 @@
# SPDX-License-Identifier: MIT
import re
import pytest
from mock import patch
from mock import call, patch
from module_build_service import models, conf
from tests import clean_database, make_module_in_db
import mock
import koji
from module_build_service.db_session import db_session
from module_build_service.scheduler import events
from module_build_service.scheduler.producer import MBSProducer
import six.moves.queue as queue
from module_build_service.scheduler import producer
from datetime import datetime, timedelta
@@ -19,7 +17,6 @@ from datetime import datetime, timedelta
"module_build_service.builder.GenericBuilder.default_buildroot_groups",
return_value={"build": [], "srpm-build": []},
)
@patch("module_build_service.scheduler.consumer.get_global_consumer")
@patch("module_build_service.builder.GenericBuilder.create_from_module")
class TestPoller:
def setup_method(self, test_method):
@@ -40,15 +37,11 @@ class TestPoller:
@pytest.mark.parametrize("fresh", [True, False])
@patch("module_build_service.utils.batches.start_build_component")
def test_process_paused_module_builds(
self, start_build_component, create_builder, global_consumer, dbg, fresh
self, start_build_component, create_builder, dbg, fresh
):
"""
Tests general use-case of process_paused_module_builds.
"""
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
builder = mock.MagicMock()
create_builder.return_value = builder
@@ -64,9 +57,7 @@ class TestPoller:
db_session.commit()
# Poll :)
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.poll()
producer.process_paused_module_builds()
module_build = models.ModuleBuild.get_by_id(db_session, 3)
@@ -92,16 +83,12 @@ class TestPoller:
))
@patch("module_build_service.utils.batches.start_build_component")
def test_process_paused_module_builds_with_new_repo_task(
self, start_build_component, create_builder, global_consumer, dbg, task_state,
self, start_build_component, create_builder, dbg, task_state,
expect_start_build_component
):
"""
Tests general use-case of process_paused_module_builds.
"""
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
builder = mock.MagicMock()
create_builder.return_value = builder
@@ -118,9 +105,7 @@ class TestPoller:
db_session.commit()
# Poll :)
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.poll()
producer.process_paused_module_builds()
module_build = models.ModuleBuild.get_by_id(db_session, 3)
@@ -139,16 +124,10 @@ class TestPoller:
@patch.dict("sys.modules", krbV=mock.MagicMock())
@patch("koji.ClientSession")
def test_retrigger_new_repo_on_failure(
self, ClientSession, create_builder, global_consumer, dbg
):
def test_retrigger_new_repo_on_failure(self, ClientSession, create_builder, dbg):
"""
Tests that we call koji_sesion.newRepo when newRepo task failed.
"""
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
koji_session = ClientSession.return_value
koji_session.getTag = lambda tag_name: {"name": tag_name}
koji_session.getTaskInfo.return_value = {"state": koji.TASK_STATES["FAILED"]}
@@ -165,26 +144,18 @@ class TestPoller:
module_build.new_repo_task_id = 123456
db_session.commit()
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.poll()
producer.retrigger_new_repo_on_failure()
koji_session.newRepo.assert_called_once_with(
"module-testmodule-master-20170219191323-c40c156c-build")
@patch.dict("sys.modules", krbV=mock.MagicMock())
@patch("koji.ClientSession")
def test_trigger_new_repo_when_succeeded(
self, ClientSession, create_builder, global_consumer, dbg
):
def test_trigger_new_repo_when_succeeded(self, ClientSession, create_builder, dbg):
"""
Tests that we do not call koji_sesion.newRepo when newRepo task
succeeded.
"""
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
koji_session = ClientSession.return_value
koji_session.getTag = lambda tag_name: {"name": tag_name}
koji_session.getTaskInfo.return_value = {"state": koji.TASK_STATES["CLOSED"]}
@@ -201,26 +172,18 @@ class TestPoller:
module_build.new_repo_task_id = 123456
db_session.commit()
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.poll()
producer.retrigger_new_repo_on_failure()
module_build = models.ModuleBuild.get_by_id(db_session, 3)
assert not koji_session.newRepo.called
assert module_build.new_repo_task_id == 123456
def test_process_paused_module_builds_waiting_for_repo(
self, create_builder, global_consumer, dbg
):
def test_process_paused_module_builds_waiting_for_repo(self, create_builder, dbg):
"""
Tests that process_paused_module_builds does not start new batch
when we are waiting for repo.
"""
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
builder = mock.MagicMock()
create_builder.return_value = builder
@@ -232,9 +195,7 @@ class TestPoller:
db_session.commit()
# Poll :)
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.poll()
producer.process_paused_module_builds()
module_build = models.ModuleBuild.get_by_id(db_session, 3)
@@ -246,12 +207,8 @@ class TestPoller:
@patch.dict("sys.modules", krbV=mock.MagicMock())
@patch("koji.ClientSession")
def test_old_build_targets_are_not_associated_with_any_module_builds(
self, ClientSession, create_builder, global_consumer, dbg
self, ClientSession, create_builder, dbg
):
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
koji_session = ClientSession.return_value
# No created module build has any of these tags.
koji_session.getBuildTargets.return_value = [
@@ -259,16 +216,14 @@ class TestPoller:
{"dest_tag_name": "module-yyy-2"},
]
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.delete_old_koji_targets(conf)
producer.delete_old_koji_targets()
koji_session.deleteBuildTarget.assert_not_called()
@patch.dict("sys.modules", krbV=mock.MagicMock())
@patch("koji.ClientSession")
def test_dont_delete_base_module_build_target(
self, ClientSession, create_builder, global_consumer, dbg
self, ClientSession, create_builder, dbg
):
module_build = models.ModuleBuild.get_by_id(db_session, 3)
@@ -276,24 +231,16 @@ class TestPoller:
# No created module build has any of these tags.
koji_session.getBuildTargets.return_value = [{"dest_tag_name": module_build.koji_tag}]
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
# If module build's name is one of base module names, build target
# should not be deleted.
with patch.object(conf, "base_module_names", new=[module_build.name]):
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.delete_old_koji_targets(conf)
producer.delete_old_koji_targets()
koji_session.deleteBuildTarget.assert_not_called()
@patch.dict("sys.modules", krbV=mock.MagicMock())
@patch("koji.ClientSession")
def test_dont_delete_build_target_for_unfinished_module_builds(
self, ClientSession, create_builder, global_consumer, dbg
self, ClientSession, create_builder, dbg
):
module_build = models.ModuleBuild.get_by_id(db_session, 3)
@@ -301,26 +248,20 @@ class TestPoller:
# No created module build has any of these tags.
koji_session.getBuildTargets.return_value = [{"dest_tag_name": module_build.koji_tag}]
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
# Each time when a module build is in one of these state, build target
# should not be deleted.
for state in ["init", "wait", "build"]:
module_build.state = state
db_session.commit()
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.delete_old_koji_targets(conf)
producer.delete_old_koji_targets()
koji_session.deleteBuildTarget.assert_not_called()
@patch.dict("sys.modules", krbV=mock.MagicMock())
@patch("koji.ClientSession")
def test_only_delete_build_target_with_allowed_koji_tag_prefix(
self, ClientSession, create_builder, global_consumer, dbg
self, ClientSession, create_builder, dbg
):
module_build_2 = models.ModuleBuild.get_by_id(db_session, 2)
# Only module build 1's build target should be deleted.
@@ -343,15 +284,9 @@ class TestPoller:
{"id": 2, "dest_tag_name": module_build_3.koji_tag, "name": module_build_3.koji_tag},
]
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
with patch.object(conf, "koji_tag_prefixes", new=["module", "another-prefix"]):
with patch.object(conf, "koji_target_delete_time", new=60):
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.delete_old_koji_targets(conf)
producer.delete_old_koji_targets()
koji_session.deleteBuildTarget.assert_called_once_with(1)
koji_session.krb_login.assert_called_once()
@@ -359,7 +294,7 @@ class TestPoller:
@patch.dict("sys.modules", krbV=mock.MagicMock())
@patch("koji.ClientSession")
def test_cant_delete_build_target_if_not_reach_delete_time(
self, ClientSession, create_builder, global_consumer, dbg
self, ClientSession, create_builder, dbg
):
module_build_2 = models.ModuleBuild.get_by_id(db_session, 2)
# Only module build 1's build target should be deleted.
@@ -377,31 +312,22 @@ class TestPoller:
{"id": 1, "dest_tag_name": module_build_2.koji_tag, "name": module_build_2.koji_tag}
]
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
with patch.object(conf, "koji_tag_prefixes", new=["module"]):
# Use default koji_target_delete_time in config. That time is long
# enough for test.
hub = mock.MagicMock()
poller = MBSProducer(hub)
poller.delete_old_koji_targets(conf)
producer.delete_old_koji_targets()
koji_session.deleteBuildTarget.assert_not_called()
@pytest.mark.parametrize("state", ["init", "wait"])
def test_process_waiting_module_build(
self, create_builder, global_consumer, dbg, state
):
@patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={
models.BUILD_STATES["init"]: mock.Mock(),
models.BUILD_STATES["wait"]: mock.Mock(),
})
def test_process_waiting_module_build(self, create_builder, dbg, state):
""" Test that processing old waiting module builds works. """
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
hub = mock.MagicMock()
poller = MBSProducer(hub)
handler = producer.ON_MODULE_CHANGE_HANDLERS[models.BUILD_STATES[state]]
# Change the batch to 2, so the module build is in state where
# it is not building anything, but the state is "build".
@@ -413,32 +339,32 @@ class TestPoller:
db_session.commit()
db_session.refresh(module_build)
# Ensure the queue is empty before we start.
assert consumer.incoming.qsize() == 0
# Poll :)
poller.process_waiting_module_builds()
producer.process_waiting_module_builds()
assert consumer.incoming.qsize() == 1
handler.delay.assert_called_once_with(
"internal:mbs.module.state.change",
module_build.id,
module_build.state
)
db_session.refresh(module_build)
# ensure the time_modified was changed.
assert module_build.time_modified > original
@pytest.mark.parametrize("state", ["init", "wait"])
@patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={
models.BUILD_STATES["init"]: mock.Mock(),
models.BUILD_STATES["wait"]: mock.Mock(),
})
def test_process_waiting_module_build_not_old_enough(
self, create_builder, global_consumer, dbg, state
self, create_builder, dbg, state
):
""" Test that we do not process young waiting builds. """
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
handler = producer.ON_MODULE_CHANGE_HANDLERS[models.BUILD_STATES[state]]
hub = mock.MagicMock()
poller = MBSProducer(hub)
# Change the batch to 2, so the module build is in state where
# Change the batch to build, so the module build is in state where
# it is not building anything, but the state is "build".
module_build = models.ModuleBuild.get_by_id(db_session, 3)
module_build.state = models.BUILD_STATES[state]
@@ -448,37 +374,25 @@ class TestPoller:
db_session.commit()
db_session.refresh(module_build)
# Ensure the queue is empty before we start.
assert consumer.incoming.qsize() == 0
# Poll :)
poller.process_waiting_module_builds()
producer.process_waiting_module_builds()
# Ensure we did *not* process the 9 minute-old build.
assert consumer.incoming.qsize() == 0
handler.assert_not_called()
def test_process_waiting_module_build_none_found(
self, create_builder, global_consumer, dbg
):
@patch.dict(producer.ON_MODULE_CHANGE_HANDLERS, clear=True, values={
models.BUILD_STATES["init"]: mock.Mock(),
models.BUILD_STATES["wait"]: mock.Mock(),
})
def test_process_waiting_module_build_none_found(self, create_builder, dbg):
""" Test nothing happens when no module builds are waiting. """
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
hub = mock.MagicMock()
poller = MBSProducer(hub)
# Ensure the queue is empty before we start.
assert consumer.incoming.qsize() == 0
# Poll :)
poller.process_waiting_module_builds()
producer.process_waiting_module_builds()
# Ensure we did *not* process any of the non-waiting builds.
assert consumer.incoming.qsize() == 0
for handler in producer.ON_MODULE_CHANGE_HANDLERS.values():
handler.assert_not_called()
def test_cleanup_stale_failed_builds(self, create_builder, global_consumer, dbg):
def test_cleanup_stale_failed_builds(self, create_builder, dbg):
""" Test that one of the two module builds gets to the garbage state when running
cleanup_stale_failed_builds.
"""
@@ -502,15 +416,8 @@ class TestPoller:
db_session.commit()
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
hub = mock.MagicMock()
poller = MBSProducer(hub)
producer.cleanup_stale_failed_builds()
# Ensure the queue is empty before we start
assert consumer.incoming.qsize() == 0
poller.cleanup_stale_failed_builds(conf)
db_session.refresh(module_build_two)
# Make sure module_build_one was transitioned to garbage
assert module_build_one.state == models.BUILD_STATES["garbage"]
@@ -533,9 +440,7 @@ class TestPoller:
"module-build-macros-0.1-1.module+0+d027b723",
])
def test_cleanup_stale_failed_builds_no_components(
self, create_builder, global_consumer, dbg
):
def test_cleanup_stale_failed_builds_no_components(self, create_builder, dbg):
""" Test that a module build without any components built gets to the garbage state when
running cleanup_stale_failed_builds.
"""
@@ -555,15 +460,8 @@ class TestPoller:
db_session.commit()
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
hub = mock.MagicMock()
poller = MBSProducer(hub)
producer.cleanup_stale_failed_builds()
# Ensure the queue is empty before we start
assert consumer.incoming.qsize() == 0
poller.cleanup_stale_failed_builds(conf)
db_session.refresh(module_build_two)
# Make sure module_build_two was transitioned to garbage
assert module_build_two.state == models.BUILD_STATES["garbage"]
@@ -580,9 +478,7 @@ class TestPoller:
@pytest.mark.parametrize(
"test_state", [models.BUILD_STATES[state] for state in conf.cleanup_stuck_builds_states]
)
def test_cancel_stuck_module_builds(
self, create_builder, global_consumer, dbg, test_state
):
def test_cancel_stuck_module_builds(self, create_builder, dbg, test_state):
module_build1 = models.ModuleBuild.get_by_id(db_session, 1)
module_build1.state = test_state
@@ -601,17 +497,9 @@ class TestPoller:
db_session.commit()
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
hub = mock.MagicMock()
poller = MBSProducer(hub)
producer.cancel_stuck_module_builds()
assert consumer.incoming.qsize() == 0
poller.cancel_stuck_module_builds(conf)
module = db_session.query(models.ModuleBuild).filter_by(state=4).all()
module = models.ModuleBuild.by_state(db_session, "failed")
assert len(module) == 1
assert module[0].id == 2
@@ -619,8 +507,10 @@ class TestPoller:
@pytest.mark.parametrize("tagged_in_final", (True, False))
@pytest.mark.parametrize("btime", (True, False))
@patch("koji.ClientSession")
@patch("module_build_service.scheduler.producer.tagged")
def test_sync_koji_build_tags(
self, ClientSession, create_builder, global_consumer, dbg, tagged, tagged_in_final, btime
self, tagged_handler, ClientSession, create_builder, dbg,
tagged, tagged_in_final, btime
):
module_build_2 = models.ModuleBuild.get_by_id(db_session, 2)
# Only module build 1's build target should be deleted.
@@ -639,48 +529,35 @@ class TestPoller:
koji_session = ClientSession.return_value
# No created module build has any of these tags.
ret = []
listtags_return_value = []
expected_tagged_calls = []
if btime:
if tagged:
ret.append({"id": 1, "name": module_build_2.koji_tag + "-build"})
listtags_return_value.append(
{"id": 1, "name": module_build_2.koji_tag + "-build"})
expected_tagged_calls.append(call(
"internal:sync_koji_build_tags",
module_build_2.koji_tag + "-build", c.package, c.nvr
))
if tagged_in_final:
ret.append({"id": 2, "name": module_build_2.koji_tag})
koji_session.listTags.return_value = ret
listtags_return_value.append(
{"id": 2, "name": module_build_2.koji_tag})
expected_tagged_calls.append(call(
"internal:sync_koji_build_tags",
module_build_2.koji_tag, c.package, c.nvr
))
koji_session.listTags.return_value = listtags_return_value
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
hub = mock.MagicMock()
poller = MBSProducer(hub)
producer.sync_koji_build_tags()
assert consumer.incoming.qsize() == 0
poller.sync_koji_build_tags(conf)
assert consumer.incoming.qsize() == len(ret)
expected_msg_tags = []
if btime:
if tagged:
expected_msg_tags.append(module_build_2.koji_tag + "-build")
if tagged_in_final:
expected_msg_tags.append(module_build_2.koji_tag)
assert len(expected_msg_tags) == consumer.incoming.qsize()
for i in range(consumer.incoming.qsize()):
msg = consumer.incoming.get()
assert events.KOJI_TAG_CHANGE == msg["event"]
assert c.package == msg["build_name"]
assert c.nvr == msg["build_nvr"]
assert msg["tag_name"] in expected_msg_tags
tagged_handler.delay.assert_has_calls(
expected_tagged_calls, any_order=True)
@pytest.mark.parametrize("greenwave_result", [True, False])
@patch("module_build_service.utils.greenwave.Greenwave.check_gating")
def test_poll_greenwave(
self, mock_gw, create_builder, global_consumer, dbg, greenwave_result
):
def test_poll_greenwave(self, mock_gw, create_builder, dbg, greenwave_result):
module_build1 = models.ModuleBuild.get_by_id(db_session, 1)
module_build1.state = models.BUILD_STATES["ready"]
@@ -697,17 +574,9 @@ class TestPoller:
db_session.commit()
consumer = mock.MagicMock()
consumer.incoming = queue.Queue()
global_consumer.return_value = consumer
hub = mock.MagicMock()
poller = MBSProducer(hub)
assert consumer.incoming.qsize() == 0
mock_gw.return_value = greenwave_result
poller.poll_greenwave(conf)
producer.poll_greenwave()
mock_gw.assert_called_once()
modules = models.ModuleBuild.by_state(db_session, "ready")

View File

@@ -26,7 +26,6 @@ from module_build_service.errors import UnprocessableEntity
from module_build_service.models import ModuleBuild, BUILD_STATES, ComponentBuild
from module_build_service import version
import module_build_service.config as mbs_config
import module_build_service.scheduler.handlers.modules
import module_build_service.utils.submit
from module_build_service.utils.general import (
import_mmd, mmd_to_str, load_mmd,