mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-13 16:59:52 +08:00
Migrate scheduler to be run by fedmsg-hub
This commit is contained in:
3
Vagrantfile
vendored
3
Vagrantfile
vendored
@@ -6,6 +6,7 @@ $script = <<SCRIPT
|
||||
echo "export MODULE_BUILD_SERVICE_DEVELOPER_ENV=1" >> /etc/profile.d/module_build_service_developer_env.sh
|
||||
source /etc/profile.d/module_build_service_developer_env.sh
|
||||
dnf install -y \
|
||||
fedmsg-hub \
|
||||
fedmsg-relay \
|
||||
fedpkg \
|
||||
gcc \
|
||||
@@ -45,5 +46,5 @@ Vagrant.configure("2") do |config|
|
||||
config.vm.network "forwarded_port", guest: 13747, host: 13747
|
||||
config.vm.provision "shell", inline: $script
|
||||
config.vm.provision :shell, inline: "mbs-frontend &", run: "always"
|
||||
config.vm.provision :shell, inline: "mbs-daemon &", run: "always"
|
||||
config.vm.provision :shell, inline: "cd /tmp/module_build_service && fedmsg-hub &", run: "always"
|
||||
end
|
||||
|
||||
@@ -23,7 +23,7 @@ services:
|
||||
depends_on:
|
||||
- base
|
||||
build: .
|
||||
command: mbs-daemon
|
||||
command: fedmsg-hub
|
||||
links:
|
||||
- fedmsg-relay
|
||||
environment:
|
||||
|
||||
4
fedmsg.d/mbs-scheduler.py
Normal file
4
fedmsg.d/mbs-scheduler.py
Normal file
@@ -0,0 +1,4 @@
|
||||
config = {
|
||||
'mbsconsumer': True,
|
||||
'mbsproducer': True,
|
||||
}
|
||||
@@ -53,8 +53,8 @@ from OpenSSL.SSL import SysCallError
|
||||
from module_build_service import conf, log, db
|
||||
from module_build_service.models import ModuleBuild
|
||||
import module_build_service.scm
|
||||
import module_build_service.scheduler.main
|
||||
import module_build_service.utils
|
||||
import module_build_service.scheduler
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
@@ -1165,7 +1165,7 @@ $repos
|
||||
msg_id='a faked internal message',
|
||||
repo_tag=self.tag_name + "-build",
|
||||
)
|
||||
module_build_service.scheduler.main.outgoing_work_queue_put(msg)
|
||||
module_build_service.scheduler.work_queue.put(msg)
|
||||
|
||||
def _send_build_change(self, state, source, build_id):
|
||||
nvr = kobo.rpmlib.parse_nvr(source)
|
||||
|
||||
@@ -34,7 +34,6 @@ from module_build_service import models
|
||||
from module_build_service.pdc import (
|
||||
get_pdc_client_session, get_module, get_module_runtime_dependencies,
|
||||
get_module_tag, get_module_build_dependencies)
|
||||
import module_build_service.scheduler.main
|
||||
from module_build_service.utils import (
|
||||
submit_module_build,
|
||||
insert_fake_baseruntime,
|
||||
@@ -146,9 +145,10 @@ def build_module_locally(url):
|
||||
username = getpass.getuser()
|
||||
submit_module_build(username, url, allow_local_url=True)
|
||||
|
||||
msgs = []
|
||||
msgs.append(RidaModule("local module build", 2, 1))
|
||||
module_build_service.scheduler.main.main(msgs, True)
|
||||
# TODO: Ralph to the rescue
|
||||
# msgs = []
|
||||
# msgs.append(RidaModule("local module build", 2, 1))
|
||||
# module_build_service.scheduler.main.main(msgs, True)
|
||||
|
||||
|
||||
@manager.command
|
||||
|
||||
@@ -255,38 +255,11 @@ def publish(topic, msg, conf, service):
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
return handler(topic, msg, conf, service)
|
||||
|
||||
|
||||
def listen(conf, **kwargs):
|
||||
"""
|
||||
Yield messages from the messaging backend in conf.messaging.
|
||||
:param conf: a Config object from the class in config.py
|
||||
:param kwargs: any additional arguments to pass to the backend handler
|
||||
:return: yields a message object (child class from BaseMessage)
|
||||
"""
|
||||
try:
|
||||
handler = _messaging_backends[conf.messaging]['listen']
|
||||
except KeyError:
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
|
||||
for event in handler(conf, **kwargs):
|
||||
yield event
|
||||
|
||||
|
||||
def _fedmsg_publish(topic, msg, conf, service):
|
||||
# fedmsg doesn't really need access to conf, however other backends do
|
||||
import fedmsg
|
||||
return fedmsg.publish(topic, msg=msg, modname=service)
|
||||
|
||||
def _fedmsg_listen(conf, **kwargs):
|
||||
"""
|
||||
Parses a fedmsg event and constructs it into the appropriate message object
|
||||
"""
|
||||
import fedmsg
|
||||
for name, endpoint, topic, msg in fedmsg.tail_messages(**kwargs):
|
||||
msg_obj = BaseMessage.from_fedmsg(topic, msg)
|
||||
if msg_obj:
|
||||
yield msg_obj
|
||||
|
||||
def _amq_get_messenger(conf):
|
||||
import proton
|
||||
for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'):
|
||||
@@ -317,19 +290,6 @@ def _amq_get_messenger(conf):
|
||||
log.debug('proton.Messenger: Subscribing to address=%s' % url)
|
||||
return msngr
|
||||
|
||||
def _amq_listen(conf, **kwargs):
|
||||
import proton
|
||||
msngr = _amq_get_messenger(conf)
|
||||
msg = proton.Message()
|
||||
while True:
|
||||
msngr.recv()
|
||||
|
||||
while msngr.incoming:
|
||||
msngr.get(msg)
|
||||
msg_obj = BaseMessage.from_amq(msg.address, msg)
|
||||
if msg_obj:
|
||||
yield msg_obj
|
||||
|
||||
def _amq_publish(topic, msg, conf, service):
|
||||
import proton
|
||||
msngr = _amq_get_messenger(conf)
|
||||
@@ -356,6 +316,7 @@ def _in_memory_init(conf, **kwargs):
|
||||
_in_memory_msg_id = 0
|
||||
_in_memory_work_queue = queue.Queue()
|
||||
|
||||
# TODO: Ralph to the rescue
|
||||
def _in_memory_publish(topic, msg, conf, service):
|
||||
"""
|
||||
Puts the message to _in_memory_work_queue".
|
||||
@@ -377,6 +338,7 @@ def _in_memory_publish(topic, msg, conf, service):
|
||||
# Put the message to queue.
|
||||
_in_memory_work_queue.put(wrapped_msg)
|
||||
|
||||
# TODO: Ralph to the rescue
|
||||
def _in_memory_listen(conf, **kwargs):
|
||||
"""
|
||||
Yields the message from the _in_memory_work_queue when ready.
|
||||
@@ -394,16 +356,13 @@ _messaging_backends = {
|
||||
'fedmsg': {
|
||||
'init': _no_op,
|
||||
'publish': _fedmsg_publish,
|
||||
'listen': _fedmsg_listen,
|
||||
},
|
||||
'amq': {
|
||||
'init': _no_op,
|
||||
'publish': _amq_publish,
|
||||
'listen': _amq_listen,
|
||||
},
|
||||
'in_memory': {
|
||||
'init': _in_memory_init,
|
||||
'publish': _in_memory_publish,
|
||||
'listen': _in_memory_listen,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
import six.moves.queue as queue
|
||||
|
||||
work_queue = queue.Queue()
|
||||
|
||||
162
module_build_service/scheduler/consumer.py
Normal file
162
module_build_service/scheduler/consumer.py
Normal file
@@ -0,0 +1,162 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
|
||||
""" The FedmsgConsumer class that acts as a consumer entry point for fedmsg-hub.
|
||||
This class reads and processes messages from the message bus it is configured
|
||||
to use.
|
||||
"""
|
||||
|
||||
import koji
|
||||
import inspect
|
||||
import fedmsg.consumers
|
||||
|
||||
from module_build_service.utils import module_build_state_from_msg
|
||||
import module_build_service.messaging
|
||||
import module_build_service.scheduler.handlers.repos
|
||||
import module_build_service.scheduler.handlers.components
|
||||
import module_build_service.scheduler.handlers.modules
|
||||
from module_build_service.scheduler import work_queue
|
||||
from module_build_service import models, log, conf
|
||||
|
||||
|
||||
class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
""" This is triggered by running fedmsg-hub. This class is responsible for
|
||||
ingesting and processing messages from the message bus.
|
||||
"""
|
||||
topic = '*'
|
||||
config_key = 'mbsconsumer'
|
||||
|
||||
def __init__(self, hub, initial_msgs=[]):
|
||||
super(MBSConsumer, self).__init__(hub)
|
||||
|
||||
for msg in initial_msgs:
|
||||
work_queue.put(msg)
|
||||
|
||||
# These are our main lookup tables for figuring out what to run in
|
||||
# response to what messaging events.
|
||||
self.NO_OP = NO_OP = lambda config, session, msg: True
|
||||
self.on_build_change = {
|
||||
koji.BUILD_STATES["BUILDING"]: NO_OP,
|
||||
koji.BUILD_STATES[
|
||||
"COMPLETE"]: module_build_service.scheduler.handlers.components.complete,
|
||||
koji.BUILD_STATES[
|
||||
"FAILED"]: module_build_service.scheduler.handlers.components.failed,
|
||||
koji.BUILD_STATES[
|
||||
"CANCELED"]: module_build_service.scheduler.handlers.components.canceled,
|
||||
koji.BUILD_STATES["DELETED"]: NO_OP,
|
||||
}
|
||||
self.on_module_change = {
|
||||
models.BUILD_STATES["init"]: NO_OP,
|
||||
models.BUILD_STATES[
|
||||
"wait"]: module_build_service.scheduler.handlers.modules.wait,
|
||||
models.BUILD_STATES["build"]: NO_OP,
|
||||
models.BUILD_STATES[
|
||||
"failed"]: module_build_service.scheduler.handlers.modules.failed,
|
||||
models.BUILD_STATES[
|
||||
"done"]: module_build_service.scheduler.handlers.modules.done,
|
||||
# XXX: DIRECT TRANSITION TO READY
|
||||
models.BUILD_STATES["ready"]: NO_OP,
|
||||
}
|
||||
# Only one kind of repo change event, though...
|
||||
self.on_repo_change = module_build_service.scheduler.handlers.repos.done
|
||||
self.sanity_check()
|
||||
|
||||
def consume(self, message):
|
||||
# Add the message to the work queue
|
||||
work_queue.put(self.get_abstracted_msg(message['body']))
|
||||
# Process all the messages in the work queue
|
||||
while not work_queue.empty():
|
||||
msg = work_queue.get()
|
||||
try:
|
||||
with models.make_session(conf) as session:
|
||||
self.process_message(session, msg)
|
||||
except Exception:
|
||||
log.exception('Failed while handling {0!r}'.format(
|
||||
msg.msg_id))
|
||||
log.info(msg)
|
||||
|
||||
def get_abstracted_msg(self, message):
|
||||
# Convert the message to an abstracted message
|
||||
if conf.messaging == 'fedmsg':
|
||||
msg = module_build_service.messaging.BaseMessage.from_fedmsg(
|
||||
message['topic'], message)
|
||||
elif conf.messaging == 'amq':
|
||||
msg = module_build_service.messaging.BaseMessage.from_amq(
|
||||
message['topic'], message)
|
||||
else:
|
||||
raise ValueError('The messaging format "{0}" is not supported'
|
||||
.format(conf.messaging))
|
||||
return msg
|
||||
|
||||
def sanity_check(self):
|
||||
""" On startup, make sure our implementation is sane. """
|
||||
# Ensure we have every state covered
|
||||
for state in models.BUILD_STATES:
|
||||
if models.BUILD_STATES[state] not in self.on_module_change:
|
||||
raise KeyError("Module build states %r not handled." % state)
|
||||
for state in koji.BUILD_STATES:
|
||||
if koji.BUILD_STATES[state] not in self.on_build_change:
|
||||
raise KeyError("Koji build states %r not handled." % state)
|
||||
|
||||
all_fns = (list(self.on_build_change.items()) +
|
||||
list(self.on_module_change.items()))
|
||||
for key, callback in all_fns:
|
||||
expected = ['config', 'session', 'msg']
|
||||
argspec = inspect.getargspec(callback)[0]
|
||||
if argspec != expected:
|
||||
raise ValueError("Callback %r, state %r has argspec %r!=%r" % (
|
||||
callback, key, argspec, expected))
|
||||
|
||||
def process_message(self, session, msg):
|
||||
log.debug('Received a message with an ID of "{0}" and of type "{1}"'
|
||||
.format(msg.msg_id, type(msg).__name__))
|
||||
|
||||
# Choose a handler for this message
|
||||
if type(msg) == module_build_service.messaging.KojiBuildChange:
|
||||
handler = self.on_build_change[msg.build_new_state]
|
||||
elif type(msg) == module_build_service.messaging.KojiRepoChange:
|
||||
handler = self.on_repo_change
|
||||
elif type(msg) == module_build_service.messaging.RidaModule:
|
||||
handler = self.on_module_change[module_build_state_from_msg(msg)]
|
||||
else:
|
||||
log.debug("Unhandled message...")
|
||||
return
|
||||
|
||||
# Execute our chosen handler
|
||||
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__,
|
||||
msg.msg_id)
|
||||
if handler is self.NO_OP:
|
||||
log.debug("Handler is NO_OP: %s" % idx)
|
||||
else:
|
||||
log.info("Calling %s" % idx)
|
||||
further_work = handler(conf, session, msg) or []
|
||||
log.info("Done with %s" % idx)
|
||||
|
||||
# Handlers can *optionally* return a list of fake messages that
|
||||
# should be re-inserted back into the main work queue. We can use
|
||||
# this (for instance) when we submit a new component build but (for
|
||||
# some reason) it has already been built, then it can fake its own
|
||||
# completion back to the scheduler so that work resumes as if it
|
||||
# was submitted for real and koji announced its completion.
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
work_queue.put(event)
|
||||
@@ -1,357 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
# Written by Petr Šabata <contyk@redhat.com>
|
||||
# Ralph Bean <rbean@redhat.com>
|
||||
|
||||
"""The module build orchestrator for Modularity, the builder.
|
||||
|
||||
This is the main component of the orchestrator and is responsible for
|
||||
proper scheduling component builds in the supported build systems.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import operator
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import six.moves.queue as queue
|
||||
|
||||
import module_build_service.config
|
||||
import module_build_service.messaging
|
||||
import module_build_service.utils
|
||||
import module_build_service.scheduler.handlers.components
|
||||
import module_build_service.scheduler.handlers.modules
|
||||
import module_build_service.scheduler.handlers.repos
|
||||
|
||||
import koji
|
||||
|
||||
from module_build_service import conf, models, log
|
||||
|
||||
from sqlalchemy.orm import lazyload
|
||||
|
||||
|
||||
class STOP_WORK(object):
|
||||
""" A sentinel value, indicating that work should be stopped. """
|
||||
pass
|
||||
|
||||
|
||||
def module_build_state_from_msg(msg):
|
||||
state = int(msg.module_build_state)
|
||||
# TODO better handling
|
||||
assert state in models.BUILD_STATES.values(), (
|
||||
'state=%s(%s) is not in %s'
|
||||
% (state, type(state), list(models.BUILD_STATES.values())))
|
||||
return state
|
||||
|
||||
|
||||
class MessageIngest(threading.Thread):
|
||||
def __init__(self, outgoing_work_queue, stop_after_build, *args, **kwargs):
|
||||
self.outgoing_work_queue = outgoing_work_queue
|
||||
super(MessageIngest, self).__init__(*args, **kwargs)
|
||||
self.stop_after_build = stop_after_build
|
||||
|
||||
def run(self):
|
||||
for msg in module_build_service.messaging.listen(conf):
|
||||
self.outgoing_work_queue.put(msg)
|
||||
|
||||
if type(msg) == module_build_service.messaging.RidaModule:
|
||||
if (self.stop_after_build and module_build_state_from_msg(msg)
|
||||
in [models.BUILD_STATES["failed"], models.BUILD_STATES["ready"]]):
|
||||
break
|
||||
|
||||
|
||||
class MessageWorker(threading.Thread):
|
||||
|
||||
def __init__(self, incoming_work_queue, stop_after_build, *args, **kwargs):
|
||||
self.incoming_work_queue = incoming_work_queue
|
||||
self.stop_after_build = stop_after_build
|
||||
super(MessageWorker, self).__init__(*args, **kwargs)
|
||||
|
||||
# These are our main lookup tables for figuring out what to run in response
|
||||
# to what messaging events.
|
||||
self.NO_OP = NO_OP = lambda config, session, msg: True
|
||||
self.on_build_change = {
|
||||
koji.BUILD_STATES["BUILDING"]: NO_OP,
|
||||
koji.BUILD_STATES["COMPLETE"]: module_build_service.scheduler.handlers.components.complete,
|
||||
koji.BUILD_STATES["FAILED"]: module_build_service.scheduler.handlers.components.failed,
|
||||
koji.BUILD_STATES["CANCELED"]: module_build_service.scheduler.handlers.components.canceled,
|
||||
koji.BUILD_STATES["DELETED"]: NO_OP,
|
||||
}
|
||||
self.on_module_change = {
|
||||
models.BUILD_STATES["init"]: NO_OP,
|
||||
models.BUILD_STATES["wait"]: module_build_service.scheduler.handlers.modules.wait,
|
||||
models.BUILD_STATES["build"]: NO_OP,
|
||||
models.BUILD_STATES["failed"]: module_build_service.scheduler.handlers.modules.failed,
|
||||
models.BUILD_STATES["done"]: module_build_service.scheduler.handlers.modules.done, # XXX: DIRECT TRANSITION TO READY
|
||||
models.BUILD_STATES["ready"]: NO_OP,
|
||||
}
|
||||
# Only one kind of repo change event, though...
|
||||
self.on_repo_change = module_build_service.scheduler.handlers.repos.done
|
||||
|
||||
def sanity_check(self):
|
||||
""" On startup, make sure our implementation is sane. """
|
||||
# Ensure we have every state covered
|
||||
for state in models.BUILD_STATES:
|
||||
if models.BUILD_STATES[state] not in self.on_module_change:
|
||||
raise KeyError("Module build states %r not handled." % state)
|
||||
for state in koji.BUILD_STATES:
|
||||
if koji.BUILD_STATES[state] not in self.on_build_change:
|
||||
raise KeyError("Koji build states %r not handled." % state)
|
||||
|
||||
all_fns = (list(self.on_build_change.items()) +
|
||||
list(self.on_module_change.items()))
|
||||
for key, callback in all_fns:
|
||||
expected = ['config', 'session', 'msg']
|
||||
argspec = inspect.getargspec(callback)[0]
|
||||
if argspec != expected:
|
||||
raise ValueError("Callback %r, state %r has argspec %r!=%r" % (
|
||||
callback, key, argspec, expected))
|
||||
|
||||
def run(self):
|
||||
self.sanity_check()
|
||||
|
||||
while True:
|
||||
msg = self.incoming_work_queue.get()
|
||||
|
||||
if msg is STOP_WORK:
|
||||
log.info("Worker thread received STOP_WORK, shutting down...")
|
||||
break
|
||||
|
||||
try:
|
||||
with models.make_session(conf) as session:
|
||||
self.process_message(session, msg)
|
||||
except Exception:
|
||||
log.exception("Failed while handling %r" % msg.msg_id)
|
||||
log.info(msg)
|
||||
|
||||
def process_message(self, session, msg):
|
||||
log.debug('Received a message with an ID of "{0}" and of type "{1}"'
|
||||
.format(msg.msg_id, type(msg).__name__))
|
||||
|
||||
# Choose a handler for this message
|
||||
if type(msg) == module_build_service.messaging.KojiBuildChange:
|
||||
handler = self.on_build_change[msg.build_new_state]
|
||||
elif type(msg) == module_build_service.messaging.KojiRepoChange:
|
||||
handler = self.on_repo_change
|
||||
elif type(msg) == module_build_service.messaging.RidaModule:
|
||||
handler = self.on_module_change[module_build_state_from_msg(msg)]
|
||||
if (self.stop_after_build and module_build_state_from_msg(msg)
|
||||
in [models.BUILD_STATES["failed"], models.BUILD_STATES["ready"]]):
|
||||
self.incoming_work_queue.put(STOP_WORK)
|
||||
else:
|
||||
log.debug("Unhandled message...")
|
||||
return
|
||||
|
||||
# Execute our chosen handler
|
||||
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__,
|
||||
msg.msg_id)
|
||||
if handler is self.NO_OP:
|
||||
log.debug("Handler is NO_OP: %s" % idx)
|
||||
else:
|
||||
log.info("Calling %s" % idx)
|
||||
further_work = handler(conf, session, msg) or []
|
||||
log.info("Done with %s" % idx)
|
||||
|
||||
# Handlers can *optionally* return a list of fake messages that
|
||||
# should be re-inserted back into the main work queue. We can use
|
||||
# this (for instance) when we submit a new component build but (for
|
||||
# some reason) it has already been built, then it can fake its own
|
||||
# completion back to the scheduler so that work resumes as if it
|
||||
# was submitted for real and koji announced its completion.
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
self.incoming_work_queue.put(event)
|
||||
|
||||
|
||||
class Poller(threading.Thread):
|
||||
def __init__(self, outgoing_work_queue, *args, **kwargs):
|
||||
self.outgoing_work_queue = outgoing_work_queue
|
||||
super(Poller, self).__init__(*args, **kwargs)
|
||||
self.stop = False
|
||||
|
||||
def run(self):
|
||||
while not self.stop:
|
||||
with models.make_session(conf) as session:
|
||||
self.log_summary(session)
|
||||
# XXX: detect whether it's really stucked first
|
||||
# self.process_waiting_module_builds(session)
|
||||
self.process_open_component_builds(session)
|
||||
self.fail_lost_builds(session)
|
||||
self.process_paused_module_builds(conf, session)
|
||||
|
||||
log.info("Polling thread sleeping, %rs" % conf.polling_interval)
|
||||
for i in range(0, conf.polling_interval):
|
||||
time.sleep(1)
|
||||
if self.stop:
|
||||
break
|
||||
|
||||
def fail_lost_builds(self, session):
|
||||
# This function is supposed to be handling only
|
||||
# the part which can't be updated trough messaging (srpm-build failures).
|
||||
# Please keep it fit `n` slim. We do want rest to be processed elsewhere
|
||||
|
||||
# TODO re-use
|
||||
|
||||
if conf.system == "koji":
|
||||
# we don't do this on behalf of users
|
||||
koji_session = (
|
||||
module_build_service.builder.KojiModuleBuilder.get_session(conf, None))
|
||||
log.info("Querying tasks for statuses:")
|
||||
res = models.ComponentBuild.query.filter_by(state=koji.BUILD_STATES['BUILDING']).options(lazyload('module_build')).all()
|
||||
|
||||
log.info("Checking status for %d tasks." % len(res))
|
||||
for component_build in res:
|
||||
log.debug(component_build.json())
|
||||
if not component_build.task_id: # Don't check tasks which has not been triggered yet
|
||||
continue
|
||||
|
||||
log.info("Checking status of task_id=%s" % component_build.task_id)
|
||||
task_info = koji_session.getTaskInfo(component_build.task_id)
|
||||
|
||||
dead_states = (
|
||||
koji.TASK_STATES['CANCELED'],
|
||||
koji.TASK_STATES['FAILED'],
|
||||
)
|
||||
log.info(" task %r is in state %r" % (component_build.task_id, task_info['state']))
|
||||
if task_info['state'] in dead_states:
|
||||
# Fake a fedmsg message on our internal queue
|
||||
msg = module_build_service.messaging.KojiBuildChange(
|
||||
msg_id='a faked internal message',
|
||||
build_id=component_build.task_id,
|
||||
task_id=component_build.task_id,
|
||||
build_name=component_build.package,
|
||||
build_new_state=koji.BUILD_STATES['FAILED'],
|
||||
build_release=None,
|
||||
build_version=None
|
||||
)
|
||||
self.outgoing_work_queue.put(msg)
|
||||
|
||||
elif conf.system == "copr":
|
||||
# @TODO
|
||||
pass
|
||||
|
||||
elif conf.system == "mock":
|
||||
pass
|
||||
|
||||
def log_summary(self, session):
|
||||
log.info("Current status:")
|
||||
backlog = self.outgoing_work_queue.qsize()
|
||||
log.info(" * internal queue backlog is %i." % backlog)
|
||||
states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))
|
||||
for name, code in states:
|
||||
query = models.ModuleBuild.query.filter_by(state=code)
|
||||
count = query.count()
|
||||
if count:
|
||||
log.info(" * %i module builds in the %s state." % (count, name))
|
||||
if name == 'build':
|
||||
for module_build in query.all():
|
||||
log.info(" * %r" % module_build)
|
||||
for i in range(module_build.batch):
|
||||
n = len([c for c in module_build.component_builds
|
||||
if c.batch == i])
|
||||
log.info(" * %i components in batch %i" % (n, i))
|
||||
|
||||
def process_waiting_module_builds(self, session):
|
||||
log.info("Looking for module builds stuck in the wait state.")
|
||||
builds = models.ModuleBuild.by_state(session, "wait")
|
||||
# TODO -- do throttling calculation here...
|
||||
log.info(" %r module builds in the wait state..." % len(builds))
|
||||
for build in builds:
|
||||
# Fake a message to kickstart the build anew
|
||||
msg = {
|
||||
'topic': '.module.build.state.change',
|
||||
'msg': build.json(),
|
||||
}
|
||||
module_build_service.scheduler.handlers.modules.wait(conf, session, msg)
|
||||
|
||||
def process_open_component_builds(self, session):
|
||||
log.warning("process_open_component_builds is not yet implemented...")
|
||||
|
||||
def process_paused_module_builds(self, config, session):
|
||||
if module_build_service.utils.at_concurrent_component_threshold(
|
||||
config, session):
|
||||
log.debug('Will not attempt to start paused module builds due to '
|
||||
'the concurrent build threshold being met')
|
||||
return
|
||||
# Check to see if module builds that are in build state but don't have
|
||||
# any component builds being built can be worked on
|
||||
for module_build in session.query(models.ModuleBuild).filter_by(
|
||||
state=models.BUILD_STATES['build']).all():
|
||||
# If there are no components in the build state on the module build,
|
||||
# then no possible event will start off new component builds
|
||||
if not module_build.current_batch(koji.BUILD_STATES['BUILDING']):
|
||||
further_work = module_build_service.utils.start_build_batch(
|
||||
config, module_build, session, config.system)
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
self.outgoing_work_queue.put(event)
|
||||
|
||||
|
||||
_work_queue = queue.Queue()
|
||||
|
||||
|
||||
def outgoing_work_queue_put(msg):
|
||||
_work_queue.put(msg)
|
||||
|
||||
|
||||
def graceful_stop():
|
||||
"""
|
||||
Here is the place to perform shutdown actions.
|
||||
|
||||
Do whatever is needed to do except for leaving the main thread, which
|
||||
would result in losing control of POSIX signals handling.
|
||||
"""
|
||||
log.warning("graceful_stop is not yet implemented, press Ctrl+C again...")
|
||||
while True:
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def main(initial_msgs=[], return_after_build=False):
|
||||
log.info("Starting module_build_service_daemon.")
|
||||
|
||||
module_build_service.messaging.init(conf)
|
||||
|
||||
for msg in initial_msgs:
|
||||
outgoing_work_queue_put(msg)
|
||||
|
||||
# This ingest thread puts work on the queue
|
||||
messaging_thread = MessageIngest(_work_queue, return_after_build)
|
||||
# This poller does other work, but also sometimes puts work in queue.
|
||||
polling_thread = Poller(_work_queue)
|
||||
# This worker takes work off the queue and handles it.
|
||||
worker_thread = MessageWorker(_work_queue, return_after_build)
|
||||
|
||||
messaging_thread.start()
|
||||
polling_thread.start()
|
||||
worker_thread.start()
|
||||
|
||||
try:
|
||||
while worker_thread.is_alive():
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
log.info("Stopping module_build_service_daemon. Press Ctrl+C again to force.")
|
||||
try:
|
||||
graceful_stop()
|
||||
except KeyboardInterrupt:
|
||||
os._exit(0)
|
||||
finally:
|
||||
polling_thread.stop = True
|
||||
160
module_build_service/scheduler/producer.py
Normal file
160
module_build_service/scheduler/producer.py
Normal file
@@ -0,0 +1,160 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
|
||||
""" The PollingProducer class that acts as a producer entry point for
|
||||
fedmsg-hub. This class polls the database for tasks to do.
|
||||
"""
|
||||
|
||||
import koji
|
||||
import operator
|
||||
from datetime import timedelta
|
||||
from sqlalchemy.orm import lazyload
|
||||
from moksha.hub.api.producer import PollingProducer
|
||||
|
||||
from module_build_service.scheduler import work_queue
|
||||
import module_build_service.scheduler
|
||||
from module_build_service import conf, models, log
|
||||
|
||||
|
||||
class MBSProducer(PollingProducer):
|
||||
frequency = timedelta(seconds=conf.polling_interval)
|
||||
|
||||
def poll(self):
|
||||
with models.make_session(conf) as session:
|
||||
self.log_summary(session)
|
||||
# XXX: detect whether it's actually stuck first
|
||||
# self.process_waiting_module_builds(session)
|
||||
self.process_open_component_builds(session)
|
||||
self.fail_lost_builds(session)
|
||||
self.process_paused_module_builds(conf, session)
|
||||
|
||||
log.info('Poller will now sleep for "{}" seconds'
|
||||
.format(conf.polling_interval))
|
||||
|
||||
def fail_lost_builds(self, session):
|
||||
# This function is supposed to be handling only the part which can't be
|
||||
# updated through messaging (e.g. srpm-build failures). Please keep it
|
||||
# fit `n` slim. We do want rest to be processed elsewhere
|
||||
# TODO re-use
|
||||
|
||||
if conf.system == 'koji':
|
||||
# We don't do this on behalf of users
|
||||
koji_session = module_build_service.builder.KojiModuleBuilder\
|
||||
.get_session(conf, None)
|
||||
log.info('Querying tasks for statuses:')
|
||||
res = models.ComponentBuild.query.filter_by(
|
||||
state=koji.BUILD_STATES['BUILDING']).options(
|
||||
lazyload('module_build')).all()
|
||||
|
||||
log.info('Checking status for {0} tasks'.format(len(res)))
|
||||
for component_build in res:
|
||||
log.debug(component_build.json())
|
||||
# Don't check tasks which haven't been triggered yet
|
||||
if not component_build.task_id:
|
||||
continue
|
||||
|
||||
log.info('Checking status of task_id "{0}"'
|
||||
.format(component_build.task_id))
|
||||
task_info = koji_session.getTaskInfo(component_build.task_id)
|
||||
|
||||
dead_states = (
|
||||
koji.TASK_STATES['CANCELED'],
|
||||
koji.TASK_STATES['FAILED'],
|
||||
)
|
||||
|
||||
log.info(' task {0!r} is in state {0!r}'.format(
|
||||
component_build.task_id, task_info['state']))
|
||||
if task_info['state'] in dead_states:
|
||||
# Fake a fedmsg message on our internal queue
|
||||
msg = module_build_service.messaging.KojiBuildChange(
|
||||
msg_id='a faked internal message',
|
||||
build_id=component_build.task_id,
|
||||
task_id=component_build.task_id,
|
||||
build_name=component_build.package,
|
||||
build_new_state=koji.BUILD_STATES['FAILED'],
|
||||
build_release=None,
|
||||
build_version=None
|
||||
)
|
||||
work_queue.put(msg)
|
||||
|
||||
elif conf.system == 'copr':
|
||||
# @TODO
|
||||
pass
|
||||
|
||||
elif conf.system == 'mock':
|
||||
pass
|
||||
|
||||
def log_summary(self, session):
|
||||
log.info('Current status:')
|
||||
backlog = work_queue.qsize()
|
||||
log.info(' * internal queue backlog is {0}'.format(backlog))
|
||||
states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))
|
||||
for name, code in states:
|
||||
query = models.ModuleBuild.query.filter_by(state=code)
|
||||
count = query.count()
|
||||
if count:
|
||||
log.info(' * {0} module builds in the {1} state'.format(
|
||||
count, name))
|
||||
if name == 'build':
|
||||
for module_build in query.all():
|
||||
log.info(' * {0!r}'.format(module_build))
|
||||
for i in range(module_build.batch):
|
||||
n = len([c for c in module_build.component_builds
|
||||
if c.batch == i ])
|
||||
log.info(' * {0} components in batch {1}'
|
||||
.format(n, i))
|
||||
|
||||
def process_waiting_module_builds(self, session):
|
||||
log.info('Looking for module builds stuck in the wait state')
|
||||
builds = models.ModuleBuild.by_state(session, 'wait')
|
||||
log.info(' {0!r} module builds in the wait state...'
|
||||
.format(len(builds)))
|
||||
for build in builds:
|
||||
# Fake a message to kickstart the build anew
|
||||
msg = {
|
||||
'topic': '.module.build.state.change',
|
||||
'msg': build.json(),
|
||||
}
|
||||
module_build_service.scheduler.handlers.modules.wait(
|
||||
conf, session, msg)
|
||||
|
||||
def process_open_component_builds(self, session):
|
||||
log.warning('process_open_component_builds is not yet implemented...')
|
||||
|
||||
def process_paused_module_builds(self, config, session):
|
||||
if module_build_service.utils.at_concurrent_component_threshold(
|
||||
config, session):
|
||||
log.debug('Will not attempt to start paused module builds due to '
|
||||
'the concurrent build threshold being met')
|
||||
return
|
||||
# Check to see if module builds that are in build state but don't have
|
||||
# any component builds being built can be worked on
|
||||
for module_build in session.query(models.ModuleBuild).filter_by(
|
||||
state=models.BUILD_STATES['build']).all():
|
||||
# If there are no components in the build state on the module build,
|
||||
# then no possible event will start off new component builds
|
||||
if not module_build.current_batch(koji.BUILD_STATES['BUILDING']):
|
||||
further_work = module_build_service.utils.start_build_batch(
|
||||
config, module_build, session, config.system)
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
work_queue.put(event)
|
||||
@@ -548,3 +548,11 @@ def scm_url_schemes(terse=False):
|
||||
for scm_type, scm_schemes in scm_types.items():
|
||||
scheme_list.extend([scheme[:-3] for scheme in scm_schemes])
|
||||
return list(set(scheme_list))
|
||||
|
||||
def module_build_state_from_msg(msg):
|
||||
state = int(msg.module_build_state)
|
||||
# TODO better handling
|
||||
assert state in models.BUILD_STATES.values(), (
|
||||
'state=%s(%s) is not in %s'
|
||||
% (state, type(state), list(models.BUILD_STATES.values())))
|
||||
return state
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
# Written by Petr Šabata <contyk@redhat.com>
|
||||
# Ralph Bean <rbean@redhat.com>
|
||||
"""The module build orchestrator for Modularity, the builder. """
|
||||
|
||||
import module_build_service.scheduler.main
|
||||
|
||||
if __name__ == '__main__':
|
||||
module_build_service.scheduler.main.main()
|
||||
7
setup.py
7
setup.py
@@ -24,11 +24,12 @@ setup(name='module-build-service',
|
||||
install_requires=requirements,
|
||||
tests_require=test_requirements,
|
||||
entry_points={
|
||||
'console_scripts': ['mbs-daemon = module_build_service.scheduler.main:main',
|
||||
'mbs-upgradedb = module_build_service.manage:upgradedb',
|
||||
'console_scripts': ['mbs-upgradedb = module_build_service.manage:upgradedb',
|
||||
'mbs-gencert = module_build_service.manage:generatelocalhostcert',
|
||||
'mbs-frontend = module_build_service.manage:runssl',
|
||||
'mbs-manager = module_build_service.manage:manager_wrapper']
|
||||
'mbs-manager = module_build_service.manage:manager_wrapper'],
|
||||
'moksha.consumer': 'mbsconsumer = module_build_service.scheduler.consumer:MBSConsumer',
|
||||
'moksha.producer': 'mbspoller = module_build_service.scheduler.producer:MBSProducer',
|
||||
},
|
||||
data_files=[('/etc/module-build-service/', ['conf/cacert.pem',
|
||||
'conf/config.py',
|
||||
|
||||
Reference in New Issue
Block a user