From 5e08d2ffe4d21f6f43ae009808114baa751d2daa Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Tue, 19 Sep 2017 13:09:07 -0400 Subject: [PATCH] Introduce pluggable backends. This allows an operator to write a custom messaging backend to handle whatever transport and format. --- module_build_service/config.py | 15 +- module_build_service/messaging.py | 166 +++++---------------- module_build_service/scheduler/consumer.py | 46 +++--- setup.py | 5 + tests/test_messaging.py | 14 +- tests/test_scheduler/test_consumer.py | 2 + 6 files changed, 92 insertions(+), 156 deletions(-) diff --git a/module_build_service/config.py b/module_build_service/config.py index d2a4ef0a..c50deefa 100644 --- a/module_build_service/config.py +++ b/module_build_service/config.py @@ -26,9 +26,9 @@ import imp import os +import pkg_resources import re - -from os import sys +import sys from module_build_service import logger @@ -500,9 +500,16 @@ class Config(object): self._log_level = logger.str_to_log_level(level) def _setifok_messaging(self, s): + """ Validate that the specified messaging backend corresponds with one + of the installed plugins. The MBS core provides two such plugins, but + a third-party could install another usable one. + """ + entrypoints = pkg_resources.iter_entry_points('mbs.messaging_backends') + installed_backends = [e.name for e in entrypoints] s = str(s) - if s not in ("fedmsg", "amq", "in_memory"): - raise ValueError("Unsupported messaging system.") + if s not in installed_backends: + raise ValueError("Uninstalled messaging system. %r not in %r" % ( + s, installed_backends)) self._messaging = s def _setifok_amq_recv_addresses(self, l): diff --git a/module_build_service/messaging.py b/module_build_service/messaging.py index 1ef10d79..dbef0ede 100644 --- a/module_build_service/messaging.py +++ b/module_build_service/messaging.py @@ -24,9 +24,9 @@ """Generic messaging functions.""" -import json -import os import re +import pkg_resources + try: from inspect import signature except ImportError: @@ -81,80 +81,25 @@ class BaseMessage(object): def __json__(self): return dict(msg_id=self.msg_id, topic=self.topic, body=self.body) - @staticmethod - def from_amq(topic, msg): - msg_obj = None - if not hasattr(msg, 'properties'): - return None # Unrelated message not identifying service origin - properties = json.loads(msg.properties, encoding='utf8') - service = properties.get('service') +class MessageParser(object): - if service not in _messaging_backends['amq']['services']: - log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg)) - return None + def parse(self, msg): + raise NotImplementedError() - # This probably appies only for brew - # Also wouldn't be easier to use properties? - if service == 'koji': - content = json.loads(msg.body, encoding='utf8')['content'] - log.debug("Found koji related msg: %s" % msg) - method = content['info']['method'] - msg_type = content['info']['type'] - if method == 'newRepo': - attr = content['attribute'] - state = content['info']['new'] - if attr == "state" and state == "CLOSED": - repo_tag = content['info']['request'] - assert len(repo_tag) == 1 - msg_obj = KojiRepoChange(msg.id, repo_tag[0]) +class FedmsgMessageParser(MessageParser): - elif method == 'build' and msg_type == 'TaskStateChange': - attr = content['attribute'] - if attr == "state": - build_id = content['info']['id'] - # TODO: Someone with AMQ knowledge should check if - # info.id is build id or task_id here. For now I presume - # it is task_id. - task_id = content['info']['id'] - build_state = content['new'] - # These are not available before build is assigned - build_name = None - build_version = None - build_release = None - nvr_req = set(['name', 'version', 'release']) - if nvr_req.issubset(set(content['info'].keys())): - build_name = content['info']['name'] - build_version = content['info']['version'] - build_release = content['info']['release'] - - msg_obj = KojiBuildChange( - msg.id, build_id, task_id, build_state, build_name, - build_version, build_release) - - elif service == 'mbs': - log.debug("Found mbs related msg: %s" % msg) - body = json.loads(msg.body, encoding='utf8') - if topic == 'module.state.change': - msg_obj = MBSModule( - msg.id, body['id'], body['state']) - - if msg_obj: - return msg_obj - - log.debug('Skipping unrecognized message: %s' % msg) - return None - - @staticmethod - def from_fedmsg(topic, msg): + def parse(self, msg): """ Takes a fedmsg topic and message and converts it to a message object - :param topic: the topic of the fedmsg message :param msg: the message contents from the fedmsg message :return: an object of BaseMessage descent if the message is a type that the app looks for, otherwise None is returned """ + if 'body' in msg: + msg = msg['body'] + topic = msg['topic'] topic_categories = _messaging_backends['fedmsg']['services'] categories_re = '|'.join(map(re.escape, topic_categories)) regex_pattern = re.compile( @@ -347,7 +292,8 @@ def publish(topic, msg, conf, service): try: handler = _messaging_backends[conf.messaging]['publish'] except KeyError: - raise KeyError("No messaging backend found for %r" % conf.messaging) + raise KeyError("No messaging backend found for %r in %r" % ( + conf.messaging, _messaging_backends.keys())) return handler(topic, msg, conf, service) @@ -371,10 +317,11 @@ def _in_memory_publish(topic, msg, conf, service): # Create fake fedmsg from the message so we can reuse # the BaseMessage.from_fedmsg code to get the particular BaseMessage # class instance. - wrapped_msg = BaseMessage.from_fedmsg( - service + "." + topic, - {"msg_id": str(_in_memory_msg_id), "msg": msg}, - ) + wrapped_msg = FedmsgMessageParser().parse({ + "msg_id": str(_in_memory_msg_id), + "topic": service + "." + topic, + "msg": msg, + }) # Put the message to queue. from module_build_service.scheduler.consumer import work_queue_put @@ -390,60 +337,27 @@ def _in_memory_publish(topic, msg, conf, service): _initial_messages.append(wrapped_msg) -def _amq_get_messenger(conf): - import proton - for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'): - val = getattr(conf, attr) - log.debug('Checking config.%s=%s' % (attr, val)) - assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val) - - for attr in ('amq_recv_addresses', 'amq_dest_address'): - val = getattr(conf, attr) - log.debug('Checking config.%s=%s' % (attr, val)) - # list values - if isinstance(val, (list, tuple)): - assert val, 'config.%s is not supposed to be empty' % attr - # individual urls - for v in val: - assert v and '://' in v, 'config.%s: value "%s" does not seem like a valid url' % (attr, val) - # string values - else: - assert val and '://' in val, 'config.%s: value "%s" does not seem like a valid url' % (attr, val) - - msngr = proton.Messenger() - msngr.certificate = conf.amq_cert_file - msngr.private_key = conf.amq_private_key_file - msngr.trusted_certificates = conf.amq_trusted_cert_file - msngr.start() - for url in conf.amq_recv_addresses: - msngr.subscribe(url) - log.debug('proton.Messenger: Subscribing to address=%s' % url) - return msngr - - -def _amq_publish(topic, msg, conf, service): - import proton - msngr = _amq_get_messenger(conf) - message = proton.Message() - message.address = conf.amq_dest_address - message.subject = topic - message.properties['service'] = service - message.content = json.dumps(msg, ensure_ascii=False).encode('utf8') - msngr.put(message) - msngr.send() - - -_messaging_backends = { - 'fedmsg': { - 'publish': _fedmsg_publish, - 'services': ['buildsys', 'mbs', 'copr'] - }, - 'amq': { - 'publish': _amq_publish, - 'services': ['koji', 'mbs'] - }, - 'in_memory': { - 'publish': _in_memory_publish, - 'services': [] - } +_fedmsg_backend = { + 'publish': _fedmsg_publish, + 'services': ['buildsys', 'mbs', 'copr'], + 'parser': FedmsgMessageParser(), + 'topic_suffix': '.', } +_in_memory_backend = { + 'publish': _in_memory_publish, + 'services': [], + 'parser': FedmsgMessageParser(), # re-used. :) + 'topic_suffix': '.', +} + + +_messaging_backends = {} +for entrypoint in pkg_resources.iter_entry_points('mbs.messaging_backends'): + _messaging_backends[entrypoint.name] = ep = entrypoint.load() + required = ['publish', 'services', 'parser'] + if any([key not in ep for key in required]): + raise ValueError('messaging backend %r is malformed: %r' % ( + entrypoint.name, ep)) + +if not _messaging_backends: + raise ValueError("No messaging plugins are installed or available.") diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index 783aeb26..ca2be10e 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -25,9 +25,11 @@ This class reads and processes messages from the message bus it is configured to use. """ -import koji import inspect import itertools +import queue + +import koji import fedmsg.consumers import moksha.hub @@ -44,13 +46,6 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): """ This is triggered by running fedmsg-hub. This class is responsible for ingesting and processing messages from the message bus. """ - topic = ['{}.{}.'.format(pref.rstrip('.'), cat) - for pref, cat - in itertools.product(conf.messaging_topic_prefix, - module_build_service.messaging._messaging_backends[conf.messaging]['services'])] - if not topic: - topic = '*' - log.debug('Setting topics: {}'.format(', '.join(topic))) config_key = 'mbsconsumer' # It is set to the id of currently handled module build. It is used to @@ -59,8 +54,29 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): current_module_build_id = None def __init__(self, hub): + # Topic setting needs to be done *before* the call to `super`. + + backends = module_build_service.messaging._messaging_backends + prefixes = conf.messaging_topic_prefix # This is a list. + services = backends[conf.messaging]['services'] + suffix = backends[conf.messaging]['topic_suffix'] + self.topic = [ + '{}.{}{}'.format(prefix.rstrip('.'), category, suffix) + for prefix, category in itertools.product(prefixes, services) + ] + if not self.topic: + self.topic = '*' + log.debug('Setting topics: {}'.format(', '.join(self.topic))) + + # The call to `super` takes action based on the setting of topics above super(MBSConsumer, self).__init__(hub) + # Our call to `super` above should have initialized an `incoming` queue + # for us.. but in certain test situations, it does not. So here, + # establish a fake `incoming` queue. + if not hasattr(self, 'incoming'): + self.incoming = queue.Queue() + # These two values are typically provided either by the unit tests or # by the local build command. They are empty in the production environ self.stop_condition = hub.config.get('mbsconsumer.stop_condition') @@ -144,17 +160,13 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): self.shutdown() def get_abstracted_msg(self, message): - # Convert the message to an abstracted message - if conf.messaging == 'fedmsg': - msg = module_build_service.messaging.BaseMessage.from_fedmsg( - message['body']['topic'], message['body']) - elif conf.messaging == 'amq': - msg = module_build_service.messaging.BaseMessage.from_amq( - message['body']['topic'], message['body']) + parser = module_build_service.messaging.\ + _messaging_backends[conf.messaging].get('parser') + if parser: + return parser.parse(message) else: - raise ValueError('The messaging format "{0}" is not supported' + raise ValueError('{0} backend does not define a message parser' .format(conf.messaging)) - return msg def sanity_check(self): """ On startup, make sure our implementation is sane. """ diff --git a/setup.py b/setup.py index 3ad9ed42..28515652 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,11 @@ setup(name='module-build-service', 'mbs-manager = module_build_service.manage:manager_wrapper'], 'moksha.consumer': 'mbsconsumer = module_build_service.scheduler.consumer:MBSConsumer', 'moksha.producer': 'mbspoller = module_build_service.scheduler.producer:MBSProducer', + 'mbs.messaging_backends': [ + 'fedmsg = module_build_service.messaging:_fedmsg_backend', + 'in_memory = module_build_service.messaging:_in_memory_backend', + #'custom = your_organization._custom_backend', + ] }, scripts=["contrib/mbs-build"], data_files=[('/etc/module-build-service/', ['conf/cacert.pem', diff --git a/tests/test_messaging.py b/tests/test_messaging.py index 042e2408..0f36a5d5 100644 --- a/tests/test_messaging.py +++ b/tests/test_messaging.py @@ -23,9 +23,9 @@ import unittest from module_build_service import messaging +from module_build_service.messaging import KojiRepoChange from mock import patch, PropertyMock - class TestFedmsgMessaging(unittest.TestCase): def test_buildsys_state_change(self): @@ -48,8 +48,7 @@ class TestFedmsgMessaging(unittest.TestCase): 'topic': 'org.fedoraproject.prod.buildsys.build.state.change' } - topic = 'org.fedoraproject.prod.buildsys.build.state.change' - msg = messaging.BaseMessage.from_fedmsg(topic, buildsys_state_change_msg) + msg = messaging.FedmsgMessageParser().parse(buildsys_state_change_msg) self.assertEqual(msg.build_id, 614503) self.assertEqual(msg.build_new_state, 1) @@ -78,8 +77,7 @@ class TestFedmsgMessaging(unittest.TestCase): 'username': 'copr' } - topic = 'org.fedoraproject.prod.copr.build.end' - msg = messaging.BaseMessage.from_fedmsg(topic, copr_build_end_msg) + msg = messaging.FedmsgMessageParser().parse(copr_build_end_msg) self.assertIsInstance(msg, messaging.KojiBuildChange) self.assertEqual(msg.msg_id, '2013-b05a323d-37ee-4396-9635-7b5dfaf5441b') self.assertEqual(msg.build_id, 100) @@ -110,8 +108,7 @@ class TestFedmsgMessaging(unittest.TestCase): 'topic': 'org.fedoraproject.prod.buildsys.tag' } - topic = 'org.fedoraproject.prod.buildsys.tag' - msg = messaging.BaseMessage.from_fedmsg(topic, buildsys_tag_msg) + msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg) self.assertEqual(msg.tag, "module-debugging-tools-master-20170405115403-build") self.assertEqual(msg.artifact, "module-build-macros") @@ -130,7 +127,6 @@ class TestFedmsgMessaging(unittest.TestCase): 'topic': 'org.fedoraproject.prod.buildsys.repo.done' } - topic = 'org.fedoraproject.prod.buildsys.repo.done' - msg = messaging.BaseMessage.from_fedmsg(topic, buildsys_tag_msg) + msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg) self.assertEqual(msg.repo_tag, "module-f0f7e44f3c6cccab-build") diff --git a/tests/test_scheduler/test_consumer.py b/tests/test_scheduler/test_consumer.py index fcbd16be..60ca2343 100644 --- a/tests/test_scheduler/test_consumer.py +++ b/tests/test_scheduler/test_consumer.py @@ -72,6 +72,8 @@ class TestConsumer(unittest.TestCase): hub = MagicMock(config={}) consumer = MBSConsumer(hub) msg = { + "topic": "org.fedoraproject.prod.buildsys.repo.done", + "headers": {}, "body": { "username": "apache", "source_name": "datanommer",