mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-05 03:38:12 +08:00
Merge #284 Support message topic prefixes + service/category names
This commit is contained in:
@@ -25,6 +25,7 @@ class BaseConfiguration(object):
|
||||
|
||||
SYSTEM = 'koji'
|
||||
MESSAGING = 'fedmsg' # or amq
|
||||
MESSAGING_TOPIC_PREFIX = ['org.fedoraproject.prod']
|
||||
KOJI_CONFIG = '/etc/module-build-service/koji.conf'
|
||||
KOJI_PROFILE = 'koji'
|
||||
KOJI_ARCHES = ['i686', 'armv7hl', 'x86_64']
|
||||
@@ -89,6 +90,8 @@ class DevConfiguration(BaseConfiguration):
|
||||
LOG_BACKEND = 'console'
|
||||
LOG_LEVEL = 'debug'
|
||||
|
||||
MESSAGING_TOPIC_PREFIX = ['org.fedoraproject.dev', 'org.fedoraproject.stg']
|
||||
|
||||
# Global network-related values, in seconds
|
||||
NET_TIMEOUT = 5
|
||||
NET_RETRY_INTERVAL = 1
|
||||
|
||||
@@ -197,6 +197,10 @@ class Config(object):
|
||||
'type': str,
|
||||
'default': 'fedmsg',
|
||||
'desc': 'The messaging system to use.'},
|
||||
'messaging_topic_prefix': {
|
||||
'type': list,
|
||||
'default': 'org.fedoraproject.prod',
|
||||
'desc': 'The messaging system topic prefixes which we are interested in.'},
|
||||
'amq_recv_addresses': {
|
||||
'type': list,
|
||||
'default': [],
|
||||
|
||||
@@ -83,11 +83,11 @@ class BaseMessage(object):
|
||||
msg_obj = None
|
||||
|
||||
if not hasattr(msg, 'properties'):
|
||||
return None # Unrelated message not identifying service origin
|
||||
return None # Unrelated message not identifying service origin
|
||||
properties = json.loads(msg.properties, encoding='utf8')
|
||||
service = properties.get('service')
|
||||
|
||||
if service not in ('koji', 'mbs'):
|
||||
if service not in _messaging_backends['amq']['services']:
|
||||
log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg))
|
||||
return None
|
||||
|
||||
@@ -135,13 +135,13 @@ class BaseMessage(object):
|
||||
body = json.loads(msg.body, encoding='utf8')
|
||||
if topic == 'module.state.change':
|
||||
msg_obj = RidaModule(
|
||||
msg.id, body['id'], body['state'] )
|
||||
msg.id, body['id'], body['state'])
|
||||
|
||||
if msg_obj:
|
||||
return msg_obj
|
||||
|
||||
log.debug('Skipping unrecognized message: %s' % msg)
|
||||
return None
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def from_fedmsg(topic, msg):
|
||||
@@ -152,8 +152,10 @@ class BaseMessage(object):
|
||||
:return: an object of BaseMessage descent if the message is a type
|
||||
that the app looks for, otherwise None is returned
|
||||
"""
|
||||
topic_categories = _messaging_backends['fedmsg']['services']
|
||||
categories_re = '|'.join(map(re.escape, topic_categories))
|
||||
regex_pattern = re.compile(
|
||||
(r'(?P<category>buildsys|mbs)(?:\.)'
|
||||
(r'(?P<category>' + categories_re + r')(?:\.)'
|
||||
r'(?P<object>build|repo|module)(?:(?:\.)'
|
||||
r'(?P<subobject>state))?(?:\.)(?P<event>change|done)$'))
|
||||
regex_results = re.search(regex_pattern, topic)
|
||||
@@ -252,6 +254,7 @@ class RidaModule(BaseMessage):
|
||||
self.module_build_id = module_build_id
|
||||
self.module_build_state = module_build_state
|
||||
|
||||
|
||||
def publish(topic, msg, conf, service):
|
||||
"""
|
||||
Publish a single message to a given backend, and return
|
||||
@@ -267,6 +270,7 @@ def publish(topic, msg, conf, service):
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
return handler(topic, msg, conf, service)
|
||||
|
||||
|
||||
def _fedmsg_publish(topic, msg, conf, service):
|
||||
# fedmsg doesn't really need access to conf, however other backends do
|
||||
import fedmsg
|
||||
@@ -336,6 +340,7 @@ def _amq_get_messenger(conf):
|
||||
log.debug('proton.Messenger: Subscribing to address=%s' % url)
|
||||
return msngr
|
||||
|
||||
|
||||
def _amq_publish(topic, msg, conf, service):
|
||||
import proton
|
||||
msngr = _amq_get_messenger(conf)
|
||||
@@ -351,11 +356,14 @@ def _amq_publish(topic, msg, conf, service):
|
||||
_messaging_backends = {
|
||||
'fedmsg': {
|
||||
'publish': _fedmsg_publish,
|
||||
'services': ['buildsys', 'mbs']
|
||||
},
|
||||
'amq': {
|
||||
'publish': _amq_publish,
|
||||
'services': ['koji', 'mbs']
|
||||
},
|
||||
'in_memory': {
|
||||
'publish': _in_memory_publish,
|
||||
'services': []
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ to use.
|
||||
|
||||
import koji
|
||||
import inspect
|
||||
import itertools
|
||||
import fedmsg.consumers
|
||||
import moksha.hub
|
||||
|
||||
@@ -42,7 +43,14 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
""" This is triggered by running fedmsg-hub. This class is responsible for
|
||||
ingesting and processing messages from the message bus.
|
||||
"""
|
||||
topic = '*'
|
||||
topic = ['{}.{}.'.format(pref.rstrip('.'), cat)
|
||||
for pref, cat
|
||||
in itertools.product(
|
||||
conf.messaging_topic_prefix,
|
||||
module_build_service.messaging._messaging_backends[conf.messaging]['services'])]
|
||||
if not topic:
|
||||
topic = '*'
|
||||
log.debug('Setting topics: {}'.format(', '.join(topic)))
|
||||
config_key = 'mbsconsumer'
|
||||
|
||||
def __init__(self, hub):
|
||||
|
||||
Reference in New Issue
Block a user