From 34ef9ce5fd05aa12cbf2b12d9f0d1e12f6494c94 Mon Sep 17 00:00:00 2001 From: Filip Valder Date: Thu, 12 Jan 2017 15:45:57 +0100 Subject: [PATCH 1/2] Add support for message topic prefixes + explicit services/categories (incl. code lints) --- conf/config.py | 3 +++ module_build_service/config.py | 4 ++++ module_build_service/messaging.py | 17 ++++++++++++----- module_build_service/scheduler/consumer.py | 8 +++++++- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/conf/config.py b/conf/config.py index f77076c5..97eed6e0 100644 --- a/conf/config.py +++ b/conf/config.py @@ -25,6 +25,7 @@ class BaseConfiguration(object): SYSTEM = 'koji' MESSAGING = 'fedmsg' # or amq + MESSAGING_TOPIC_PREFIX = ['org.fedoraproject.prod'] KOJI_CONFIG = '/etc/module-build-service/koji.conf' KOJI_PROFILE = 'koji' KOJI_ARCHES = ['i686', 'armv7hl', 'x86_64'] @@ -89,6 +90,8 @@ class DevConfiguration(BaseConfiguration): LOG_BACKEND = 'console' LOG_LEVEL = 'debug' + MESSAGING_TOPIC_PREFIX = ['org.fedoraproject.dev', 'org.fedoraproject.stg'] + # Global network-related values, in seconds NET_TIMEOUT = 5 NET_RETRY_INTERVAL = 1 diff --git a/module_build_service/config.py b/module_build_service/config.py index 9361adbc..ca264397 100644 --- a/module_build_service/config.py +++ b/module_build_service/config.py @@ -197,6 +197,10 @@ class Config(object): 'type': str, 'default': 'fedmsg', 'desc': 'The messaging system to use.'}, + 'messaging_topic_prefix': { + 'type': list, + 'default': 'org.fedoraproject.prod', + 'desc': 'The messaging system topic prefixes which we are interested in.'}, 'amq_recv_addresses': { 'type': list, 'default': [], diff --git a/module_build_service/messaging.py b/module_build_service/messaging.py index ea3792e9..571c6220 100644 --- a/module_build_service/messaging.py +++ b/module_build_service/messaging.py @@ -83,11 +83,11 @@ class BaseMessage(object): msg_obj = None if not hasattr(msg, 'properties'): - return None # Unrelated message not identifying service origin + return None # Unrelated message not identifying service origin properties = json.loads(msg.properties, encoding='utf8') service = properties.get('service') - if service not in ('koji', 'mbs'): + if service not in _messaging_backends['amq']['services']: log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg)) return None @@ -135,13 +135,13 @@ class BaseMessage(object): body = json.loads(msg.body, encoding='utf8') if topic == 'module.state.change': msg_obj = RidaModule( - msg.id, body['id'], body['state'] ) + msg.id, body['id'], body['state']) if msg_obj: return msg_obj log.debug('Skipping unrecognized message: %s' % msg) - return None + return None @staticmethod def from_fedmsg(topic, msg): @@ -152,8 +152,10 @@ class BaseMessage(object): :return: an object of BaseMessage descent if the message is a type that the app looks for, otherwise None is returned """ + topic_categories = _messaging_backends['fedmsg']['services'] + categories_re = '|'.join(map(re.escape, topic_categories)) regex_pattern = re.compile( - (r'(?Pbuildsys|mbs)(?:\.)' + (r'(?P' + categories_re + r')(?:\.)' r'(?Pbuild|repo|module)(?:(?:\.)' r'(?Pstate))?(?:\.)(?Pchange|done)$')) regex_results = re.search(regex_pattern, topic) @@ -252,6 +254,7 @@ class RidaModule(BaseMessage): self.module_build_id = module_build_id self.module_build_state = module_build_state + def publish(topic, msg, conf, service): """ Publish a single message to a given backend, and return @@ -267,6 +270,7 @@ def publish(topic, msg, conf, service): raise KeyError("No messaging backend found for %r" % conf.messaging) return handler(topic, msg, conf, service) + def _fedmsg_publish(topic, msg, conf, service): # fedmsg doesn't really need access to conf, however other backends do import fedmsg @@ -336,6 +340,7 @@ def _amq_get_messenger(conf): log.debug('proton.Messenger: Subscribing to address=%s' % url) return msngr + def _amq_publish(topic, msg, conf, service): import proton msngr = _amq_get_messenger(conf) @@ -351,9 +356,11 @@ def _amq_publish(topic, msg, conf, service): _messaging_backends = { 'fedmsg': { 'publish': _fedmsg_publish, + 'services': ['buildsys', 'mbs'] }, 'amq': { 'publish': _amq_publish, + 'services': ['koji', 'mbs'] }, 'in_memory': { 'publish': _in_memory_publish, diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index f617639e..6c0bfc3d 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -27,6 +27,7 @@ to use. import koji import inspect +import itertools import fedmsg.consumers import moksha.hub @@ -42,7 +43,12 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): """ This is triggered by running fedmsg-hub. This class is responsible for ingesting and processing messages from the message bus. """ - topic = '*' + topic = ['{}.{}.'.format(pref.rstrip('.'), cat) + for pref, cat + in itertools.product( + conf.messaging_topic_prefix, + module_build_service.messaging._messaging_backends[conf.messaging]['services'])] + log.debug('Setting topics: {}'.format(', '.join(topic))) config_key = 'mbsconsumer' def __init__(self, hub): From abf37e98e10f1c736ae90ee4cf987193f245ac53 Mon Sep 17 00:00:00 2001 From: Filip Valder Date: Thu, 12 Jan 2017 19:57:34 +0100 Subject: [PATCH 2/2] in_memory messaging -> fallback to '*' topics --- module_build_service/messaging.py | 1 + module_build_service/scheduler/consumer.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/module_build_service/messaging.py b/module_build_service/messaging.py index 571c6220..85fc13ab 100644 --- a/module_build_service/messaging.py +++ b/module_build_service/messaging.py @@ -364,5 +364,6 @@ _messaging_backends = { }, 'in_memory': { 'publish': _in_memory_publish, + 'services': [] } } diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index 6c0bfc3d..9d206d22 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -48,6 +48,8 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): in itertools.product( conf.messaging_topic_prefix, module_build_service.messaging._messaging_backends[conf.messaging]['services'])] + if not topic: + topic = '*' log.debug('Setting topics: {}'.format(', '.join(topic))) config_key = 'mbsconsumer'