mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-06-15 06:27:00 +08:00
Merge #703 Introduce pluggable backends.
This commit is contained in:
@@ -26,9 +26,9 @@
|
||||
|
||||
import imp
|
||||
import os
|
||||
import pkg_resources
|
||||
import re
|
||||
|
||||
from os import sys
|
||||
import sys
|
||||
|
||||
from module_build_service import logger
|
||||
|
||||
@@ -500,9 +500,17 @@ class Config(object):
|
||||
self._log_level = logger.str_to_log_level(level)
|
||||
|
||||
def _setifok_messaging(self, s):
|
||||
""" Validate that the specified messaging backend corresponds with one
|
||||
of the installed plugins. The MBS core provides two such plugins, but
|
||||
a third-party could install another usable one.
|
||||
"""
|
||||
entrypoints = pkg_resources.iter_entry_points('mbs.messaging_backends')
|
||||
installed_backends = [e.name for e in entrypoints]
|
||||
s = str(s)
|
||||
if s not in ("fedmsg", "amq", "in_memory"):
|
||||
raise ValueError("Unsupported messaging system.")
|
||||
if s not in installed_backends:
|
||||
raise ValueError('The messaging plugin for "{0}" is not installed.'
|
||||
' The following are installed: {1}'
|
||||
.format(s, ', '.join(installed_backends)))
|
||||
self._messaging = s
|
||||
|
||||
def _setifok_amq_recv_addresses(self, l):
|
||||
|
||||
@@ -24,9 +24,9 @@
|
||||
|
||||
"""Generic messaging functions."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import pkg_resources
|
||||
|
||||
try:
|
||||
from inspect import signature
|
||||
except ImportError:
|
||||
@@ -81,80 +81,25 @@ class BaseMessage(object):
|
||||
def __json__(self):
|
||||
return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)
|
||||
|
||||
@staticmethod
|
||||
def from_amq(topic, msg):
|
||||
msg_obj = None
|
||||
|
||||
if not hasattr(msg, 'properties'):
|
||||
return None # Unrelated message not identifying service origin
|
||||
properties = json.loads(msg.properties, encoding='utf8')
|
||||
service = properties.get('service')
|
||||
class MessageParser(object):
|
||||
|
||||
if service not in _messaging_backends['amq']['services']:
|
||||
log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg))
|
||||
return None
|
||||
def parse(self, msg):
|
||||
raise NotImplementedError()
|
||||
|
||||
# This probably appies only for brew
|
||||
# Also wouldn't be easier to use properties?
|
||||
if service == 'koji':
|
||||
content = json.loads(msg.body, encoding='utf8')['content']
|
||||
log.debug("Found koji related msg: %s" % msg)
|
||||
method = content['info']['method']
|
||||
msg_type = content['info']['type']
|
||||
|
||||
if method == 'newRepo':
|
||||
attr = content['attribute']
|
||||
state = content['info']['new']
|
||||
if attr == "state" and state == "CLOSED":
|
||||
repo_tag = content['info']['request']
|
||||
assert len(repo_tag) == 1
|
||||
msg_obj = KojiRepoChange(msg.id, repo_tag[0])
|
||||
class FedmsgMessageParser(MessageParser):
|
||||
|
||||
elif method == 'build' and msg_type == 'TaskStateChange':
|
||||
attr = content['attribute']
|
||||
if attr == "state":
|
||||
build_id = content['info']['id']
|
||||
# TODO: Someone with AMQ knowledge should check if
|
||||
# info.id is build id or task_id here. For now I presume
|
||||
# it is task_id.
|
||||
task_id = content['info']['id']
|
||||
build_state = content['new']
|
||||
# These are not available before build is assigned
|
||||
build_name = None
|
||||
build_version = None
|
||||
build_release = None
|
||||
nvr_req = set(['name', 'version', 'release'])
|
||||
if nvr_req.issubset(set(content['info'].keys())):
|
||||
build_name = content['info']['name']
|
||||
build_version = content['info']['version']
|
||||
build_release = content['info']['release']
|
||||
|
||||
msg_obj = KojiBuildChange(
|
||||
msg.id, build_id, task_id, build_state, build_name,
|
||||
build_version, build_release)
|
||||
|
||||
elif service == 'mbs':
|
||||
log.debug("Found mbs related msg: %s" % msg)
|
||||
body = json.loads(msg.body, encoding='utf8')
|
||||
if topic == 'module.state.change':
|
||||
msg_obj = MBSModule(
|
||||
msg.id, body['id'], body['state'])
|
||||
|
||||
if msg_obj:
|
||||
return msg_obj
|
||||
|
||||
log.debug('Skipping unrecognized message: %s' % msg)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def from_fedmsg(topic, msg):
|
||||
def parse(self, msg):
|
||||
"""
|
||||
Takes a fedmsg topic and message and converts it to a message object
|
||||
:param topic: the topic of the fedmsg message
|
||||
:param msg: the message contents from the fedmsg message
|
||||
:return: an object of BaseMessage descent if the message is a type
|
||||
that the app looks for, otherwise None is returned
|
||||
"""
|
||||
if 'body' in msg:
|
||||
msg = msg['body']
|
||||
topic = msg['topic']
|
||||
topic_categories = _messaging_backends['fedmsg']['services']
|
||||
categories_re = '|'.join(map(re.escape, topic_categories))
|
||||
regex_pattern = re.compile(
|
||||
@@ -347,7 +292,8 @@ def publish(topic, msg, conf, service):
|
||||
try:
|
||||
handler = _messaging_backends[conf.messaging]['publish']
|
||||
except KeyError:
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
raise KeyError("No messaging backend found for %r in %r" % (
|
||||
conf.messaging, _messaging_backends.keys()))
|
||||
return handler(topic, msg, conf, service)
|
||||
|
||||
|
||||
@@ -371,10 +317,11 @@ def _in_memory_publish(topic, msg, conf, service):
|
||||
# Create fake fedmsg from the message so we can reuse
|
||||
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
|
||||
# class instance.
|
||||
wrapped_msg = BaseMessage.from_fedmsg(
|
||||
service + "." + topic,
|
||||
{"msg_id": str(_in_memory_msg_id), "msg": msg},
|
||||
)
|
||||
wrapped_msg = FedmsgMessageParser().parse({
|
||||
"msg_id": str(_in_memory_msg_id),
|
||||
"topic": service + "." + topic,
|
||||
"msg": msg,
|
||||
})
|
||||
|
||||
# Put the message to queue.
|
||||
from module_build_service.scheduler.consumer import work_queue_put
|
||||
@@ -390,60 +337,27 @@ def _in_memory_publish(topic, msg, conf, service):
|
||||
_initial_messages.append(wrapped_msg)
|
||||
|
||||
|
||||
def _amq_get_messenger(conf):
|
||||
import proton
|
||||
for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'):
|
||||
val = getattr(conf, attr)
|
||||
log.debug('Checking config.%s=%s' % (attr, val))
|
||||
assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val)
|
||||
|
||||
for attr in ('amq_recv_addresses', 'amq_dest_address'):
|
||||
val = getattr(conf, attr)
|
||||
log.debug('Checking config.%s=%s' % (attr, val))
|
||||
# list values
|
||||
if isinstance(val, (list, tuple)):
|
||||
assert val, 'config.%s is not supposed to be empty' % attr
|
||||
# individual urls
|
||||
for v in val:
|
||||
assert v and '://' in v, 'config.%s: value "%s" does not seem like a valid url' % (attr, val)
|
||||
# string values
|
||||
else:
|
||||
assert val and '://' in val, 'config.%s: value "%s" does not seem like a valid url' % (attr, val)
|
||||
|
||||
msngr = proton.Messenger()
|
||||
msngr.certificate = conf.amq_cert_file
|
||||
msngr.private_key = conf.amq_private_key_file
|
||||
msngr.trusted_certificates = conf.amq_trusted_cert_file
|
||||
msngr.start()
|
||||
for url in conf.amq_recv_addresses:
|
||||
msngr.subscribe(url)
|
||||
log.debug('proton.Messenger: Subscribing to address=%s' % url)
|
||||
return msngr
|
||||
|
||||
|
||||
def _amq_publish(topic, msg, conf, service):
|
||||
import proton
|
||||
msngr = _amq_get_messenger(conf)
|
||||
message = proton.Message()
|
||||
message.address = conf.amq_dest_address
|
||||
message.subject = topic
|
||||
message.properties['service'] = service
|
||||
message.content = json.dumps(msg, ensure_ascii=False).encode('utf8')
|
||||
msngr.put(message)
|
||||
msngr.send()
|
||||
|
||||
|
||||
_messaging_backends = {
|
||||
'fedmsg': {
|
||||
'publish': _fedmsg_publish,
|
||||
'services': ['buildsys', 'mbs', 'copr']
|
||||
},
|
||||
'amq': {
|
||||
'publish': _amq_publish,
|
||||
'services': ['koji', 'mbs']
|
||||
},
|
||||
'in_memory': {
|
||||
'publish': _in_memory_publish,
|
||||
'services': []
|
||||
}
|
||||
_fedmsg_backend = {
|
||||
'publish': _fedmsg_publish,
|
||||
'services': ['buildsys', 'mbs', 'copr'],
|
||||
'parser': FedmsgMessageParser(),
|
||||
'topic_suffix': '.',
|
||||
}
|
||||
_in_memory_backend = {
|
||||
'publish': _in_memory_publish,
|
||||
'services': [],
|
||||
'parser': FedmsgMessageParser(), # re-used. :)
|
||||
'topic_suffix': '.',
|
||||
}
|
||||
|
||||
|
||||
_messaging_backends = {}
|
||||
for entrypoint in pkg_resources.iter_entry_points('mbs.messaging_backends'):
|
||||
_messaging_backends[entrypoint.name] = ep = entrypoint.load()
|
||||
required = ['publish', 'services', 'parser', 'topic_suffix']
|
||||
if any([key not in ep for key in required]):
|
||||
raise ValueError('messaging backend %r is malformed: %r' % (
|
||||
entrypoint.name, ep))
|
||||
|
||||
if not _messaging_backends:
|
||||
raise ValueError("No messaging plugins are installed or available.")
|
||||
|
||||
@@ -25,9 +25,17 @@ This class reads and processes messages from the message bus it is configured
|
||||
to use.
|
||||
"""
|
||||
|
||||
import koji
|
||||
import inspect
|
||||
import itertools
|
||||
|
||||
try:
|
||||
# python3
|
||||
import queue
|
||||
except ImportError:
|
||||
# python2
|
||||
import Queue as queue
|
||||
|
||||
import koji
|
||||
import fedmsg.consumers
|
||||
import moksha.hub
|
||||
|
||||
@@ -44,13 +52,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
""" This is triggered by running fedmsg-hub. This class is responsible for
|
||||
ingesting and processing messages from the message bus.
|
||||
"""
|
||||
topic = ['{}.{}.'.format(pref.rstrip('.'), cat)
|
||||
for pref, cat
|
||||
in itertools.product(conf.messaging_topic_prefix,
|
||||
module_build_service.messaging._messaging_backends[conf.messaging]['services'])]
|
||||
if not topic:
|
||||
topic = '*'
|
||||
log.debug('Setting topics: {}'.format(', '.join(topic)))
|
||||
config_key = 'mbsconsumer'
|
||||
|
||||
# It is set to the id of currently handled module build. It is used to
|
||||
@@ -59,8 +60,29 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
current_module_build_id = None
|
||||
|
||||
def __init__(self, hub):
|
||||
# Topic setting needs to be done *before* the call to `super`.
|
||||
|
||||
backends = module_build_service.messaging._messaging_backends
|
||||
prefixes = conf.messaging_topic_prefix # This is a list.
|
||||
services = backends[conf.messaging]['services']
|
||||
suffix = backends[conf.messaging]['topic_suffix']
|
||||
self.topic = [
|
||||
'{}.{}{}'.format(prefix.rstrip('.'), category, suffix)
|
||||
for prefix, category in itertools.product(prefixes, services)
|
||||
]
|
||||
if not self.topic:
|
||||
self.topic = '*'
|
||||
log.debug('Setting topics: {}'.format(', '.join(self.topic)))
|
||||
|
||||
# The call to `super` takes action based on the setting of topics above
|
||||
super(MBSConsumer, self).__init__(hub)
|
||||
|
||||
# Our call to `super` above should have initialized an `incoming` queue
|
||||
# for us.. but in certain test situations, it does not. So here,
|
||||
# establish a fake `incoming` queue.
|
||||
if not hasattr(self, 'incoming'):
|
||||
self.incoming = queue.Queue()
|
||||
|
||||
# These two values are typically provided either by the unit tests or
|
||||
# by the local build command. They are empty in the production environ
|
||||
self.stop_condition = hub.config.get('mbsconsumer.stop_condition')
|
||||
@@ -131,7 +153,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
if isinstance(message, module_build_service.messaging.BaseMessage):
|
||||
msg = message
|
||||
else:
|
||||
msg = self.get_abstracted_msg(message['body'])
|
||||
msg = self.get_abstracted_msg(message)
|
||||
|
||||
# Primary work is done here.
|
||||
try:
|
||||
@@ -144,17 +166,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
self.shutdown()
|
||||
|
||||
def get_abstracted_msg(self, message):
|
||||
# Convert the message to an abstracted message
|
||||
if conf.messaging == 'fedmsg':
|
||||
msg = module_build_service.messaging.BaseMessage.from_fedmsg(
|
||||
message['topic'], message)
|
||||
elif conf.messaging == 'amq':
|
||||
msg = module_build_service.messaging.BaseMessage.from_amq(
|
||||
message['topic'], message)
|
||||
parser = module_build_service.messaging.\
|
||||
_messaging_backends[conf.messaging].get('parser')
|
||||
if parser:
|
||||
return parser.parse(message)
|
||||
else:
|
||||
raise ValueError('The messaging format "{0}" is not supported'
|
||||
raise ValueError('{0} backend does not define a message parser'
|
||||
.format(conf.messaging))
|
||||
return msg
|
||||
|
||||
def sanity_check(self):
|
||||
""" On startup, make sure our implementation is sane. """
|
||||
|
||||
5
setup.py
5
setup.py
@@ -30,6 +30,11 @@ setup(name='module-build-service',
|
||||
'mbs-manager = module_build_service.manage:manager_wrapper'],
|
||||
'moksha.consumer': 'mbsconsumer = module_build_service.scheduler.consumer:MBSConsumer',
|
||||
'moksha.producer': 'mbspoller = module_build_service.scheduler.producer:MBSProducer',
|
||||
'mbs.messaging_backends': [
|
||||
'fedmsg = module_build_service.messaging:_fedmsg_backend',
|
||||
'in_memory = module_build_service.messaging:_in_memory_backend',
|
||||
#'custom = your_organization:_custom_backend',
|
||||
]
|
||||
},
|
||||
scripts=["contrib/mbs-build"],
|
||||
data_files=[('/etc/module-build-service/', ['conf/cacert.pem',
|
||||
|
||||
@@ -23,9 +23,9 @@
|
||||
|
||||
import unittest
|
||||
from module_build_service import messaging
|
||||
from module_build_service.messaging import KojiRepoChange
|
||||
from mock import patch, PropertyMock
|
||||
|
||||
|
||||
class TestFedmsgMessaging(unittest.TestCase):
|
||||
|
||||
def test_buildsys_state_change(self):
|
||||
@@ -48,8 +48,7 @@ class TestFedmsgMessaging(unittest.TestCase):
|
||||
'topic': 'org.fedoraproject.prod.buildsys.build.state.change'
|
||||
}
|
||||
|
||||
topic = 'org.fedoraproject.prod.buildsys.build.state.change'
|
||||
msg = messaging.BaseMessage.from_fedmsg(topic, buildsys_state_change_msg)
|
||||
msg = messaging.FedmsgMessageParser().parse(buildsys_state_change_msg)
|
||||
|
||||
self.assertEqual(msg.build_id, 614503)
|
||||
self.assertEqual(msg.build_new_state, 1)
|
||||
@@ -78,8 +77,7 @@ class TestFedmsgMessaging(unittest.TestCase):
|
||||
'username': 'copr'
|
||||
}
|
||||
|
||||
topic = 'org.fedoraproject.prod.copr.build.end'
|
||||
msg = messaging.BaseMessage.from_fedmsg(topic, copr_build_end_msg)
|
||||
msg = messaging.FedmsgMessageParser().parse(copr_build_end_msg)
|
||||
self.assertIsInstance(msg, messaging.KojiBuildChange)
|
||||
self.assertEqual(msg.msg_id, '2013-b05a323d-37ee-4396-9635-7b5dfaf5441b')
|
||||
self.assertEqual(msg.build_id, 100)
|
||||
@@ -110,8 +108,7 @@ class TestFedmsgMessaging(unittest.TestCase):
|
||||
'topic': 'org.fedoraproject.prod.buildsys.tag'
|
||||
}
|
||||
|
||||
topic = 'org.fedoraproject.prod.buildsys.tag'
|
||||
msg = messaging.BaseMessage.from_fedmsg(topic, buildsys_tag_msg)
|
||||
msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg)
|
||||
|
||||
self.assertEqual(msg.tag, "module-debugging-tools-master-20170405115403-build")
|
||||
self.assertEqual(msg.artifact, "module-build-macros")
|
||||
@@ -130,7 +127,6 @@ class TestFedmsgMessaging(unittest.TestCase):
|
||||
'topic': 'org.fedoraproject.prod.buildsys.repo.done'
|
||||
}
|
||||
|
||||
topic = 'org.fedoraproject.prod.buildsys.repo.done'
|
||||
msg = messaging.BaseMessage.from_fedmsg(topic, buildsys_tag_msg)
|
||||
msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg)
|
||||
|
||||
self.assertEqual(msg.repo_tag, "module-f0f7e44f3c6cccab-build")
|
||||
|
||||
100
tests/test_scheduler/test_consumer.py
Normal file
100
tests/test_scheduler/test_consumer.py
Normal file
@@ -0,0 +1,100 @@
|
||||
# Copyright (c) 2017 Red Hat, Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
import unittest
|
||||
from mock import patch, MagicMock
|
||||
from module_build_service.scheduler.consumer import MBSConsumer
|
||||
from module_build_service.messaging import KojiTagChange, KojiRepoChange
|
||||
|
||||
class TestConsumer(unittest.TestCase):
|
||||
|
||||
@patch('module_build_service.messaging.conf.messaging', new='fedmsg')
|
||||
def test_get_abstracted_msg_fedmsg(self):
|
||||
"""
|
||||
Test the output of get_abstracted_msg() when using the
|
||||
fedmsg backend.
|
||||
"""
|
||||
hub = MagicMock(config={})
|
||||
consumer = MBSConsumer(hub)
|
||||
msg = {
|
||||
"username": "apache",
|
||||
"source_name": "datanommer",
|
||||
"i": 1,
|
||||
"timestamp": 1505492681.0,
|
||||
"msg_id": "2017-0627b798-f241-4230-b365-8a8a111a8ec5",
|
||||
"crypto": "x509",
|
||||
"topic": "org.fedoraproject.prod.buildsys.tag",
|
||||
"headers": {},
|
||||
"source_version": "0.8.1",
|
||||
"msg": {
|
||||
"build_id": 962861,
|
||||
"name": "python3-virtualenv",
|
||||
"tag_id": 263,
|
||||
"instance": "primary",
|
||||
"tag": "epel7-pending",
|
||||
"user": "bodhi",
|
||||
"version": "15.1.0",
|
||||
"owner": "orion",
|
||||
"release": "1.el7"
|
||||
}
|
||||
}
|
||||
msg_obj = consumer.get_abstracted_msg(msg)
|
||||
self.assertIsInstance(msg_obj, KojiTagChange)
|
||||
self.assertEqual(msg_obj.msg_id, msg['msg_id'])
|
||||
self.assertEqual(msg_obj.tag, msg['msg']['tag'])
|
||||
self.assertEqual(msg_obj.artifact, msg['msg']['name'])
|
||||
|
||||
@patch('module_build_service.scheduler.consumer.models')
|
||||
@patch.object(MBSConsumer, 'process_message')
|
||||
@patch('module_build_service.messaging.conf.messaging', new='fedmsg')
|
||||
def test_consume_fedmsg(self, process_message, models):
|
||||
"""
|
||||
Test the MBSConsumer.consume() method when using the
|
||||
fedmsg backend.
|
||||
"""
|
||||
hub = MagicMock(config={})
|
||||
consumer = MBSConsumer(hub)
|
||||
msg = {
|
||||
"topic": "org.fedoraproject.prod.buildsys.repo.done",
|
||||
"headers": {},
|
||||
"body": {
|
||||
"username": "apache",
|
||||
"source_name": "datanommer",
|
||||
"i": 1,
|
||||
"timestamp": 1405126329.0,
|
||||
"msg_id": "2014-adbc33f6-51b0-4fce-aa0d-3c699a9920e4",
|
||||
"crypto": "x509",
|
||||
"topic": "org.fedoraproject.prod.buildsys.repo.done",
|
||||
"headers": {},
|
||||
"source_version": "0.6.4",
|
||||
"msg": {
|
||||
"instance": "primary",
|
||||
"repo_id": 400859,
|
||||
"tag": "f22-build",
|
||||
"tag_id": 278
|
||||
}
|
||||
}
|
||||
}
|
||||
consumer.consume(msg)
|
||||
self.assertEqual(process_message.call_count, 1)
|
||||
msg_obj = process_message.call_args[0][1]
|
||||
self.assertIsInstance(msg_obj, KojiRepoChange)
|
||||
self.assertEqual(msg_obj.msg_id, msg['body']['msg_id'])
|
||||
self.assertEqual(msg_obj.repo_tag, msg['body']['msg']['tag'])
|
||||
Reference in New Issue
Block a user