mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-13 20:29:53 +08:00
Merge #263 Migrate scheduler to be run by fedmsg-hub
This commit is contained in:
6
Vagrantfile
vendored
6
Vagrantfile
vendored
@@ -6,6 +6,7 @@ $script = <<SCRIPT
|
||||
echo "export MODULE_BUILD_SERVICE_DEVELOPER_ENV=1" >> /etc/profile.d/module_build_service_developer_env.sh
|
||||
source /etc/profile.d/module_build_service_developer_env.sh
|
||||
dnf install -y \
|
||||
fedmsg-hub \
|
||||
fedmsg-relay \
|
||||
fedpkg \
|
||||
gcc \
|
||||
@@ -27,6 +28,8 @@ $script = <<SCRIPT
|
||||
rpm-build \
|
||||
swig \
|
||||
systemd-devel
|
||||
mkdir /usr/share/fedmsg
|
||||
chown fedmsg:fedmsg /usr/share/fedmsg
|
||||
systemctl enable fedmsg-relay
|
||||
systemctl start fedmsg-relay
|
||||
mkdir /etc/module-build-service/
|
||||
@@ -42,7 +45,8 @@ Vagrant.configure("2") do |config|
|
||||
config.vm.synced_folder "./", "/tmp/module_build_service"
|
||||
config.vm.provision "file", source: "/var/tmp/krbcc", destination: "/var/tmp/krbcc", run: "always"
|
||||
config.vm.network "forwarded_port", guest: 5000, host: 5000
|
||||
config.vm.network "forwarded_port", guest: 13747, host: 13747
|
||||
config.vm.provision "shell", inline: $script
|
||||
config.vm.provision :shell, inline: "mbs-frontend &", run: "always"
|
||||
config.vm.provision :shell, inline: "mbs-daemon &", run: "always"
|
||||
config.vm.provision :shell, inline: "cd /tmp/module_build_service && fedmsg-hub &", run: "always"
|
||||
end
|
||||
|
||||
@@ -119,7 +119,7 @@ class DevConfiguration(BaseConfiguration):
|
||||
KOJI_ARCHES = ['x86_64']
|
||||
KOJI_REPOSITORY_URL = 'http://kojipkgs.stg.fedoraproject.org/repos'
|
||||
|
||||
OIDC_CLIENT_SECRETS = "client_secrets.json"
|
||||
OIDC_CLIENT_SECRETS = path.join(confdir, 'client_secrets.json')
|
||||
|
||||
SSL_CERTIFICATE_FILE = path.join(confdir, 'server.crt')
|
||||
SSL_CERTIFICATE_KEY_FILE = path.join(confdir, 'server.key')
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
[Unit]
|
||||
Description=Scheduler for the Module Build Service
|
||||
After=network.target
|
||||
Documentation=https://pagure.io/fm-orchestrator
|
||||
|
||||
[Service]
|
||||
ExecStart=/usr/bin/mbs-daemon
|
||||
Type=simple
|
||||
User=fedmsg
|
||||
Group=fedmsg
|
||||
Restart=on-failure
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
@@ -8,7 +8,7 @@ def listen_for_token():
|
||||
Listens on port 13747 on localhost for a redirect request by OIDC
|
||||
server, parses the response and returns the "access_token" value.
|
||||
"""
|
||||
TCP_IP = '127.0.0.1'
|
||||
TCP_IP = '0.0.0.0'
|
||||
TCP_PORT = 13747
|
||||
BUFFER_SIZE = 1024
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ services:
|
||||
depends_on:
|
||||
- base
|
||||
build: .
|
||||
command: mbs-daemon
|
||||
command: fedmsg-hub
|
||||
links:
|
||||
- fedmsg-relay
|
||||
environment:
|
||||
|
||||
@@ -73,9 +73,9 @@ To start the frontend manually, run the following inside the guest::
|
||||
|
||||
$ mbs-frontend
|
||||
|
||||
To start the scheduler manually, run the following inside the guest::
|
||||
To start the scheduler manually, run the following at `/tmp/module_build_service` inside the guest::
|
||||
|
||||
$ mbs-daemon
|
||||
$ fedmsg-hub
|
||||
|
||||
Alternatively, you can restart the Vagrant guest, which inherently starts/restarts the frontend and the scheduler with::
|
||||
|
||||
|
||||
4
fedmsg.d/mbs-scheduler.py
Normal file
4
fedmsg.d/mbs-scheduler.py
Normal file
@@ -0,0 +1,4 @@
|
||||
config = {
|
||||
'mbsconsumer': True,
|
||||
'mbsproducer': True,
|
||||
}
|
||||
@@ -47,6 +47,7 @@ BuildRequires: systemd
|
||||
|
||||
Requires: systemd
|
||||
Requires: fedmsg
|
||||
Requires: fedmsg-hub
|
||||
Requires: git
|
||||
Requires: kobo
|
||||
Requires: kobo-rpmlib
|
||||
@@ -98,28 +99,14 @@ tasks:
|
||||
%install
|
||||
%py2_install
|
||||
|
||||
mkdir -p %{buildroot}%{_unitdir}/
|
||||
%{__install} -pm644 conf/mbs-scheduler.service \
|
||||
%{buildroot}%{_unitdir}/mbs-scheduler.service
|
||||
|
||||
%check
|
||||
nosetests-2
|
||||
|
||||
%post
|
||||
%systemd_post mbs-scheduler.service
|
||||
|
||||
%preun
|
||||
%systemd_preun mbs-scheduler.service
|
||||
|
||||
%postun
|
||||
%systemd_postun_with_restart mbs-scheduler.service
|
||||
|
||||
%files
|
||||
%doc README.rst
|
||||
%license LICENSE
|
||||
%{python2_sitelib}/module_build_service*
|
||||
%{_bindir}/mbs-*
|
||||
%{_unitdir}/mbs-scheduler.service
|
||||
%dir %{_sysconfdir}/module-build-service
|
||||
%config(noreplace) %{_sysconfdir}/module-build-service/config.py
|
||||
%config(noreplace) %{_sysconfdir}/module-build-service/koji.conf
|
||||
@@ -134,6 +121,9 @@ nosetests-2
|
||||
|
||||
|
||||
%changelog
|
||||
* Thu Dec 15 2016 Matt Prahl <mprahl@redhat.com> - 1.0.3-1
|
||||
- Replace systemd unit with fedmsg-hub
|
||||
|
||||
* Wed Dec 14 2016 Ralph Bean <rbean@redhat.com> - 1.0.2-1
|
||||
- Enable test suite in the check section.
|
||||
- Add systemd scriptlets, per review feedback.
|
||||
|
||||
@@ -53,8 +53,9 @@ from OpenSSL.SSL import SysCallError
|
||||
from module_build_service import conf, log, db
|
||||
from module_build_service.models import ModuleBuild
|
||||
import module_build_service.scm
|
||||
import module_build_service.scheduler.main
|
||||
import module_build_service.utils
|
||||
import module_build_service.scheduler
|
||||
import module_build_service.scheduler.consumer
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
@@ -1165,7 +1166,7 @@ $repos
|
||||
msg_id='a faked internal message',
|
||||
repo_tag=self.tag_name + "-build",
|
||||
)
|
||||
module_build_service.scheduler.main.outgoing_work_queue_put(msg)
|
||||
module_build_service.scheduler.consumer.work_queue_put(msg)
|
||||
|
||||
def _send_build_change(self, state, source, build_id):
|
||||
nvr = kobo.rpmlib.parse_nvr(source)
|
||||
@@ -1181,7 +1182,7 @@ $repos
|
||||
build_release=nvr["release"],
|
||||
build_version=nvr["version"]
|
||||
)
|
||||
module_build_service.scheduler.main.outgoing_work_queue_put(msg)
|
||||
module_build_service.scheduler.consumer.work_queue_put(msg)
|
||||
|
||||
def _save_log(self, log_name, artifact_name):
|
||||
old_log = os.path.join(self.resultsdir, log_name)
|
||||
@@ -1205,7 +1206,7 @@ $repos
|
||||
"--resultdir=%s" % self.resultsdir])
|
||||
|
||||
# Emit messages simulating complete build. These messages
|
||||
# are put in the scheduler.main._work_queue and are handled
|
||||
# are put in the scheduler's work queue and are handled
|
||||
# by MBS after the build_srpm() method returns and scope gets
|
||||
# back to scheduler.main.main() method.
|
||||
self._send_repo_done()
|
||||
@@ -1220,7 +1221,7 @@ $repos
|
||||
str(e)))
|
||||
|
||||
# Emit messages simulating complete build. These messages
|
||||
# are put in the scheduler.main._work_queue and are handled
|
||||
# are put in the scheduler's work queue and are handled
|
||||
# by MBS after the build_srpm() method returns and scope gets
|
||||
# back to scheduler.main.main() method.
|
||||
self._send_repo_done()
|
||||
|
||||
@@ -29,18 +29,23 @@ import ssl
|
||||
from shutil import rmtree
|
||||
import getpass
|
||||
|
||||
import fedmsg.config
|
||||
import moksha.hub
|
||||
import moksha.hub.hub
|
||||
import moksha.hub.reactor
|
||||
|
||||
from module_build_service import app, conf, db
|
||||
from module_build_service import models
|
||||
from module_build_service.pdc import (
|
||||
get_pdc_client_session, get_module, get_module_runtime_dependencies,
|
||||
get_module_tag, get_module_build_dependencies)
|
||||
import module_build_service.scheduler.main
|
||||
from module_build_service.utils import (
|
||||
submit_module_build,
|
||||
insert_fake_baseruntime,
|
||||
)
|
||||
from module_build_service.messaging import RidaModule
|
||||
import module_build_service.messaging
|
||||
import module_build_service.scheduler.consumer
|
||||
|
||||
|
||||
manager = Manager(app)
|
||||
@@ -119,7 +124,6 @@ def cleardb():
|
||||
@manager.command
|
||||
def build_module_locally(url):
|
||||
conf.set_item("system", "mock")
|
||||
conf.set_item("messaging", "in_memory")
|
||||
|
||||
# Use our own local SQLite3 database.
|
||||
confdir = os.path.abspath(os.path.dirname(__file__))
|
||||
@@ -146,9 +150,11 @@ def build_module_locally(url):
|
||||
username = getpass.getuser()
|
||||
submit_module_build(username, url, allow_local_url=True)
|
||||
|
||||
msgs = []
|
||||
msgs.append(RidaModule("local module build", 2, 1))
|
||||
module_build_service.scheduler.main.main(msgs, True)
|
||||
stop = module_build_service.scheduler.make_simple_stop_condition(db.session)
|
||||
initial_messages = [RidaModule("local module build", 2, 1)]
|
||||
|
||||
# Run the consumer until stop_condition returns True
|
||||
module_build_service.scheduler.main(initial_messages, stop)
|
||||
|
||||
|
||||
@manager.command
|
||||
@@ -294,8 +300,6 @@ def runssl(host=conf.host, port=conf.port, debug=conf.debug):
|
||||
"""
|
||||
logging.info('Starting Module Build Service frontend')
|
||||
|
||||
module_build_service.messaging.init(conf)
|
||||
|
||||
ssl_ctx = _establish_ssl_context()
|
||||
app.run(
|
||||
host=host,
|
||||
|
||||
@@ -33,7 +33,7 @@ except ImportError:
|
||||
from funcsigs import signature
|
||||
|
||||
from module_build_service import log
|
||||
import six.moves.queue as queue
|
||||
|
||||
|
||||
class BaseMessage(object):
|
||||
def __init__(self, msg_id):
|
||||
@@ -43,6 +43,18 @@ class BaseMessage(object):
|
||||
"""
|
||||
self.msg_id = msg_id
|
||||
|
||||
# Moksha calls `consumer.validate` on messages that it receives, and
|
||||
# even though we have validation turned off in the config there's still
|
||||
# a step that tries to access `msg['body']`, `msg['topic']` and
|
||||
# `msg.get('topic')`.
|
||||
# These are here just so that the `validate` method won't raise an
|
||||
# exception when we push our fake messages through.
|
||||
# Note that, our fake message pushing has worked for a while... but the
|
||||
# *latest* version of fedmsg has some code that exercises the bug. I
|
||||
# didn't hit this until I went to test in jenkins.
|
||||
self.body = {}
|
||||
self.topic = None
|
||||
|
||||
def __repr__(self):
|
||||
init_sig = signature(self.__init__)
|
||||
|
||||
@@ -54,6 +66,18 @@ class BaseMessage(object):
|
||||
|
||||
return "{}({})".format(type(self).__name__, ', '.join(args_strs))
|
||||
|
||||
def __getitem__(self, key):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return getattr(self, key)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return setattr(self, key, value)
|
||||
|
||||
def get(self, key, value=None):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return getattr(self, key, value)
|
||||
|
||||
@staticmethod
|
||||
def from_amq(topic, msg):
|
||||
msg_obj = None
|
||||
@@ -228,18 +252,6 @@ class RidaModule(BaseMessage):
|
||||
self.module_build_id = module_build_id
|
||||
self.module_build_state = module_build_state
|
||||
|
||||
def init(conf, **kwargs):
|
||||
"""
|
||||
Initialize the messaging backend.
|
||||
:param conf: a Config object from the class in config.py
|
||||
:param kwargs: any additional arguments to pass to the backend handler
|
||||
"""
|
||||
try:
|
||||
handler = _messaging_backends[conf.messaging]['init']
|
||||
except KeyError:
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
return handler(conf, **kwargs)
|
||||
|
||||
def publish(topic, msg, conf, service):
|
||||
"""
|
||||
Publish a single message to a given backend, and return
|
||||
@@ -255,37 +267,44 @@ def publish(topic, msg, conf, service):
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
return handler(topic, msg, conf, service)
|
||||
|
||||
|
||||
def listen(conf, **kwargs):
|
||||
"""
|
||||
Yield messages from the messaging backend in conf.messaging.
|
||||
:param conf: a Config object from the class in config.py
|
||||
:param kwargs: any additional arguments to pass to the backend handler
|
||||
:return: yields a message object (child class from BaseMessage)
|
||||
"""
|
||||
try:
|
||||
handler = _messaging_backends[conf.messaging]['listen']
|
||||
except KeyError:
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
|
||||
for event in handler(conf, **kwargs):
|
||||
yield event
|
||||
|
||||
|
||||
def _fedmsg_publish(topic, msg, conf, service):
|
||||
# fedmsg doesn't really need access to conf, however other backends do
|
||||
import fedmsg
|
||||
return fedmsg.publish(topic, msg=msg, modname=service)
|
||||
|
||||
def _fedmsg_listen(conf, **kwargs):
|
||||
"""
|
||||
Parses a fedmsg event and constructs it into the appropriate message object
|
||||
"""
|
||||
import fedmsg
|
||||
for name, endpoint, topic, msg in fedmsg.tail_messages(**kwargs):
|
||||
msg_obj = BaseMessage.from_fedmsg(topic, msg)
|
||||
if msg_obj:
|
||||
yield msg_obj
|
||||
|
||||
# A counter used for in-memory messages.
|
||||
_in_memory_msg_id = 0
|
||||
_initial_messages = []
|
||||
|
||||
|
||||
def _in_memory_publish(topic, msg, conf, service):
|
||||
""" Puts the message into the in memory work queue. """
|
||||
# Increment the message ID.
|
||||
global _in_memory_msg_id
|
||||
_in_memory_msg_id += 1
|
||||
|
||||
# Create fake fedmsg from the message so we can reuse
|
||||
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
|
||||
# class instance.
|
||||
wrapped_msg = BaseMessage.from_fedmsg(
|
||||
service + "." + topic,
|
||||
{"msg_id": str(_in_memory_msg_id), "msg": msg},
|
||||
)
|
||||
|
||||
# Put the message to queue.
|
||||
from module_build_service.scheduler.consumer import work_queue_put
|
||||
try:
|
||||
work_queue_put(wrapped_msg)
|
||||
except ValueError as e:
|
||||
log.warn("No MBSConsumer found. Shutting down? %r" % e)
|
||||
except AttributeError as e:
|
||||
# In the event that `moksha.hub._hub` hasn't yet been initialized, we
|
||||
# need to store messages on the side until it becomes available.
|
||||
# As a last-ditch effort, try to hang initial messages in the config.
|
||||
log.warn("Hub not initialized. Queueing on the side.")
|
||||
_initial_messages.append(wrapped_msg)
|
||||
|
||||
|
||||
def _amq_get_messenger(conf):
|
||||
import proton
|
||||
@@ -317,19 +336,6 @@ def _amq_get_messenger(conf):
|
||||
log.debug('proton.Messenger: Subscribing to address=%s' % url)
|
||||
return msngr
|
||||
|
||||
def _amq_listen(conf, **kwargs):
|
||||
import proton
|
||||
msngr = _amq_get_messenger(conf)
|
||||
msg = proton.Message()
|
||||
while True:
|
||||
msngr.recv()
|
||||
|
||||
while msngr.incoming:
|
||||
msngr.get(msg)
|
||||
msg_obj = BaseMessage.from_amq(msg.address, msg)
|
||||
if msg_obj:
|
||||
yield msg_obj
|
||||
|
||||
def _amq_publish(topic, msg, conf, service):
|
||||
import proton
|
||||
msngr = _amq_get_messenger(conf)
|
||||
@@ -341,69 +347,15 @@ def _amq_publish(topic, msg, conf, service):
|
||||
msngr.put(message)
|
||||
msngr.send()
|
||||
|
||||
# Queue for "in_memory" messaging.
|
||||
_in_memory_work_queue = queue.Queue()
|
||||
|
||||
# Message id for "in_memory" messaging.
|
||||
_in_memory_msg_id = 0
|
||||
|
||||
def _in_memory_init(conf, **kwargs):
|
||||
"""
|
||||
Initializes the In Memory messaging backend.
|
||||
"""
|
||||
global _in_memory_work_queue
|
||||
global _in_memory_msg_id
|
||||
_in_memory_msg_id = 0
|
||||
_in_memory_work_queue = queue.Queue()
|
||||
|
||||
def _in_memory_publish(topic, msg, conf, service):
|
||||
"""
|
||||
Puts the message to _in_memory_work_queue".
|
||||
"""
|
||||
|
||||
# Increment the message ID.
|
||||
global _in_memory_msg_id
|
||||
_in_memory_msg_id += 1
|
||||
|
||||
# Create fake fedmsg from the message so we can reuse
|
||||
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
|
||||
# class instance.
|
||||
topic = service + "." + topic
|
||||
wrapped_msg = {}
|
||||
wrapped_msg["msg_id"] = str(_in_memory_msg_id)
|
||||
wrapped_msg["msg"] = msg
|
||||
wrapped_msg = BaseMessage.from_fedmsg(topic, wrapped_msg)
|
||||
|
||||
# Put the message to queue.
|
||||
_in_memory_work_queue.put(wrapped_msg)
|
||||
|
||||
def _in_memory_listen(conf, **kwargs):
|
||||
"""
|
||||
Yields the message from the _in_memory_work_queue when ready.
|
||||
"""
|
||||
while True:
|
||||
yield _in_memory_work_queue.get(True)
|
||||
|
||||
def _no_op(conf, **kwargs):
|
||||
"""
|
||||
No operation.
|
||||
"""
|
||||
pass
|
||||
|
||||
_messaging_backends = {
|
||||
'fedmsg': {
|
||||
'init': _no_op,
|
||||
'publish': _fedmsg_publish,
|
||||
'listen': _fedmsg_listen,
|
||||
},
|
||||
'amq': {
|
||||
'init': _no_op,
|
||||
'publish': _amq_publish,
|
||||
'listen': _amq_listen,
|
||||
},
|
||||
'in_memory': {
|
||||
'init': _in_memory_init,
|
||||
'publish': _in_memory_publish,
|
||||
'listen': _in_memory_listen,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
""" This is a sub-module for backend/scheduler functionality. """
|
||||
|
||||
import fedmsg
|
||||
import moksha.hub
|
||||
|
||||
import module_build_service.models
|
||||
import module_build_service.scheduler.consumer
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def main(initial_messages, stop_condition):
|
||||
""" Run the consumer until some condition is met.
|
||||
|
||||
Setting stop_condition to None will run the consumer forever.
|
||||
"""
|
||||
|
||||
config = fedmsg.config.load_config()
|
||||
config['mbsconsumer'] = True
|
||||
config['mbsconsumer.stop_condition'] = stop_condition
|
||||
config['mbsconsumer.initial_messages'] = initial_messages
|
||||
|
||||
# Moksha requires that we subscribe to *something*, so tell it /dev/null
|
||||
# since we'll just be doing in-memory queue-based messaging for this single
|
||||
# build.
|
||||
config['zmq_enabled'] = True
|
||||
config['zmq_subscribe_endpoints'] = 'ipc:///dev/null'
|
||||
|
||||
consumers = [module_build_service.scheduler.consumer.MBSConsumer]
|
||||
|
||||
# Note that the hub we kick off here cannot send any message. You
|
||||
# should use fedmsg.publish(...) still for that.
|
||||
moksha.hub.main(
|
||||
# Pass in our config dict
|
||||
options=config,
|
||||
# Only run the specified consumers if any are so specified.
|
||||
consumers=consumers,
|
||||
# Tell moksha to quiet its logging.
|
||||
framework=False,
|
||||
)
|
||||
|
||||
|
||||
def make_simple_stop_condition(session):
|
||||
""" Return a simple stop_condition callable.
|
||||
|
||||
Intended to be used with the main() function here in manage.py and tests.
|
||||
|
||||
The stop_condition returns true when the latest module build enters the any
|
||||
of the finished states.
|
||||
"""
|
||||
|
||||
def stop_condition(message):
|
||||
# XXX - We ignore the message here and instead just query the DB.
|
||||
|
||||
# Grab the latest module build.
|
||||
module = session.query(module_build_service.models.ModuleBuild)\
|
||||
.order_by(module_build_service.models.ModuleBuild.id.desc())\
|
||||
.first()
|
||||
done = (
|
||||
module_build_service.models.BUILD_STATES["failed"],
|
||||
module_build_service.models.BUILD_STATES["ready"],
|
||||
# XXX should this one be removed?
|
||||
module_build_service.models.BUILD_STATES["done"],
|
||||
)
|
||||
result = module.state in done
|
||||
log.debug("stop_condition checking %r, got %r" % (module, result))
|
||||
return result
|
||||
|
||||
return stop_condition
|
||||
|
||||
206
module_build_service/scheduler/consumer.py
Normal file
206
module_build_service/scheduler/consumer.py
Normal file
@@ -0,0 +1,206 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
|
||||
""" The FedmsgConsumer class that acts as a consumer entry point for fedmsg-hub.
|
||||
This class reads and processes messages from the message bus it is configured
|
||||
to use.
|
||||
"""
|
||||
|
||||
import koji
|
||||
import inspect
|
||||
import fedmsg.consumers
|
||||
import moksha.hub
|
||||
|
||||
from module_build_service.utils import module_build_state_from_msg
|
||||
import module_build_service.messaging
|
||||
import module_build_service.scheduler.handlers.repos
|
||||
import module_build_service.scheduler.handlers.components
|
||||
import module_build_service.scheduler.handlers.modules
|
||||
from module_build_service import models, log, conf
|
||||
|
||||
|
||||
class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
""" This is triggered by running fedmsg-hub. This class is responsible for
|
||||
ingesting and processing messages from the message bus.
|
||||
"""
|
||||
topic = '*'
|
||||
config_key = 'mbsconsumer'
|
||||
|
||||
def __init__(self, hub):
|
||||
super(MBSConsumer, self).__init__(hub)
|
||||
|
||||
# These two values are typically provided either by the unit tests or
|
||||
# by the local build command. They are empty in the production environ
|
||||
self.stop_condition = hub.config.get('mbsconsumer.stop_condition')
|
||||
initial_messages = hub.config.get('mbsconsumer.initial_messages', [])
|
||||
for msg in initial_messages:
|
||||
self.incoming.put(msg)
|
||||
|
||||
# Furthermore, extend our initial messages with any that were queued up
|
||||
# in the test environment before our hub was initialized.
|
||||
while module_build_service.messaging._initial_messages:
|
||||
msg = module_build_service.messaging._initial_messages.pop()
|
||||
self.incoming.put(msg)
|
||||
|
||||
# These are our main lookup tables for figuring out what to run in
|
||||
# response to what messaging events.
|
||||
self.NO_OP = NO_OP = lambda config, session, msg: True
|
||||
self.on_build_change = {
|
||||
koji.BUILD_STATES["BUILDING"]: NO_OP,
|
||||
koji.BUILD_STATES[
|
||||
"COMPLETE"]: module_build_service.scheduler.handlers.components.complete,
|
||||
koji.BUILD_STATES[
|
||||
"FAILED"]: module_build_service.scheduler.handlers.components.failed,
|
||||
koji.BUILD_STATES[
|
||||
"CANCELED"]: module_build_service.scheduler.handlers.components.canceled,
|
||||
koji.BUILD_STATES["DELETED"]: NO_OP,
|
||||
}
|
||||
self.on_module_change = {
|
||||
models.BUILD_STATES["init"]: NO_OP,
|
||||
models.BUILD_STATES[
|
||||
"wait"]: module_build_service.scheduler.handlers.modules.wait,
|
||||
models.BUILD_STATES["build"]: NO_OP,
|
||||
models.BUILD_STATES[
|
||||
"failed"]: module_build_service.scheduler.handlers.modules.failed,
|
||||
models.BUILD_STATES[
|
||||
"done"]: module_build_service.scheduler.handlers.modules.done,
|
||||
# XXX: DIRECT TRANSITION TO READY
|
||||
models.BUILD_STATES["ready"]: NO_OP,
|
||||
}
|
||||
# Only one kind of repo change event, though...
|
||||
self.on_repo_change = module_build_service.scheduler.handlers.repos.done
|
||||
self.sanity_check()
|
||||
|
||||
def shutdown(self):
|
||||
log.info("Scheduling shutdown.")
|
||||
from moksha.hub.reactor import reactor
|
||||
reactor.callFromThread(self.hub.stop)
|
||||
reactor.callFromThread(reactor.stop)
|
||||
|
||||
def consume(self, message):
|
||||
log.info("Received %r" % message)
|
||||
|
||||
# Sometimes, the messages put into our queue are artificially put there
|
||||
# by other parts of our own codebase. If they are already abstracted
|
||||
# messages, then just use them as-is. If they are not already
|
||||
# instances of our message abstraction base class, then first transform
|
||||
# them before proceeding.
|
||||
if isinstance(message, module_build_service.messaging.BaseMessage):
|
||||
msg = message
|
||||
else:
|
||||
msg = self.get_abstracted_msg(message['body'])
|
||||
|
||||
# Primary work is done here.
|
||||
try:
|
||||
with models.make_session(conf) as session:
|
||||
self.process_message(session, msg)
|
||||
except Exception:
|
||||
log.exception('Failed while handling {0!r}'.format(msg.msg_id))
|
||||
log.info(msg)
|
||||
|
||||
if self.stop_condition and self.stop_condition(message):
|
||||
self.shutdown()
|
||||
|
||||
def get_abstracted_msg(self, message):
|
||||
# Convert the message to an abstracted message
|
||||
if conf.messaging == 'fedmsg':
|
||||
msg = module_build_service.messaging.BaseMessage.from_fedmsg(
|
||||
message['topic'], message)
|
||||
elif conf.messaging == 'amq':
|
||||
msg = module_build_service.messaging.BaseMessage.from_amq(
|
||||
message['topic'], message)
|
||||
else:
|
||||
raise ValueError('The messaging format "{0}" is not supported'
|
||||
.format(conf.messaging))
|
||||
return msg
|
||||
|
||||
def sanity_check(self):
|
||||
""" On startup, make sure our implementation is sane. """
|
||||
# Ensure we have every state covered
|
||||
for state in models.BUILD_STATES:
|
||||
if models.BUILD_STATES[state] not in self.on_module_change:
|
||||
raise KeyError("Module build states %r not handled." % state)
|
||||
for state in koji.BUILD_STATES:
|
||||
if koji.BUILD_STATES[state] not in self.on_build_change:
|
||||
raise KeyError("Koji build states %r not handled." % state)
|
||||
|
||||
all_fns = (list(self.on_build_change.items()) +
|
||||
list(self.on_module_change.items()))
|
||||
for key, callback in all_fns:
|
||||
expected = ['config', 'session', 'msg']
|
||||
argspec = inspect.getargspec(callback)[0]
|
||||
if argspec != expected:
|
||||
raise ValueError("Callback %r, state %r has argspec %r!=%r" % (
|
||||
callback, key, argspec, expected))
|
||||
|
||||
def process_message(self, session, msg):
|
||||
log.debug('Received a message with an ID of "{0}" and of type "{1}"'
|
||||
.format(msg.msg_id, type(msg).__name__))
|
||||
|
||||
# Choose a handler for this message
|
||||
if type(msg) == module_build_service.messaging.KojiBuildChange:
|
||||
handler = self.on_build_change[msg.build_new_state]
|
||||
elif type(msg) == module_build_service.messaging.KojiRepoChange:
|
||||
handler = self.on_repo_change
|
||||
elif type(msg) == module_build_service.messaging.RidaModule:
|
||||
handler = self.on_module_change[module_build_state_from_msg(msg)]
|
||||
else:
|
||||
log.debug("Unhandled message...")
|
||||
return
|
||||
|
||||
# Execute our chosen handler
|
||||
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id)
|
||||
if handler is self.NO_OP:
|
||||
log.debug("Handler is NO_OP: %s" % idx)
|
||||
else:
|
||||
log.info("Calling %s" % idx)
|
||||
further_work = handler(conf, session, msg) or []
|
||||
log.info("Done with %s" % idx)
|
||||
|
||||
# Handlers can *optionally* return a list of fake messages that
|
||||
# should be re-inserted back into the main work queue. We can use
|
||||
# this (for instance) when we submit a new component build but (for
|
||||
# some reason) it has already been built, then it can fake its own
|
||||
# completion back to the scheduler so that work resumes as if it
|
||||
# was submitted for real and koji announced its completion.
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
self.incoming.put(event)
|
||||
|
||||
|
||||
def get_global_consumer():
|
||||
""" Return a handle to the active consumer object, if it exists. """
|
||||
hub = moksha.hub._hub
|
||||
if not hub:
|
||||
raise ValueError("No global moksha-hub obj found.")
|
||||
|
||||
for consumer in hub.consumers:
|
||||
if isinstance(consumer, MBSConsumer):
|
||||
return consumer
|
||||
|
||||
raise ValueError("No MBSConsumer found among %r." % len(hub.consumers))
|
||||
|
||||
|
||||
def work_queue_put(msg):
|
||||
""" Artificially put a message into the work queue of the consumer. """
|
||||
consumer = get_global_consumer()
|
||||
consumer.incoming.put(msg)
|
||||
@@ -1,357 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
# Written by Petr Šabata <contyk@redhat.com>
|
||||
# Ralph Bean <rbean@redhat.com>
|
||||
|
||||
"""The module build orchestrator for Modularity, the builder.
|
||||
|
||||
This is the main component of the orchestrator and is responsible for
|
||||
proper scheduling component builds in the supported build systems.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import operator
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import six.moves.queue as queue
|
||||
|
||||
import module_build_service.config
|
||||
import module_build_service.messaging
|
||||
import module_build_service.utils
|
||||
import module_build_service.scheduler.handlers.components
|
||||
import module_build_service.scheduler.handlers.modules
|
||||
import module_build_service.scheduler.handlers.repos
|
||||
|
||||
import koji
|
||||
|
||||
from module_build_service import conf, models, log
|
||||
|
||||
from sqlalchemy.orm import lazyload
|
||||
|
||||
|
||||
class STOP_WORK(object):
|
||||
""" A sentinel value, indicating that work should be stopped. """
|
||||
pass
|
||||
|
||||
|
||||
def module_build_state_from_msg(msg):
|
||||
state = int(msg.module_build_state)
|
||||
# TODO better handling
|
||||
assert state in models.BUILD_STATES.values(), (
|
||||
'state=%s(%s) is not in %s'
|
||||
% (state, type(state), list(models.BUILD_STATES.values())))
|
||||
return state
|
||||
|
||||
|
||||
class MessageIngest(threading.Thread):
|
||||
def __init__(self, outgoing_work_queue, stop_after_build, *args, **kwargs):
|
||||
self.outgoing_work_queue = outgoing_work_queue
|
||||
super(MessageIngest, self).__init__(*args, **kwargs)
|
||||
self.stop_after_build = stop_after_build
|
||||
|
||||
def run(self):
|
||||
for msg in module_build_service.messaging.listen(conf):
|
||||
self.outgoing_work_queue.put(msg)
|
||||
|
||||
if type(msg) == module_build_service.messaging.RidaModule:
|
||||
if (self.stop_after_build and module_build_state_from_msg(msg)
|
||||
in [models.BUILD_STATES["failed"], models.BUILD_STATES["ready"]]):
|
||||
break
|
||||
|
||||
|
||||
class MessageWorker(threading.Thread):
|
||||
|
||||
def __init__(self, incoming_work_queue, stop_after_build, *args, **kwargs):
|
||||
self.incoming_work_queue = incoming_work_queue
|
||||
self.stop_after_build = stop_after_build
|
||||
super(MessageWorker, self).__init__(*args, **kwargs)
|
||||
|
||||
# These are our main lookup tables for figuring out what to run in response
|
||||
# to what messaging events.
|
||||
self.NO_OP = NO_OP = lambda config, session, msg: True
|
||||
self.on_build_change = {
|
||||
koji.BUILD_STATES["BUILDING"]: NO_OP,
|
||||
koji.BUILD_STATES["COMPLETE"]: module_build_service.scheduler.handlers.components.complete,
|
||||
koji.BUILD_STATES["FAILED"]: module_build_service.scheduler.handlers.components.failed,
|
||||
koji.BUILD_STATES["CANCELED"]: module_build_service.scheduler.handlers.components.canceled,
|
||||
koji.BUILD_STATES["DELETED"]: NO_OP,
|
||||
}
|
||||
self.on_module_change = {
|
||||
models.BUILD_STATES["init"]: NO_OP,
|
||||
models.BUILD_STATES["wait"]: module_build_service.scheduler.handlers.modules.wait,
|
||||
models.BUILD_STATES["build"]: NO_OP,
|
||||
models.BUILD_STATES["failed"]: module_build_service.scheduler.handlers.modules.failed,
|
||||
models.BUILD_STATES["done"]: module_build_service.scheduler.handlers.modules.done, # XXX: DIRECT TRANSITION TO READY
|
||||
models.BUILD_STATES["ready"]: NO_OP,
|
||||
}
|
||||
# Only one kind of repo change event, though...
|
||||
self.on_repo_change = module_build_service.scheduler.handlers.repos.done
|
||||
|
||||
def sanity_check(self):
|
||||
""" On startup, make sure our implementation is sane. """
|
||||
# Ensure we have every state covered
|
||||
for state in models.BUILD_STATES:
|
||||
if models.BUILD_STATES[state] not in self.on_module_change:
|
||||
raise KeyError("Module build states %r not handled." % state)
|
||||
for state in koji.BUILD_STATES:
|
||||
if koji.BUILD_STATES[state] not in self.on_build_change:
|
||||
raise KeyError("Koji build states %r not handled." % state)
|
||||
|
||||
all_fns = (list(self.on_build_change.items()) +
|
||||
list(self.on_module_change.items()))
|
||||
for key, callback in all_fns:
|
||||
expected = ['config', 'session', 'msg']
|
||||
argspec = inspect.getargspec(callback)[0]
|
||||
if argspec != expected:
|
||||
raise ValueError("Callback %r, state %r has argspec %r!=%r" % (
|
||||
callback, key, argspec, expected))
|
||||
|
||||
def run(self):
|
||||
self.sanity_check()
|
||||
|
||||
while True:
|
||||
msg = self.incoming_work_queue.get()
|
||||
|
||||
if msg is STOP_WORK:
|
||||
log.info("Worker thread received STOP_WORK, shutting down...")
|
||||
break
|
||||
|
||||
try:
|
||||
with models.make_session(conf) as session:
|
||||
self.process_message(session, msg)
|
||||
except Exception:
|
||||
log.exception("Failed while handling %r" % msg.msg_id)
|
||||
log.info(msg)
|
||||
|
||||
def process_message(self, session, msg):
|
||||
log.debug('Received a message with an ID of "{0}" and of type "{1}"'
|
||||
.format(msg.msg_id, type(msg).__name__))
|
||||
|
||||
# Choose a handler for this message
|
||||
if type(msg) == module_build_service.messaging.KojiBuildChange:
|
||||
handler = self.on_build_change[msg.build_new_state]
|
||||
elif type(msg) == module_build_service.messaging.KojiRepoChange:
|
||||
handler = self.on_repo_change
|
||||
elif type(msg) == module_build_service.messaging.RidaModule:
|
||||
handler = self.on_module_change[module_build_state_from_msg(msg)]
|
||||
if (self.stop_after_build and module_build_state_from_msg(msg)
|
||||
in [models.BUILD_STATES["failed"], models.BUILD_STATES["ready"]]):
|
||||
self.incoming_work_queue.put(STOP_WORK)
|
||||
else:
|
||||
log.debug("Unhandled message...")
|
||||
return
|
||||
|
||||
# Execute our chosen handler
|
||||
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__,
|
||||
msg.msg_id)
|
||||
if handler is self.NO_OP:
|
||||
log.debug("Handler is NO_OP: %s" % idx)
|
||||
else:
|
||||
log.info("Calling %s" % idx)
|
||||
further_work = handler(conf, session, msg) or []
|
||||
log.info("Done with %s" % idx)
|
||||
|
||||
# Handlers can *optionally* return a list of fake messages that
|
||||
# should be re-inserted back into the main work queue. We can use
|
||||
# this (for instance) when we submit a new component build but (for
|
||||
# some reason) it has already been built, then it can fake its own
|
||||
# completion back to the scheduler so that work resumes as if it
|
||||
# was submitted for real and koji announced its completion.
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
self.incoming_work_queue.put(event)
|
||||
|
||||
|
||||
class Poller(threading.Thread):
|
||||
def __init__(self, outgoing_work_queue, *args, **kwargs):
|
||||
self.outgoing_work_queue = outgoing_work_queue
|
||||
super(Poller, self).__init__(*args, **kwargs)
|
||||
self.stop = False
|
||||
|
||||
def run(self):
|
||||
while not self.stop:
|
||||
with models.make_session(conf) as session:
|
||||
self.log_summary(session)
|
||||
# XXX: detect whether it's really stucked first
|
||||
# self.process_waiting_module_builds(session)
|
||||
self.process_open_component_builds(session)
|
||||
self.fail_lost_builds(session)
|
||||
self.process_paused_module_builds(conf, session)
|
||||
|
||||
log.info("Polling thread sleeping, %rs" % conf.polling_interval)
|
||||
for i in range(0, conf.polling_interval):
|
||||
time.sleep(1)
|
||||
if self.stop:
|
||||
break
|
||||
|
||||
def fail_lost_builds(self, session):
|
||||
# This function is supposed to be handling only
|
||||
# the part which can't be updated trough messaging (srpm-build failures).
|
||||
# Please keep it fit `n` slim. We do want rest to be processed elsewhere
|
||||
|
||||
# TODO re-use
|
||||
|
||||
if conf.system == "koji":
|
||||
# we don't do this on behalf of users
|
||||
koji_session = (
|
||||
module_build_service.builder.KojiModuleBuilder.get_session(conf, None))
|
||||
log.info("Querying tasks for statuses:")
|
||||
res = models.ComponentBuild.query.filter_by(state=koji.BUILD_STATES['BUILDING']).options(lazyload('module_build')).all()
|
||||
|
||||
log.info("Checking status for %d tasks." % len(res))
|
||||
for component_build in res:
|
||||
log.debug(component_build.json())
|
||||
if not component_build.task_id: # Don't check tasks which has not been triggered yet
|
||||
continue
|
||||
|
||||
log.info("Checking status of task_id=%s" % component_build.task_id)
|
||||
task_info = koji_session.getTaskInfo(component_build.task_id)
|
||||
|
||||
dead_states = (
|
||||
koji.TASK_STATES['CANCELED'],
|
||||
koji.TASK_STATES['FAILED'],
|
||||
)
|
||||
log.info(" task %r is in state %r" % (component_build.task_id, task_info['state']))
|
||||
if task_info['state'] in dead_states:
|
||||
# Fake a fedmsg message on our internal queue
|
||||
msg = module_build_service.messaging.KojiBuildChange(
|
||||
msg_id='a faked internal message',
|
||||
build_id=component_build.task_id,
|
||||
task_id=component_build.task_id,
|
||||
build_name=component_build.package,
|
||||
build_new_state=koji.BUILD_STATES['FAILED'],
|
||||
build_release=None,
|
||||
build_version=None
|
||||
)
|
||||
self.outgoing_work_queue.put(msg)
|
||||
|
||||
elif conf.system == "copr":
|
||||
# @TODO
|
||||
pass
|
||||
|
||||
elif conf.system == "mock":
|
||||
pass
|
||||
|
||||
def log_summary(self, session):
|
||||
log.info("Current status:")
|
||||
backlog = self.outgoing_work_queue.qsize()
|
||||
log.info(" * internal queue backlog is %i." % backlog)
|
||||
states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))
|
||||
for name, code in states:
|
||||
query = models.ModuleBuild.query.filter_by(state=code)
|
||||
count = query.count()
|
||||
if count:
|
||||
log.info(" * %i module builds in the %s state." % (count, name))
|
||||
if name == 'build':
|
||||
for module_build in query.all():
|
||||
log.info(" * %r" % module_build)
|
||||
for i in range(module_build.batch):
|
||||
n = len([c for c in module_build.component_builds
|
||||
if c.batch == i])
|
||||
log.info(" * %i components in batch %i" % (n, i))
|
||||
|
||||
def process_waiting_module_builds(self, session):
|
||||
log.info("Looking for module builds stuck in the wait state.")
|
||||
builds = models.ModuleBuild.by_state(session, "wait")
|
||||
# TODO -- do throttling calculation here...
|
||||
log.info(" %r module builds in the wait state..." % len(builds))
|
||||
for build in builds:
|
||||
# Fake a message to kickstart the build anew
|
||||
msg = {
|
||||
'topic': '.module.build.state.change',
|
||||
'msg': build.json(),
|
||||
}
|
||||
module_build_service.scheduler.handlers.modules.wait(conf, session, msg)
|
||||
|
||||
def process_open_component_builds(self, session):
|
||||
log.warning("process_open_component_builds is not yet implemented...")
|
||||
|
||||
def process_paused_module_builds(self, config, session):
|
||||
if module_build_service.utils.at_concurrent_component_threshold(
|
||||
config, session):
|
||||
log.debug('Will not attempt to start paused module builds due to '
|
||||
'the concurrent build threshold being met')
|
||||
return
|
||||
# Check to see if module builds that are in build state but don't have
|
||||
# any component builds being built can be worked on
|
||||
for module_build in session.query(models.ModuleBuild).filter_by(
|
||||
state=models.BUILD_STATES['build']).all():
|
||||
# If there are no components in the build state on the module build,
|
||||
# then no possible event will start off new component builds
|
||||
if not module_build.current_batch(koji.BUILD_STATES['BUILDING']):
|
||||
further_work = module_build_service.utils.start_build_batch(
|
||||
config, module_build, session, config.system)
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
self.outgoing_work_queue.put(event)
|
||||
|
||||
|
||||
_work_queue = queue.Queue()
|
||||
|
||||
|
||||
def outgoing_work_queue_put(msg):
|
||||
_work_queue.put(msg)
|
||||
|
||||
|
||||
def graceful_stop():
|
||||
"""
|
||||
Here is the place to perform shutdown actions.
|
||||
|
||||
Do whatever is needed to do except for leaving the main thread, which
|
||||
would result in losing control of POSIX signals handling.
|
||||
"""
|
||||
log.warning("graceful_stop is not yet implemented, press Ctrl+C again...")
|
||||
while True:
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def main(initial_msgs=[], return_after_build=False):
|
||||
log.info("Starting module_build_service_daemon.")
|
||||
|
||||
module_build_service.messaging.init(conf)
|
||||
|
||||
for msg in initial_msgs:
|
||||
outgoing_work_queue_put(msg)
|
||||
|
||||
# This ingest thread puts work on the queue
|
||||
messaging_thread = MessageIngest(_work_queue, return_after_build)
|
||||
# This poller does other work, but also sometimes puts work in queue.
|
||||
polling_thread = Poller(_work_queue)
|
||||
# This worker takes work off the queue and handles it.
|
||||
worker_thread = MessageWorker(_work_queue, return_after_build)
|
||||
|
||||
messaging_thread.start()
|
||||
polling_thread.start()
|
||||
worker_thread.start()
|
||||
|
||||
try:
|
||||
while worker_thread.is_alive():
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
log.info("Stopping module_build_service_daemon. Press Ctrl+C again to force.")
|
||||
try:
|
||||
graceful_stop()
|
||||
except KeyboardInterrupt:
|
||||
os._exit(0)
|
||||
finally:
|
||||
polling_thread.stop = True
|
||||
166
module_build_service/scheduler/producer.py
Normal file
166
module_build_service/scheduler/producer.py
Normal file
@@ -0,0 +1,166 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
|
||||
""" The PollingProducer class that acts as a producer entry point for
|
||||
fedmsg-hub. This class polls the database for tasks to do.
|
||||
"""
|
||||
|
||||
import koji
|
||||
import operator
|
||||
from datetime import timedelta
|
||||
from sqlalchemy.orm import lazyload
|
||||
from moksha.hub.api.producer import PollingProducer
|
||||
|
||||
import module_build_service.messaging
|
||||
import module_build_service.scheduler
|
||||
import module_build_service.scheduler.consumer
|
||||
from module_build_service import conf, models, log
|
||||
|
||||
|
||||
class MBSProducer(PollingProducer):
|
||||
frequency = timedelta(seconds=conf.polling_interval)
|
||||
|
||||
def poll(self):
|
||||
with models.make_session(conf) as session:
|
||||
self.log_summary(session)
|
||||
# XXX: detect whether it's actually stuck first
|
||||
# self.process_waiting_module_builds(session)
|
||||
self.process_open_component_builds(session)
|
||||
self.fail_lost_builds(session)
|
||||
self.process_paused_module_builds(conf, session)
|
||||
|
||||
log.info('Poller will now sleep for "{}" seconds'
|
||||
.format(conf.polling_interval))
|
||||
|
||||
def fail_lost_builds(self, session):
|
||||
# This function is supposed to be handling only the part which can't be
|
||||
# updated through messaging (e.g. srpm-build failures). Please keep it
|
||||
# fit `n` slim. We do want rest to be processed elsewhere
|
||||
# TODO re-use
|
||||
|
||||
if conf.system == 'koji':
|
||||
# We don't do this on behalf of users
|
||||
koji_session = module_build_service.builder.KojiModuleBuilder\
|
||||
.get_session(conf, None)
|
||||
log.info('Querying tasks for statuses:')
|
||||
res = models.ComponentBuild.query.filter_by(
|
||||
state=koji.BUILD_STATES['BUILDING']).options(
|
||||
lazyload('module_build')).all()
|
||||
|
||||
log.info('Checking status for {0} tasks'.format(len(res)))
|
||||
for component_build in res:
|
||||
log.debug(component_build.json())
|
||||
# Don't check tasks which haven't been triggered yet
|
||||
if not component_build.task_id:
|
||||
continue
|
||||
|
||||
log.info('Checking status of task_id "{0}"'
|
||||
.format(component_build.task_id))
|
||||
task_info = koji_session.getTaskInfo(component_build.task_id)
|
||||
|
||||
dead_states = (
|
||||
koji.TASK_STATES['CANCELED'],
|
||||
koji.TASK_STATES['FAILED'],
|
||||
)
|
||||
|
||||
log.info(' task {0!r} is in state {0!r}'.format(
|
||||
component_build.task_id, task_info['state']))
|
||||
if task_info['state'] in dead_states:
|
||||
# Fake a fedmsg message on our internal queue
|
||||
msg = module_build_service.messaging.KojiBuildChange(
|
||||
msg_id='a faked internal message',
|
||||
build_id=component_build.task_id,
|
||||
task_id=component_build.task_id,
|
||||
build_name=component_build.package,
|
||||
build_new_state=koji.BUILD_STATES['FAILED'],
|
||||
build_release=None,
|
||||
build_version=None
|
||||
)
|
||||
module_build_service.scheduler.consumer.work_queue_put(msg)
|
||||
|
||||
elif conf.system == 'copr':
|
||||
# @TODO
|
||||
pass
|
||||
|
||||
elif conf.system == 'mock':
|
||||
pass
|
||||
|
||||
def log_summary(self, session):
|
||||
log.info('Current status:')
|
||||
consumer = module_build_service.scheduler.consumer.get_global_consumer()
|
||||
backlog = consumer.incoming.qsize()
|
||||
log.info(' * internal queue backlog is {0}'.format(backlog))
|
||||
states = sorted(models.BUILD_STATES.items(), key=operator.itemgetter(1))
|
||||
for name, code in states:
|
||||
query = models.ModuleBuild.query.filter_by(state=code)
|
||||
count = query.count()
|
||||
if count:
|
||||
log.info(' * {0} module builds in the {1} state'.format(
|
||||
count, name))
|
||||
if name == 'build':
|
||||
for module_build in query.all():
|
||||
log.info(' * {0!r}'.format(module_build))
|
||||
for i in range(module_build.batch):
|
||||
n = len([c for c in module_build.component_builds
|
||||
if c.batch == i ])
|
||||
log.info(' * {0} components in batch {1}'
|
||||
.format(n, i))
|
||||
|
||||
def process_waiting_module_builds(self, session):
|
||||
log.info('Looking for module builds stuck in the wait state')
|
||||
builds = models.ModuleBuild.by_state(session, 'wait')
|
||||
log.info(' {0!r} module builds in the wait state...'
|
||||
.format(len(builds)))
|
||||
for build in builds:
|
||||
# Fake a message to kickstart the build anew
|
||||
msg = module_build_service.messaging.RidaModule(
|
||||
'fake message',
|
||||
build.id,
|
||||
module_build_service.models.BUILD_STATES['wait']
|
||||
)
|
||||
further_work = module_build_service.scheduler.handlers.modules.wait(
|
||||
conf, session, msg) or []
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
module_build_service.scheduler.consumer.work_queue_put(event)
|
||||
|
||||
def process_open_component_builds(self, session):
|
||||
log.warning('process_open_component_builds is not yet implemented...')
|
||||
|
||||
def process_paused_module_builds(self, config, session):
|
||||
if module_build_service.utils.at_concurrent_component_threshold(
|
||||
config, session):
|
||||
log.debug('Will not attempt to start paused module builds due to '
|
||||
'the concurrent build threshold being met')
|
||||
return
|
||||
# Check to see if module builds that are in build state but don't have
|
||||
# any component builds being built can be worked on
|
||||
for module_build in session.query(models.ModuleBuild).filter_by(
|
||||
state=models.BUILD_STATES['build']).all():
|
||||
# If there are no components in the build state on the module build,
|
||||
# then no possible event will start off new component builds
|
||||
if not module_build.current_batch(koji.BUILD_STATES['BUILDING']):
|
||||
further_work = module_build_service.utils.start_build_batch(
|
||||
config, module_build, session, config.system)
|
||||
for event in further_work:
|
||||
log.info(" Scheduling faked event %r" % event)
|
||||
module_build_service.scheduler.consumer.work_queue_put(event)
|
||||
@@ -548,3 +548,11 @@ def scm_url_schemes(terse=False):
|
||||
for scm_type, scm_schemes in scm_types.items():
|
||||
scheme_list.extend([scheme[:-3] for scheme in scm_schemes])
|
||||
return list(set(scheme_list))
|
||||
|
||||
def module_build_state_from_msg(msg):
|
||||
state = int(msg.module_build_state)
|
||||
# TODO better handling
|
||||
assert state in models.BUILD_STATES.values(), (
|
||||
'state=%s(%s) is not in %s'
|
||||
% (state, type(state), list(models.BUILD_STATES.values())))
|
||||
return state
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
# Written by Petr Šabata <contyk@redhat.com>
|
||||
# Ralph Bean <rbean@redhat.com>
|
||||
"""The module build orchestrator for Modularity, the builder. """
|
||||
|
||||
import module_build_service.scheduler.main
|
||||
|
||||
if __name__ == '__main__':
|
||||
module_build_service.scheduler.main.main()
|
||||
@@ -3,6 +3,8 @@ Flask-Migrate
|
||||
Flask-SQLAlchemy
|
||||
Flask-Script
|
||||
fedmsg
|
||||
moksha.hub
|
||||
psutil
|
||||
funcsigs # Python2 only
|
||||
httplib2
|
||||
kobo
|
||||
|
||||
7
setup.py
7
setup.py
@@ -24,11 +24,12 @@ setup(name='module-build-service',
|
||||
install_requires=requirements,
|
||||
tests_require=test_requirements,
|
||||
entry_points={
|
||||
'console_scripts': ['mbs-daemon = module_build_service.scheduler.main:main',
|
||||
'mbs-upgradedb = module_build_service.manage:upgradedb',
|
||||
'console_scripts': ['mbs-upgradedb = module_build_service.manage:upgradedb',
|
||||
'mbs-gencert = module_build_service.manage:generatelocalhostcert',
|
||||
'mbs-frontend = module_build_service.manage:runssl',
|
||||
'mbs-manager = module_build_service.manage:manager_wrapper']
|
||||
'mbs-manager = module_build_service.manage:manager_wrapper'],
|
||||
'moksha.consumer': 'mbsconsumer = module_build_service.scheduler.consumer:MBSConsumer',
|
||||
'moksha.producer': 'mbspoller = module_build_service.scheduler.producer:MBSProducer',
|
||||
},
|
||||
data_files=[('/etc/module-build-service/', ['conf/cacert.pem',
|
||||
'conf/config.py',
|
||||
|
||||
0
tests/test_build/__init__.py
Normal file
0
tests/test_build/__init__.py
Normal file
@@ -21,31 +21,24 @@
|
||||
# Written by Jan Kaluza <jkaluza@redhat.com>
|
||||
|
||||
import unittest
|
||||
import munch
|
||||
import mock
|
||||
import koji
|
||||
import xmlrpclib
|
||||
from os import path, mkdir
|
||||
from shutil import copyfile
|
||||
|
||||
from nose.tools import timed
|
||||
|
||||
from module_build_service import db
|
||||
|
||||
import module_build_service.messaging
|
||||
import module_build_service.scheduler.handlers.repos
|
||||
from module_build_service import models, conf
|
||||
from module_build_service.utils import submit_module_build
|
||||
from module_build_service import db, models, conf
|
||||
from module_build_service.messaging import RidaModule
|
||||
|
||||
from mock import patch
|
||||
|
||||
from tests import app, init_data
|
||||
from tests import conf as test_conf
|
||||
import json
|
||||
|
||||
from module_build_service.builder import KojiModuleBuilder, GenericBuilder
|
||||
import module_build_service.scheduler.main
|
||||
import module_build_service.scheduler.consumer
|
||||
|
||||
|
||||
class MockedSCM(object):
|
||||
@@ -128,7 +121,7 @@ class TestModuleBuilder(GenericBuilder):
|
||||
msg_id='a faked internal message',
|
||||
repo_tag=self.tag_name + "-build",
|
||||
)
|
||||
module_build_service.scheduler.main.outgoing_work_queue_put(msg)
|
||||
module_build_service.scheduler.consumer.work_queue_put(msg)
|
||||
|
||||
def _send_build_change(self, state, source, build_id):
|
||||
# build_id=1 and task_id=1 are OK here, because we are building just
|
||||
@@ -142,7 +135,7 @@ class TestModuleBuilder(GenericBuilder):
|
||||
build_release="1",
|
||||
build_version="1"
|
||||
)
|
||||
module_build_service.scheduler.main.outgoing_work_queue_put(msg)
|
||||
module_build_service.scheduler.consumer.work_queue_put(msg)
|
||||
|
||||
def build(self, artifact_name, source):
|
||||
print "Starting building artifact %s: %s" % (artifact_name, source)
|
||||
@@ -192,6 +185,13 @@ class TestBuild(unittest.TestCase):
|
||||
conf.set_item("system", "koji")
|
||||
TestModuleBuilder.reset()
|
||||
|
||||
# Necessary to restart the twisted reactor for the next test.
|
||||
import sys
|
||||
del sys.modules['twisted.internet.reactor']
|
||||
del sys.modules['moksha.hub.reactor']
|
||||
del sys.modules['moksha.hub']
|
||||
import moksha.hub.reactor
|
||||
|
||||
@timed(30)
|
||||
@patch('module_build_service.auth.get_username', return_value='Homer J. Simpson')
|
||||
@patch('module_build_service.auth.assert_is_packager')
|
||||
@@ -202,7 +202,7 @@ class TestBuild(unittest.TestCase):
|
||||
Tests the build of testmodule.yaml using TestModuleBuilder which
|
||||
succeeds everytime.
|
||||
"""
|
||||
mocked_scm_obj = MockedSCM(mocked_scm, "testmodule", "testmodule.yaml")
|
||||
MockedSCM(mocked_scm, "testmodule", "testmodule.yaml")
|
||||
|
||||
rv = self.client.post('/module-build-service/1/module-builds/', data=json.dumps(
|
||||
{'scmurl': 'git://pkgs.stg.fedoraproject.org/modules/'
|
||||
@@ -211,9 +211,9 @@ class TestBuild(unittest.TestCase):
|
||||
data = json.loads(rv.data)
|
||||
module_build_id = data['id']
|
||||
|
||||
msgs = []
|
||||
msgs.append(RidaModule("fake msg", 1, 1))
|
||||
module_build_service.scheduler.main.main(msgs, True)
|
||||
msgs = [RidaModule("fake msg", 1, 1)]
|
||||
stop = module_build_service.scheduler.make_simple_stop_condition(db.session)
|
||||
module_build_service.scheduler.main(msgs, stop)
|
||||
|
||||
# All components should be built and module itself should be in "done"
|
||||
# or "ready" state.
|
||||
@@ -230,7 +230,7 @@ class TestBuild(unittest.TestCase):
|
||||
"""
|
||||
Submit all builds for a module and cancel the module build later.
|
||||
"""
|
||||
mocked_scm_obj = MockedSCM(mocked_scm, "testmodule", "testmodule.yaml")
|
||||
MockedSCM(mocked_scm, "testmodule", "testmodule.yaml")
|
||||
|
||||
rv = self.client.post('/module-build-service/1/module-builds/', data=json.dumps(
|
||||
{'scmurl': 'git://pkgs.stg.fedoraproject.org/modules/'
|
||||
@@ -257,9 +257,9 @@ class TestBuild(unittest.TestCase):
|
||||
TestModuleBuilder.on_build_cb = on_build_cb
|
||||
TestModuleBuilder.on_cancel_cb = on_cancel_cb
|
||||
|
||||
msgs = []
|
||||
msgs.append(RidaModule("fake msg", 1, 1))
|
||||
module_build_service.scheduler.main.main(msgs, True)
|
||||
msgs = [RidaModule("fake msg", 1, 1)]
|
||||
stop = module_build_service.scheduler.make_simple_stop_condition(db.session)
|
||||
module_build_service.scheduler.main(msgs, stop)
|
||||
|
||||
# Because we did not finished single component build and canceled the
|
||||
# module build, all components and even the module itself should be in
|
||||
@@ -284,7 +284,7 @@ class TestBuild(unittest.TestCase):
|
||||
Tests the build of testmodule.yaml using TestModuleBuilder which
|
||||
succeeds everytime.
|
||||
"""
|
||||
mocked_scm_obj = MockedSCM(mocked_scm, "testmodule", "testmodule.yaml")
|
||||
MockedSCM(mocked_scm, "testmodule", "testmodule.yaml")
|
||||
|
||||
rv = self.client.post('/module-build-service/1/module-builds/', data=json.dumps(
|
||||
{'scmurl': 'git://pkgs.stg.fedoraproject.org/modules/'
|
||||
@@ -296,9 +296,9 @@ class TestBuild(unittest.TestCase):
|
||||
TestModuleBuilder.BUILD_STATE = "BUILDING"
|
||||
TestModuleBuilder.INSTANT_COMPLETE = True
|
||||
|
||||
msgs = []
|
||||
msgs.append(RidaModule("fake msg", 1, 1))
|
||||
module_build_service.scheduler.main.main(msgs, True)
|
||||
msgs = [RidaModule("fake msg", 1, 1)]
|
||||
stop = module_build_service.scheduler.make_simple_stop_condition(db.session)
|
||||
module_build_service.scheduler.main(msgs, stop)
|
||||
|
||||
# All components should be built and module itself should be in "done"
|
||||
# or "ready" state.
|
||||
|
||||
@@ -1,124 +0,0 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
#
|
||||
# Written by Matt Prahl <mprahl@redhat.com>
|
||||
from __future__ import unicode_literals
|
||||
import unittest
|
||||
import mock
|
||||
from datetime import datetime
|
||||
import module_build_service.messaging
|
||||
|
||||
|
||||
class TestUtilFunctions(unittest.TestCase):
|
||||
|
||||
module_build_service_msg = msg = {
|
||||
'component_builds': [1, 2],
|
||||
'id': 1,
|
||||
'name': 'testmodule',
|
||||
'owner': 'some_user',
|
||||
'release': '16',
|
||||
'scmurl': 'git://domain.local/modules/testmodule.git?#c23acdc',
|
||||
'state': 3,
|
||||
'state_name': 'done',
|
||||
'time_completed': datetime(2016, 9, 1, 2, 30),
|
||||
'time_modified': datetime(2016, 9, 1, 2, 30),
|
||||
'time_submitted': datetime(2016, 9, 1, 2, 28),
|
||||
'version': '4.3.43'
|
||||
}
|
||||
|
||||
@mock.patch('fedmsg.tail_messages')
|
||||
def test_fedmsg_listen_build_msg(self, mock_tail_messages):
|
||||
endpoint = 'tcp://hub.fedoraproject.org:9940'
|
||||
topic = 'org.fedoraproject.prod.buildsys.build.state.change'
|
||||
msg = {
|
||||
'source_name': 'datanommer',
|
||||
'i': 2,
|
||||
'timestamp': 1473252386.0,
|
||||
'msg_id': '2016-e05415d9-9b35-4f13-8b25-0daddeabfb8c',
|
||||
'topic': 'org.fedoraproject.prod.buildsys.build.state.change',
|
||||
'source_version': '1.2.3',
|
||||
'msg': {
|
||||
'build_id': 2345678,
|
||||
'old': None,
|
||||
'name': 'some-package',
|
||||
'task_id': 1234567,
|
||||
'attribute': 'state',
|
||||
'instance': 'arm',
|
||||
'version': '2.1.0',
|
||||
'owner': 'some_owner',
|
||||
'new': 0,
|
||||
'release': '1.fc26'
|
||||
}
|
||||
}
|
||||
mock_tail_messages.side_effect = \
|
||||
lambda: [('fedora-infrastructure', endpoint, topic, msg)]
|
||||
msg_obj = next(module_build_service.messaging._fedmsg_listen(None))
|
||||
self.assertEquals(type(msg_obj), module_build_service.messaging.KojiBuildChange)
|
||||
self.assertEquals(msg_obj.build_id, 2345678)
|
||||
self.assertEquals(msg_obj.task_id, 1234567)
|
||||
self.assertEquals(msg_obj.build_new_state, 0)
|
||||
self.assertEquals(msg_obj.build_name, 'some-package')
|
||||
self.assertEquals(msg_obj.build_version, '2.1.0')
|
||||
self.assertEquals(msg_obj.build_release, '1.fc26')
|
||||
self.assertEquals(msg_obj.msg_id,
|
||||
'2016-e05415d9-9b35-4f13-8b25-0daddeabfb8c')
|
||||
|
||||
@mock.patch('fedmsg.tail_messages')
|
||||
def test_fedmsg_listen_repo_msg(self, mock_tail_messages):
|
||||
endpoint = 'tcp://hub.fedoraproject.org:9940'
|
||||
topic = 'org.fedoraproject.prod.buildsys.repo.done'
|
||||
msg = {
|
||||
'source_name': 'datanommer',
|
||||
'i': 1,
|
||||
'timestamp': 1473252506.0,
|
||||
'msg_id': '2016-e05415d9-9b35-4f13-8b25-0daddeabfb8c',
|
||||
'topic': 'org.fedoraproject.prod.buildsys.repo.done',
|
||||
'source_version': '1.2.0',
|
||||
'msg': {
|
||||
'instance': 'arm',
|
||||
'repo_id': 402102,
|
||||
'tag': 'f23-build',
|
||||
'tag_id': 155
|
||||
}
|
||||
}
|
||||
mock_tail_messages.side_effect = \
|
||||
lambda: [('fedora-infrastructure', endpoint, topic, msg)]
|
||||
msg_obj = next(module_build_service.messaging._fedmsg_listen(None))
|
||||
self.assertEquals(type(msg_obj), module_build_service.messaging.KojiRepoChange)
|
||||
self.assertEquals(msg_obj.repo_tag, 'f23-build')
|
||||
self.assertEquals(msg_obj.msg_id,
|
||||
'2016-e05415d9-9b35-4f13-8b25-0daddeabfb8c')
|
||||
|
||||
@mock.patch('fedmsg.tail_messages')
|
||||
def test_fedmsg_listen_module_build_service_msg(self, mock_tail_messages):
|
||||
endpoint = 'tcp://hub.fedoraproject.org:9940'
|
||||
topic = 'org.fedoraproject.prod.module_build_service.module.state.change'
|
||||
msg = {
|
||||
'msg_id': '2016-e05415d9-9b35-4f13-8b25-0daddeabfb8c',
|
||||
'topic': 'org.fedoraproject.prod.module_build_service.module.state.change',
|
||||
'msg': self.module_build_service_msg
|
||||
}
|
||||
mock_tail_messages.side_effect = \
|
||||
lambda: [('fedora-infrastructure', endpoint, topic, msg)]
|
||||
msg_obj = next(module_build_service.messaging._fedmsg_listen(None))
|
||||
self.assertEquals(msg_obj.module_build_id, msg['msg']['id'])
|
||||
self.assertEquals(msg_obj.module_build_state, msg['msg']['state'])
|
||||
self.assertEquals(msg_obj.msg_id,
|
||||
'2016-e05415d9-9b35-4f13-8b25-0daddeabfb8c')
|
||||
Reference in New Issue
Block a user