Replace further work by Scheduler class based on the "sched" module.

To support multiple backends, we need to get rid of the `further_work` concept
which is used in multiple places in the MBS code. Before this commit, if
one handler wanted to execute another handler, it planned this work by
constructing fake message and returning it. MBSConsumer then planned
its execution by adding it into the event loop.

In this commit, the new `events.scheduler` instance of new Scheduler
class is used to accomplish this. If a handler wants to execute another
handler, it simply schedules it using `events.scheduler.add` method.

At the end of each handler, the `events.scheduler.run` method is
executed which calls all the scheduled handlers.

The idea is that when Celery is enabled, we can change the
`Scheduler.run` method to execute the handlers using the Celery, while
during the local builds, we could execute them directly without Celery.

Use of the Scheduler also fixes an issue with the ordering of such calls. If
we called the handlers directly, they could be executed in the middle of
another handler, leading to behavior incompatible with the current
`further_work` concept. Using the Scheduler, these calls are always executed
at the end of the handler, no matter when they were scheduled.
This commit is contained in:
Jan Kaluza
2019-11-14 15:46:54 +01:00
committed by mprahl
parent fa697a950d
commit 473f7e5e5b
15 changed files with 249 additions and 279 deletions

View File

@@ -700,6 +700,11 @@ class KojiModuleBuilder(GenericBuilder):
:param component_build: a ComponentBuild object
:return: a list of msgs that MBS needs to process
"""
# Imported here because of circular dependencies.
from module_build_service.scheduler.handlers.tags import tagged as tagged_handler
from module_build_service.scheduler.handlers.components import (
build_task_finalize as build_task_finalize_handler)
opts = {"latest": True, "package": component_build.package, "inherit": False}
build_tagged = self.koji_session.listTagged(self.module_build_tag["name"], **opts)
dest_tagged = None
@@ -727,10 +732,10 @@ class KojiModuleBuilder(GenericBuilder):
nvr = "{name}-{version}-{release}".format(**untagged_build)
build = self.koji_session.getBuild(nvr)
break
further_work = []
# If the build doesn't exist, then return
# If the build doesn't exist, then return False
if not build:
return further_work
return False
# Start setting up MBS' database to use the existing build
log.info('Skipping build of "{0}" since it already exists.'.format(build["nvr"]))
@@ -741,19 +746,11 @@ class KojiModuleBuilder(GenericBuilder):
component_build.state_reason = "Found existing build"
nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr)
# Trigger a completed build message
further_work.append({
"msg_id": "recover_orphaned_artifact: fake message",
"event": events.KOJI_BUILD_CHANGE,
"build_id": build["build_id"],
"task_id": build["task_id"],
"build_new_state": koji.BUILD_STATES["COMPLETE"],
"build_name": component_build.package,
"build_version": nvr_dict["version"],
"build_release": nvr_dict["release"],
"module_build_id": component_build.module_build.id,
"state_reason": None
})
args = (
"recover_orphaned_artifact: fake message", build["build_id"], build["task_id"],
koji.BUILD_STATES["COMPLETE"], component_build.package, nvr_dict["version"],
nvr_dict["release"], component_build.module_build.id, None)
events.scheduler.add(build_task_finalize_handler, args)
component_tagged_in = []
if build_tagged:
component_tagged_in.append(self.module_build_tag["name"])
@@ -772,14 +769,11 @@ class KojiModuleBuilder(GenericBuilder):
'The build being skipped isn\'t tagged in the "{0}" tag. Will send a message to '
"the tag handler".format(tag)
)
further_work.append({
"msg_id": "recover_orphaned_artifact: fake message",
"event": events.KOJI_TAG_CHANGE,
"tag_name": tag,
"build_name": component_build.package,
"build_nvr": component_build.nvr,
})
return further_work
args = ("recover_orphaned_artifact: fake message", tag, component_build.package,
component_build.nvr)
events.scheduler.add(tagged_handler, args)
return True
def build(self, artifact_name, source):
"""

View File

@@ -318,6 +318,7 @@ class MockModuleBuilder(GenericBuilder):
pass
def buildroot_add_artifacts(self, artifacts, install=False):
from module_build_service.scheduler.handlers.repos import done as repos_done_handler
self._createrepo()
# TODO: This is just hack to install module-build-macros into the
@@ -330,9 +331,7 @@ class MockModuleBuilder(GenericBuilder):
self.groups.append("module-build-macros")
self._write_mock_config()
from module_build_service.scheduler.consumer import fake_repo_done_message
fake_repo_done_message(self.tag_name)
events.scheduler.add(repos_done_handler, ("fake_msg", self.tag_name + "-build"))
def tag_artifacts(self, artifacts):
pass
@@ -394,6 +393,8 @@ class MockModuleBuilder(GenericBuilder):
self._write_mock_config()
def _send_build_change(self, state, source, build_id):
from module_build_service.scheduler.handlers.components import (
build_task_finalize as build_task_finalize_handler)
try:
nvr = kobo.rpmlib.parse_nvr(source)
except ValueError:
@@ -401,18 +402,10 @@ class MockModuleBuilder(GenericBuilder):
# build_id=1 and task_id=1 are OK here, because we are building just
# one RPM at the time.
module_build_service.scheduler.consumer.work_queue_put({
"msg_id": "a faked internal message",
"event": events.KOJI_BUILD_CHANGE,
"build_id": build_id,
"task_id": build_id,
"build_name": nvr["name"],
"build_new_state": state,
"build_release": nvr["release"],
"build_version": nvr["version"],
"module_build_id": None,
"state_reason": None
})
args = (
"a faked internal message", build_id, build_id, state, nvr["name"], nvr["version"],
nvr["release"], None, None)
events.scheduler.add(build_task_finalize_handler, args)
def _save_log(self, resultsdir, log_name, artifact_name):
old_log = os.path.join(resultsdir, log_name)

View File

@@ -259,7 +259,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
kwargs.pop("event")
try:
further_work = handler(**kwargs) or []
handler(**kwargs)
except Exception as e:
log.exception("Could not process message handler.")
db_session.rollback()
@@ -275,16 +275,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
# Allow caller to do something when error is occurred.
raise
else:
# Handlers can *optionally* return a list of fake messages that
# should be re-inserted back into the main work queue. We can use
# this (for instance) when we submit a new component build but (for
# some reason) it has already been built, then it can fake its own
# completion back to the scheduler so that work resumes as if it
# was submitted for real and koji announced its completion.
for event in further_work:
log.info(" Scheduling faked event %r", event)
self.incoming.put(event)
finally:
MBSConsumer.current_module_build_id = None
log.debug("Done with %s", idx)
@@ -307,12 +297,3 @@ def work_queue_put(msg):
""" Artificially put a message into the work queue of the consumer. """
consumer = get_global_consumer()
consumer.incoming.put(msg)
def fake_repo_done_message(tag_name):
event_info = {
"msg_id": "a faked internal message",
"event": events.KOJI_REPO_CHANGE,
"repo_tag": tag_name + "-build"
}
work_queue_put(event_info)

View File

@@ -12,8 +12,76 @@ build is complete, Koji sends a message to topic buildsys.build.state.change,
however Brew sends to topic brew.build.complete, etc.
"""
import time
import sched
from functools import wraps
from module_build_service import log
KOJI_BUILD_CHANGE = "koji_build_change"
KOJI_TAG_CHANGE = "koji_tag_change"
KOJI_REPO_CHANGE = "koji_repo_change"
MBS_MODULE_STATE_CHANGE = "mbs_module_state_change"
GREENWAVE_DECISION_UPDATE = "greenwave_decision_update"
class Scheduler(sched.scheduler):
    """
    Subclass of `sched.scheduler` allowing to schedule handler calls.

    If one of the MBS handler functions needs to call another handler, it needs
    to do it in a safe way - such another handler call should not be done in
    the middle of another handler's execution.

    This class provides a solution for that. A handler can schedule a run of
    another handler using the `add` method. Handlers should use the
    `mbs_event_handler` decorator, which ensures that the `run` method is
    called at the end of the handler's execution so the other scheduled
    handlers are executed.
    """

    def add(self, handler, arguments=()):
        """
        Schedule execution of `handler` with `arguments`.

        :param callable handler: handler function to execute.
        :param tuple arguments: positional arguments passed to `handler`.
        """
        # Delay 0 and priority 0: with the identity `delayfunc` used for the
        # module-level instance, the event runs as soon as `run` is called.
        self.enter(0, 0, handler, arguments)

    def run(self):
        """
        Run the scheduled handlers in the order they were added.
        """
        log.debug("Running event scheduler with following events:")
        for event in self.queue:
            log.debug("    %r", event)
        sched.scheduler.run(self)

    def reset(self):
        """
        Reset the Scheduler to its initial state by cancelling all
        scheduled events.
        """
        while not self.empty():
            self.cancel(self.queue[0])
scheduler = Scheduler(time.time, delayfunc=lambda x: x)
def mbs_event_handler():
    """
    Decorator factory for MBS event handlers.

    Implements the common tasks that would otherwise be repeated in every
    MBS event handler, for example:

      - at the end of the handler, call `events.scheduler.run()`.
    """
    def _decorate(handler_func):
        @wraps(handler_func)
        def _invoke(*call_args, **call_kwargs):
            try:
                return handler_func(*call_args, **call_kwargs)
            finally:
                # Execute any handlers scheduled during this handler's run,
                # even if the handler raised.
                scheduler.run()
        return _invoke
    return _decorate

View File

@@ -16,6 +16,7 @@ from module_build_service.utils.batches import continue_batch_build
logging.basicConfig(level=logging.DEBUG)
@events.mbs_event_handler()
def build_task_finalize(
msg_id, build_id, task_id, build_new_state,
build_name, build_version, build_release,
@@ -97,8 +98,6 @@ def build_task_finalize(
parent.modulemd = mmd_to_str(mmd)
db_session.commit()
further_work = []
parent_current_batch = parent.current_batch()
# If there are no other components still building in a batch,
@@ -134,11 +133,9 @@ def build_task_finalize(
# The repository won't be regenerated in this case and therefore we generate fake repo
# change message here.
log.info("Batch done. No component to tag")
further_work += [{
"msg_id": "components::_finalize: fake msg",
"event": events.KOJI_REPO_CHANGE,
"repo_tag": builder.module_build_tag["name"],
}]
from module_build_service.scheduler.handlers.repos import done as repos_done_handler
events.scheduler.add(
repos_done_handler, ("fake_msg", builder.module_build_tag["name"]))
else:
built_component_nvrs_in_batch = [c.nvr for c in built_components_in_batch]
# tag && add to srpm-build group if neccessary
@@ -174,6 +171,4 @@ def build_task_finalize(
# build, try to call continue_batch_build again so in case we hit the
# threshold previously, we will submit another build from this batch.
builder = GenericBuilder.create_from_module(db_session, parent, conf)
further_work += continue_batch_build(conf, parent, builder)
return further_work
continue_batch_build(conf, parent, builder)

View File

@@ -4,6 +4,7 @@ from module_build_service import conf, log
from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder
from module_build_service.db_session import db_session
from module_build_service.models import ModuleBuild, BUILD_STATES
from module_build_service.scheduler import events
def get_corresponding_module_build(nvr):
@@ -31,6 +32,7 @@ def get_corresponding_module_build(nvr):
return ModuleBuild.get_by_id(db_session, module_build_id)
@events.mbs_event_handler()
def decision_update(msg_id, decision_context, subject_identifier, policies_satisfied):
"""Move module build to ready or failed according to Greenwave result

View File

@@ -40,6 +40,7 @@ def get_artifact_from_srpm(srpm_path):
return os.path.basename(srpm_path).replace(".src.rpm", "")
@events.mbs_event_handler()
def failed(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'failed' state.
@@ -99,6 +100,7 @@ def failed(msg_id, module_build_id, module_build_state):
GenericBuilder.clear_cache(build)
@events.mbs_event_handler()
def done(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'done' state.
@@ -136,6 +138,7 @@ def done(msg_id, module_build_id, module_build_state):
GenericBuilder.clear_cache(build)
@events.mbs_event_handler()
def init(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'init' state.
@@ -310,6 +313,7 @@ def get_content_generator_build_koji_tag(module_deps):
return conf.koji_cg_default_build_tag
@events.mbs_event_handler()
def wait(msg_id, module_build_id, module_build_state):
""" Called whenever a module enters the 'wait' state.
@@ -398,11 +402,9 @@ def wait(msg_id, module_build_id, module_build_state):
db_session.commit()
# Return a KojiRepoChange message so that the build can be transitioned to done
# in the repos handler
return [{
"msg_id": "handlers.modules.wait: fake msg",
"event": events.KOJI_REPO_CHANGE,
"repo_tag": builder.module_build_tag["name"],
}]
from module_build_service.scheduler.handlers.repos import done as repos_done_handler
events.scheduler.add(repos_done_handler, ("fake_msg", builder.module_build_tag["name"]))
return
# If all components in module build will be reused, we don't have to build
# module-build-macros, because there won't be any build done.
@@ -420,7 +422,6 @@ def wait(msg_id, module_build_id, module_build_state):
artifact_name = "module-build-macros"
component_build = models.ComponentBuild.from_component_name(db_session, artifact_name, build.id)
further_work = []
srpm = builder.get_disttag_srpm(
disttag=".%s" % get_rpm_release(db_session, build),
module_build=build)
@@ -437,10 +438,9 @@ def wait(msg_id, module_build_id, module_build_state):
# Commit and refresh so that the SQLAlchemy relationships are available
db_session.commit()
db_session.refresh(component_build)
msgs = builder.recover_orphaned_artifact(component_build)
if msgs:
recovered = builder.recover_orphaned_artifact(component_build)
if recovered:
log.info("Found an existing module-build-macros build")
further_work += msgs
# There was no existing artifact found, so lets submit the build instead
else:
task_id, state, reason, nvr = builder.build(artifact_name=artifact_name, source=srpm)
@@ -452,10 +452,9 @@ def wait(msg_id, module_build_id, module_build_state):
# It's possible that the build succeeded in the builder but some other step failed which
# caused module-build-macros to be marked as failed in MBS, so check to see if it exists
# first
msgs = builder.recover_orphaned_artifact(component_build)
if msgs:
recovered = builder.recover_orphaned_artifact(component_build)
if recovered:
log.info("Found an existing module-build-macros build")
further_work += msgs
else:
task_id, state, reason, nvr = builder.build(artifact_name=artifact_name, source=srpm)
component_build.task_id = task_id
@@ -475,9 +474,5 @@ def wait(msg_id, module_build_id, module_build_state):
build.new_repo_task_id = task_id
db_session.commit()
else:
further_work.append({
"msg_id": "fake msg",
"event": events.KOJI_REPO_CHANGE,
"repo_tag": builder.module_build_tag["name"],
})
return further_work
from module_build_service.scheduler.handlers.repos import done as repos_done_handler
events.scheduler.add(repos_done_handler, ("fake_msg", builder.module_build_tag["name"]))

View File

@@ -8,10 +8,12 @@ from module_build_service import conf, models, log
from module_build_service.builder import GenericBuilder
from module_build_service.utils import start_next_batch_build
from module_build_service.db_session import db_session
from module_build_service.scheduler import events
logging.basicConfig(level=logging.DEBUG)
@events.mbs_event_handler()
def done(msg_id, repo_tag):
"""Called whenever koji rebuilds a repo, any repo.
@@ -103,7 +105,6 @@ def done(msg_id, repo_tag):
has_unbuilt_components = any(c.is_unbuilt for c in module_build.component_builds)
has_failed_components = any(c.is_unsuccessful for c in module_build.component_builds)
further_work = []
if has_unbuilt_components and not has_failed_components:
# Ok, for the subset of builds that did complete successfully, check to
# see if they are in the buildroot before starting new batch.
@@ -114,8 +115,7 @@ def done(msg_id, repo_tag):
# Try to start next batch build, because there are still unbuilt
# components in a module.
further_work += start_next_batch_build(conf, module_build, builder)
start_next_batch_build(conf, module_build, builder)
else:
if has_failed_components:
state_reason = "Component(s) {} failed to build.".format(
@@ -137,5 +137,3 @@ def done(msg_id, repo_tag):
module_build.transition(db_session, conf, state=models.BUILD_STATES["done"])
db_session.commit()
return further_work

View File

@@ -12,6 +12,7 @@ from module_build_service.scheduler import events
logging.basicConfig(level=logging.DEBUG)
@events.mbs_event_handler()
def tagged(msg_id, tag_name, build_name, build_nvr):
"""Called whenever koji tags a build to tag.
@@ -53,8 +54,6 @@ def tagged(msg_id, tag_name, build_name, build_nvr):
)
return []
further_work = []
# If all components are tagged, start newRepo task.
if not any(c.is_completed and not c.is_tagged for c in module_build.up_to_current_batch()):
builder = GenericBuilder.create_from_module(
@@ -77,15 +76,11 @@ def tagged(msg_id, tag_name, build_name, build_nvr):
# would be useless to wait for a repository we will not use anyway.
log.info(
"All components in module tagged and built, skipping the last repo regeneration")
further_work += [{
"msg_id": "components::_finalize: fake msg",
"event": events.KOJI_REPO_CHANGE,
"repo_tag": builder.module_build_tag["name"],
}]
from module_build_service.scheduler.handlers.repos import done as repos_done_handler
events.scheduler.add(
repos_done_handler, ("fake_msg", builder.module_build_tag["name"]))
db_session.commit()
return further_work
def _is_new_repo_generating(module_build, koji_session):
""" Return whether or not a new repo is already being generated. """

View File

@@ -24,6 +24,7 @@ from module_build_service.scheduler import events
class MBSProducer(PollingProducer):
frequency = timedelta(seconds=conf.polling_interval)
@events.mbs_event_handler()
def poll(self):
try:
self.log_summary()
@@ -272,11 +273,8 @@ class MBSProducer(PollingProducer):
if _has_missed_new_repo_message(module_build, builder.koji_session):
log.info(" Processing the paused module build %r", module_build)
further_work = module_build_service.utils.start_next_batch_build(
module_build_service.utils.start_next_batch_build(
config, module_build, builder)
for event in further_work:
log.info(" Scheduling faked event %r" % event)
module_build_service.scheduler.consumer.work_queue_put(event)
# Check if we have met the threshold.
if module_build_service.utils.at_concurrent_component_threshold(config):

View File

@@ -104,7 +104,6 @@ def continue_batch_build(config, module, builder, components=None):
# Get the list of components to be built in this batch. We are not building
# all `unbuilt_components`, because we can meet the num_concurrent_builds
# threshold
further_work = []
components_to_build = []
# Sort the unbuilt_components so that the components that take the longest to build are
# first
@@ -115,8 +114,7 @@ def continue_batch_build(config, module, builder, components=None):
# Only evaluate new components
if not component.is_waiting_for_build:
continue
msgs = builder.recover_orphaned_artifact(component)
further_work += msgs
builder.recover_orphaned_artifact(component)
for c in unbuilt_components:
# If a previous build of the component was found, then the state will be marked as
@@ -149,7 +147,6 @@ def continue_batch_build(config, module, builder, components=None):
future.result()
db_session.commit()
return further_work
def start_next_batch_build(config, module, builder, components=None):
@@ -248,7 +245,6 @@ def start_next_batch_build(config, module, builder, components=None):
log.info("Starting build of next batch %d, %s" % (module.batch, unbuilt_components))
# Attempt to reuse any components possible in the batch before attempting to build any
further_work = []
unbuilt_components_after_reuse = []
components_reused = False
should_try_reuse = True
@@ -264,7 +260,7 @@ def start_next_batch_build(config, module, builder, components=None):
for c, reusable_c in zip(unbuilt_components, reusable_components):
if reusable_c:
components_reused = True
further_work += reuse_component(c, reusable_c)
reuse_component(c, reusable_c)
else:
unbuilt_components_after_reuse.append(c)
# Commit the changes done by reuse_component
@@ -274,12 +270,9 @@ def start_next_batch_build(config, module, builder, components=None):
# If all the components were reused in the batch then make a KojiRepoChange
# message and return
if components_reused and not unbuilt_components_after_reuse:
further_work.append({
"msg_id": "start_build_batch: fake msg",
"event": events.KOJI_REPO_CHANGE,
"repo_tag": builder.module_build_tag["name"],
})
return further_work
from module_build_service.scheduler.handlers.repos import done as repos_done_handler
events.scheduler.add(
repos_done_handler, ("start_build_batch: fake_msg", builder.module_build_tag["name"]))
return
return further_work + continue_batch_build(
config, module, builder, unbuilt_components_after_reuse)
continue_batch_build(config, module, builder, unbuilt_components_after_reuse)

View File

@@ -9,7 +9,8 @@ from module_build_service.scheduler import events
from module_build_service.utils.mse import get_base_module_mmds
def reuse_component(component, previous_component_build, change_state_now=False):
def reuse_component(component, previous_component_build, change_state_now=False,
schedule_fake_events=True):
"""
Reuses component build `previous_component_build` instead of building
component `component`
@@ -18,11 +19,17 @@ def reuse_component(component, previous_component_build, change_state_now=False)
This allows callers to reuse multiple component builds and commit them all
at once.
Returns the list of BaseMessage instances to be handled later by the
scheduler.
:param ComponentBuild component: Component whihch will reuse previous module build.
:param ComponentBuild previous_component_build: Previous component build to reuse.
:param bool change_state_now: When True, the component.state will be set to
previous_component_build.state. Otherwise, the component.state will be set to BUILDING.
:param bool schedule_fake_events: When True, the `events.scheduler.add` will be used to
schedule handlers.component.build_task_finalize handler call.
"""
import koji
from module_build_service.scheduler.handlers.components import (
build_task_finalize as build_task_finalize_handler)
log.info(
'Reusing component "{0}" from a previous module '
@@ -43,20 +50,13 @@ def reuse_component(component, previous_component_build, change_state_now=False)
component.state_reason = "Reused component from previous module build"
component.nvr = previous_component_build.nvr
nvr_dict = kobo.rpmlib.parse_nvr(component.nvr)
# Add this message to further_work so that the reused
# component will be tagged properly
return [{
"msg_id": "reuse_component: fake msg",
"event": events.KOJI_BUILD_CHANGE,
"build_id": None,
"task_id": component.task_id,
"build_new_state": previous_component_build.state,
"build_name": nvr_dict["name"],
"build_version": nvr_dict["version"],
"build_release": nvr_dict["release"],
"module_build_id": component.module_id,
"state_reason": component.state_reason,
}]
# Add this event to scheduler so that the reused component will be tagged properly.
if schedule_fake_events:
args = (
"reuse_component: fake msg", None, component.task_id, previous_component_build.state,
nvr_dict["name"], nvr_dict["version"], nvr_dict["release"], component.module_id,
component.state_reason)
events.scheduler.add(build_task_finalize_handler, args)
def get_reusable_module(module):
@@ -214,7 +214,7 @@ def attempt_to_reuse_all_components(builder, module):
module.batch = c.batch
# Reuse the component
reuse_component(c, component_to_reuse, True)
reuse_component(c, component_to_reuse, True, False)
components_to_tag.append(c.nvr)
# Tag them

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
import sched
import koji
import os
import re
@@ -19,6 +20,10 @@ from module_build_service import models, conf, build_logs
from module_build_service.db_session import db_session
from module_build_service.scheduler import events
from module_build_service.scheduler.local import make_simple_stop_condition
from module_build_service.scheduler.handlers.tags import tagged as tagged_handler
from module_build_service.scheduler.handlers.components import (
build_task_finalize as build_task_finalize_handler)
from module_build_service.scheduler.handlers.repos import done as repos_done_handler
from mock import patch, PropertyMock, Mock, MagicMock
from werkzeug.datastructures import FileStorage
@@ -232,43 +237,20 @@ class FakeModuleBuilder(GenericBuilder):
return {"name": self.tag_name + "-build"}
def _send_repo_done(self):
event_info = {
"msg_id": "a faked internal message",
"event": events.KOJI_REPO_CHANGE,
"repo_tag": self.tag_name + "-build",
}
module_build_service.scheduler.consumer.work_queue_put(event_info)
events.scheduler.add(repos_done_handler, ("fake_msg", self.tag_name + "-build"))
def _send_tag(self, artifact, nvr, dest_tag=True):
if dest_tag:
tag = self.tag_name
else:
tag = self.tag_name + "-build"
event_info = {
"msg_id": "a faked internal message",
"event": events.KOJI_TAG_CHANGE,
"tag_name": tag,
"build_name": artifact,
"build_nvr": nvr
}
module_build_service.scheduler.consumer.work_queue_put(event_info)
events.scheduler.add(tagged_handler, ("a faked internal message", tag, artifact, nvr))
def _send_build_change(self, state, name, build_id):
# build_id=1 and task_id=1 are OK here, because we are building just
# one RPM at the time.
event_info = {
"msg_id": "a faked internal message",
"event": events.KOJI_BUILD_CHANGE,
"build_id": build_id,
"task_id": build_id,
"build_name": name,
"build_new_state": state,
"build_release": "1",
"build_version": "1",
"module_build_id": None,
"state_reason": None
}
module_build_service.scheduler.consumer.work_queue_put(event_info)
args = ("a faked internal message", build_id, build_id, state, name, "1", "1", None, None)
events.scheduler.add(build_task_finalize_handler, args)
def build(self, artifact_name, source):
print("Starting building artifact %s: %s" % (artifact_name, source))
@@ -297,7 +279,6 @@ class FakeModuleBuilder(GenericBuilder):
pass
def recover_orphaned_artifact(self, component_build):
msgs = []
if self.INSTANT_COMPLETE:
disttag = module_build_service.utils.get_rpm_release(
self.db_session, component_build.module_build)
@@ -309,27 +290,17 @@ class FakeModuleBuilder(GenericBuilder):
component_build.state_reason = "Found existing build"
nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr)
# Send a message stating the build is complete
msgs.append({
"msg_id": "recover_orphaned_artifact: fake message",
"event": events.KOJI_BUILD_CHANGE,
"build_id": randint(1, 9999999),
"task_id": component_build.task_id,
"build_new_state": koji.BUILD_STATES["COMPLETE"],
"build_name": component_build.package,
"build_version": nvr_dict["version"],
"build_release": nvr_dict["release"],
"module_build_id": component_build.module_build.id,
"state_reason": None
})
args = ("recover_orphaned_artifact: fake message", randint(1, 9999999),
component_build.task_id, koji.BUILD_STATES["COMPLETE"],
component_build.package, nvr_dict["version"], nvr_dict["release"],
component_build.module_build.id, None)
events.scheduler.add(build_task_finalize_handler, args)
# Send a message stating that the build was tagged in the build tag
msgs.append({
"msg_id": "recover_orphaned_artifact: fake message",
"event": events.KOJI_TAG_CHANGE,
"tag_name": component_build.module_build.koji_tag + "-build",
"build_name": component_build.package,
"build_nvr": component_build.nvr,
})
return msgs
args = ("recover_orphaned_artifact: fake message",
component_build.module_build.koji_tag + "-build",
component_build.package, component_build.nvr)
events.scheduler.add(tagged_handler, args)
return True
def finalize(self, succeeded=None):
if FakeModuleBuilder.on_finalize_cb:
@@ -868,8 +839,9 @@ class TestBuild(BaseTestBuild):
new_callable=PropertyMock,
return_value=2,
)
@patch('module_build_service.scheduler.events.Scheduler.run', autospec=True)
def test_try_to_reach_concurrent_threshold(
self, conf_num_concurrent_builds, mocked_scm, mocked_get_user,
self, scheduler_run, conf_num_concurrent_builds, mocked_scm, mocked_get_user,
conf_system, dbg, hmsc
):
"""
@@ -896,24 +868,21 @@ class TestBuild(BaseTestBuild):
# the module build.
TestBuild._global_var = []
def stop(message):
def mocked_scheduler_run(self):
"""
Stop the scheduler when the module is built or when we try to build
more components than the num_concurrent_builds.
Store the number of concurrent builds between each handler call to global list so we
can examine it later.
"""
main_stop = make_simple_stop_condition()
num_building = (
db_session.query(models.ComponentBuild)
.filter_by(state=koji.BUILD_STATES["BUILDING"])
.count()
)
over_threshold = conf.num_concurrent_builds < num_building
TestBuild._global_var.append(num_building)
result = main_stop(message) or over_threshold
db_session.remove()
return result
sched.scheduler.run(self)
self.run_scheduler(stop_condition=stop)
scheduler_run.side_effect = mocked_scheduler_run
self.run_scheduler()
# _global_var looks similar to this: [0, 1, 0, 0, 2, 2, 1, 0, 0, 0]
# It shows the number of concurrent builds in the time. At first we

View File

@@ -89,6 +89,7 @@ class FakeKojiModuleBuilder(KojiModuleBuilder):
class TestKojiBuilder:
def setup_method(self, test_method):
init_data(1)
events.scheduler.reset()
self.config = mock.Mock()
self.config.koji_profile = conf.koji_profile
self.config.koji_repository_url = conf.koji_repository_url
@@ -105,6 +106,7 @@ class TestKojiBuilder:
def teardown_method(self, test_method):
self.p_read_config.stop()
events.scheduler.reset()
def test_tag_to_repo(self):
""" Test that when a repo msg hits us and we have no match,
@@ -145,29 +147,24 @@ class TestKojiBuilder:
component_build.state = None
component_build.nvr = None
actual = builder.recover_orphaned_artifact(component_build)
recovered = builder.recover_orphaned_artifact(component_build)
# recover_orphaned_artifact modifies a component build, but doesn't
# commit the changes.
db_session.commit()
assert len(actual) == 3
assert recovered
assert actual[0]["event"] == events.KOJI_BUILD_CHANGE
assert actual[0]["build_id"] == 91
assert actual[0]["task_id"] == 12345
assert actual[0]["build_new_state"] == koji.BUILD_STATES["COMPLETE"]
assert actual[0]["build_name"] == "rubygem-rails"
assert actual[0]["build_version"] == "1.0"
assert actual[0]["build_release"] == "1.module+e0095747"
assert actual[0]["module_build_id"] == 4
event_info = events.scheduler.queue[0][3]
assert event_info == ('recover_orphaned_artifact: fake message', 91, 12345, 1,
'rubygem-rails', '1.0', '1.module+e0095747', 4, None)
assert actual[1]["event"] == events.KOJI_TAG_CHANGE
assert actual[1]["tag_name"] == "module-foo-build"
assert actual[1]["build_name"] == "rubygem-rails"
event_info = events.scheduler.queue[1][3]
assert event_info == ('recover_orphaned_artifact: fake message', 'module-foo-build',
'rubygem-rails', 'foo-1.0-1.module+e0095747')
assert actual[2]["event"] == events.KOJI_TAG_CHANGE
assert actual[2]["tag_name"] == "module-foo"
assert actual[2]["build_name"] == "rubygem-rails"
event_info = events.scheduler.queue[2][3]
assert event_info == ('recover_orphaned_artifact: fake message', 'module-foo',
'rubygem-rails', 'foo-1.0-1.module+e0095747')
assert component_build.state == koji.BUILD_STATES["COMPLETE"]
assert component_build.task_id == 12345
@@ -206,18 +203,14 @@ class TestKojiBuilder:
component_build.state = None
db_session.commit()
actual = builder.recover_orphaned_artifact(component_build)
recovered = builder.recover_orphaned_artifact(component_build)
db_session.commit()
assert len(actual) == 1
assert actual[0]["event"] == events.KOJI_BUILD_CHANGE
assert actual[0]["build_id"] == 91
assert actual[0]["task_id"] == 12345
assert actual[0]["build_new_state"] == koji.BUILD_STATES["COMPLETE"]
assert actual[0]["build_name"] == "rubygem-rails"
assert actual[0]["build_version"] == "1.0"
assert actual[0]["build_release"] == "1.{0}".format(dist_tag)
assert actual[0]["module_build_id"] == 4
assert recovered
event_info = events.scheduler.queue[0][3]
assert event_info == ('recover_orphaned_artifact: fake message', 91, 12345, 1,
'rubygem-rails', '1.0', '1.module+2+b8661ee4', 4, None)
assert component_build.state == koji.BUILD_STATES["COMPLETE"]
assert component_build.task_id == 12345
assert component_build.state_reason == "Found existing build"
@@ -260,18 +253,14 @@ class TestKojiBuilder:
component_build.state = None
db_session.commit()
actual = builder.recover_orphaned_artifact(component_build)
recovered = builder.recover_orphaned_artifact(component_build)
db_session.commit()
assert len(actual) == 1
assert actual[0]["event"] == events.KOJI_BUILD_CHANGE
assert actual[0]["build_id"] == 91
assert actual[0]["task_id"] == 12345
assert actual[0]["build_new_state"] == koji.BUILD_STATES["COMPLETE"]
assert actual[0]["build_name"] == "module-build-macros"
assert actual[0]["build_version"] == "1.0"
assert actual[0]["build_release"] == "1.{0}".format(dist_tag)
assert actual[0]["module_build_id"] == 4
assert recovered
event_info = events.scheduler.queue[0][3]
assert event_info == ('recover_orphaned_artifact: fake message', 91, 12345, 1,
'module-build-macros', '1.0', "1.{0}".format(dist_tag), 4, None)
assert component_build.state == koji.BUILD_STATES["COMPLETE"]
assert component_build.task_id == 12345
assert component_build.state_reason == "Found existing build"
@@ -312,10 +301,10 @@ class TestKojiBuilder:
component_build.state = None
db_session.commit()
actual = builder.recover_orphaned_artifact(component_build)
recovered = builder.recover_orphaned_artifact(component_build)
db_session.commit()
assert actual == []
assert not recovered
# Make sure nothing erroneous gets tag
assert builder.koji_session.tagBuild.call_count == 0

View File

@@ -1198,11 +1198,13 @@ class DummyModuleBuilder(GenericBuilder):
class TestBatches:
def setup_method(self, test_method):
GenericBuilder.register_backend_class(DummyModuleBuilder)
events.scheduler.reset()
def teardown_method(self, test_method):
# clean_database()
DummyModuleBuilder.TAGGED_COMPONENTS = []
GenericBuilder.register_backend_class(KojiModuleBuilder)
events.scheduler.reset()
def test_start_next_batch_build_reuse(self, default_buildroot_groups):
"""
@@ -1218,7 +1220,8 @@ class TestBatches:
module_build.batch = 1
builder = mock.MagicMock()
further_work = module_build_service.utils.start_next_batch_build(
builder.module_build_tag = {"name": "module-fedora-27-build"}
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Batch number should increase.
@@ -1228,34 +1231,19 @@ class TestBatches:
# build_new_state set to COMPLETE, but the current component build
# state should be set to BUILDING, so KojiBuildChange message handler
# handles the change properly.
for event_info in further_work:
if event_info["event"] == events.KOJI_BUILD_CHANGE:
assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"]
for event in events.scheduler.queue:
event_info = event[3]
if event_info[0].startswith("reuse_component"):
assert event_info[3] == koji.BUILD_STATES["COMPLETE"]
component_build = models.ComponentBuild.from_component_event(
db_session,
task_id=event_info["task_id"],
module_id=event_info["module_build_id"])
task_id=event_info[2],
module_id=event_info[7])
assert component_build.state == koji.BUILD_STATES["BUILDING"]
# When we handle these KojiBuildChange messages, MBS should tag all
# the components just once.
for event_info in further_work:
if event_info["event"] == events.KOJI_BUILD_CHANGE:
module_build_service.scheduler.handlers.components.build_task_finalize(
msg_id=event_info["msg_id"],
build_id=event_info["build_id"],
task_id=event_info["task_id"],
build_new_state=event_info["build_new_state"],
build_name=event_info["build_name"],
build_version=event_info["build_version"],
build_release=event_info["build_release"],
module_build_id=event_info["module_build_id"],
state_reason=event_info["state_reason"]
)
# Since we have reused all the components in the batch, there should
# be fake KojiRepoChange message.
assert further_work[-1]["event"] == events.KOJI_REPO_CHANGE
events.scheduler.run()
# Check that packages have been tagged just once.
assert len(DummyModuleBuilder.TAGGED_COMPONENTS) == 2
@@ -1283,24 +1271,27 @@ class TestBatches:
builder = mock.MagicMock()
builder.recover_orphaned_artifact.return_value = []
further_work = module_build_service.utils.start_next_batch_build(
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Batch number should increase.
assert module_build.batch == 2
# Make sure we only have one message returned for the one reused component
assert len(further_work) == 1
assert len(events.scheduler.queue) == 1
# The KojiBuildChange message in further_work should have build_new_state
# set to COMPLETE, but the current component build state in the DB should be set
# to BUILDING, so KojiBuildChange message handler handles the change
# properly.
event_info = further_work[0]
assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"]
event_info = events.scheduler.queue[0][3]
assert event_info == ('reuse_component: fake msg', None, 90276227, 1, 'perl-Tangerine',
'0.23', '1.module+0+d027b723', 3,
'Reused component from previous module build')
component_build = models.ComponentBuild.from_component_event(
db_session,
task_id=event_info["task_id"],
module_id=event_info["module_build_id"])
task_id=event_info[2],
module_id=event_info[7],
)
assert component_build.state == koji.BUILD_STATES["BUILDING"]
assert component_build.package == "perl-Tangerine"
assert component_build.reused_component_id is not None
@@ -1328,13 +1319,13 @@ class TestBatches:
builder = mock.MagicMock()
builder.recover_orphaned_artifact.return_value = []
further_work = module_build_service.utils.start_next_batch_build(
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Batch number should increase.
assert module_build.batch == 2
# No component reuse messages should be returned
assert len(further_work) == 0
assert len(events.scheduler.queue) == 0
# Make sure that both components in the batch were submitted
assert len(mock_sbc.mock_calls) == 2
@@ -1375,24 +1366,26 @@ class TestBatches:
builder = mock.MagicMock()
builder.recover_orphaned_artifact.return_value = []
further_work = module_build_service.utils.start_next_batch_build(
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Batch number should increase
assert module_build.batch == 2
# Make sure we only have one message returned for the one reused component
assert len(further_work) == 1
assert len(events.scheduler.queue) == 1
# The buildsys.build.state.change message in further_work should have
# build_new_state set to COMPLETE, but the current component build state
# in the DB should be set to BUILDING, so the build state change handler
# handles the change properly.
event_info = further_work[0]
assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"]
event_info = events.scheduler.queue[0][3]
assert event_info == ('reuse_component: fake msg', None, 90276227, 1,
'perl-Tangerine', '0.23', '1.module+0+d027b723', 3,
'Reused component from previous module build')
component_build = models.ComponentBuild.from_component_event(
db_session,
task_id=event_info["task_id"],
module_id=event_info["module_build_id"],
task_id=event_info[2],
module_id=event_info[7],
)
assert component_build.state == koji.BUILD_STATES["BUILDING"]
assert component_build.package == "perl-Tangerine"
@@ -1409,19 +1402,23 @@ class TestBatches:
db_session, "perl-Tangerine", 3)
pt_component.state = koji.BUILD_STATES["COMPLETE"]
events.scheduler.reset()
# Start the next build batch
further_work = module_build_service.utils.start_next_batch_build(
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Batch number should increase
assert module_build.batch == 3
# Verify that tangerine was reused even though perl-Tangerine was rebuilt in the previous
# batch
event_info = further_work[0]
assert event_info["build_new_state"] == koji.BUILD_STATES["COMPLETE"]
event_info = events.scheduler.queue[0][3]
assert event_info == ('reuse_component: fake msg', None, 90276315, 1, 'tangerine', '0.22',
'3.module+0+d027b723', 3,
'Reused component from previous module build')
component_build = models.ComponentBuild.from_component_event(
db_session,
task_id=event_info["task_id"],
module_id=event_info["module_build_id"]
task_id=event_info[2],
module_id=event_info[7],
)
assert component_build.state == koji.BUILD_STATES["BUILDING"]
assert component_build.package == "tangerine"
@@ -1451,14 +1448,14 @@ class TestBatches:
builder = mock.MagicMock()
builder.recover_orphaned_artifact.return_value = []
further_work = module_build_service.utils.start_next_batch_build(
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Batch number should increase.
assert module_build.batch == 2
# Make sure we don't have any messages returned since no components should be reused
assert len(further_work) == 0
assert len(events.scheduler.queue) == 0
# Make sure both components are set to the build state but not reused
assert pt_component.state == koji.BUILD_STATES["BUILDING"]
assert pt_component.reused_component_id is None
@@ -1487,7 +1484,7 @@ class TestBatches:
db_session.commit()
builder = mock.MagicMock()
further_work = module_build_service.utils.start_next_batch_build(
module_build_service.utils.start_next_batch_build(
conf, module_build, builder)
# Batch number should not increase.
@@ -1495,7 +1492,8 @@ class TestBatches:
# Make sure start build was called for the second component which wasn't reused
mock_sbc.assert_called_once()
# No further work should be returned
assert len(further_work) == 0
assert len(events.scheduler.queue) == 0
def test_start_next_batch_build_repo_building(self, default_buildroot_groups):
"""
@@ -1524,9 +1522,11 @@ class TestBatches:
class TestLocalBuilds:
def setup_method(self):
clean_database()
events.scheduler.reset()
def teardown_method(self):
clean_database()
events.scheduler.reset()
def test_load_local_builds_name(self, conf_system, conf_resultsdir):
module_build_service.utils.load_local_builds("testmodule")