Introduce pluggable backends.

This allows an operator to write a custom messaging backend to handle
whatever transport and format.
This commit is contained in:
Ralph Bean
2017-09-19 13:09:07 -04:00
parent 1f077a12ea
commit 5e08d2ffe4
6 changed files with 92 additions and 156 deletions

View File

@@ -24,9 +24,9 @@
"""Generic messaging functions."""
import json
import os
import re
import pkg_resources
try:
from inspect import signature
except ImportError:
@@ -81,80 +81,25 @@ class BaseMessage(object):
def __json__(self):
return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)
@staticmethod
def from_amq(topic, msg):
msg_obj = None
if not hasattr(msg, 'properties'):
return None # Unrelated message not identifying service origin
properties = json.loads(msg.properties, encoding='utf8')
service = properties.get('service')
class MessageParser(object):
if service not in _messaging_backends['amq']['services']:
log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg))
return None
def parse(self, msg):
raise NotImplementedError()
# This probably appies only for brew
# Also wouldn't be easier to use properties?
if service == 'koji':
content = json.loads(msg.body, encoding='utf8')['content']
log.debug("Found koji related msg: %s" % msg)
method = content['info']['method']
msg_type = content['info']['type']
if method == 'newRepo':
attr = content['attribute']
state = content['info']['new']
if attr == "state" and state == "CLOSED":
repo_tag = content['info']['request']
assert len(repo_tag) == 1
msg_obj = KojiRepoChange(msg.id, repo_tag[0])
class FedmsgMessageParser(MessageParser):
elif method == 'build' and msg_type == 'TaskStateChange':
attr = content['attribute']
if attr == "state":
build_id = content['info']['id']
# TODO: Someone with AMQ knowledge should check if
# info.id is build id or task_id here. For now I presume
# it is task_id.
task_id = content['info']['id']
build_state = content['new']
# These are not available before build is assigned
build_name = None
build_version = None
build_release = None
nvr_req = set(['name', 'version', 'release'])
if nvr_req.issubset(set(content['info'].keys())):
build_name = content['info']['name']
build_version = content['info']['version']
build_release = content['info']['release']
msg_obj = KojiBuildChange(
msg.id, build_id, task_id, build_state, build_name,
build_version, build_release)
elif service == 'mbs':
log.debug("Found mbs related msg: %s" % msg)
body = json.loads(msg.body, encoding='utf8')
if topic == 'module.state.change':
msg_obj = MBSModule(
msg.id, body['id'], body['state'])
if msg_obj:
return msg_obj
log.debug('Skipping unrecognized message: %s' % msg)
return None
@staticmethod
def from_fedmsg(topic, msg):
def parse(self, msg):
"""
Takes a fedmsg topic and message and converts it to a message object
:param topic: the topic of the fedmsg message
:param msg: the message contents from the fedmsg message
:return: an object of BaseMessage descent if the message is a type
that the app looks for, otherwise None is returned
"""
if 'body' in msg:
msg = msg['body']
topic = msg['topic']
topic_categories = _messaging_backends['fedmsg']['services']
categories_re = '|'.join(map(re.escape, topic_categories))
regex_pattern = re.compile(
@@ -347,7 +292,8 @@ def publish(topic, msg, conf, service):
try:
handler = _messaging_backends[conf.messaging]['publish']
except KeyError:
raise KeyError("No messaging backend found for %r" % conf.messaging)
raise KeyError("No messaging backend found for %r in %r" % (
conf.messaging, _messaging_backends.keys()))
return handler(topic, msg, conf, service)
@@ -371,10 +317,11 @@ def _in_memory_publish(topic, msg, conf, service):
# Create fake fedmsg from the message so we can reuse
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
# class instance.
wrapped_msg = BaseMessage.from_fedmsg(
service + "." + topic,
{"msg_id": str(_in_memory_msg_id), "msg": msg},
)
wrapped_msg = FedmsgMessageParser().parse({
"msg_id": str(_in_memory_msg_id),
"topic": service + "." + topic,
"msg": msg,
})
# Put the message to queue.
from module_build_service.scheduler.consumer import work_queue_put
@@ -390,60 +337,27 @@ def _in_memory_publish(topic, msg, conf, service):
_initial_messages.append(wrapped_msg)
def _amq_get_messenger(conf):
import proton
for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'):
val = getattr(conf, attr)
log.debug('Checking config.%s=%s' % (attr, val))
assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val)
for attr in ('amq_recv_addresses', 'amq_dest_address'):
val = getattr(conf, attr)
log.debug('Checking config.%s=%s' % (attr, val))
# list values
if isinstance(val, (list, tuple)):
assert val, 'config.%s is not supposed to be empty' % attr
# individual urls
for v in val:
assert v and '://' in v, 'config.%s: value "%s" does not seem like a valid url' % (attr, val)
# string values
else:
assert val and '://' in val, 'config.%s: value "%s" does not seem like a valid url' % (attr, val)
msngr = proton.Messenger()
msngr.certificate = conf.amq_cert_file
msngr.private_key = conf.amq_private_key_file
msngr.trusted_certificates = conf.amq_trusted_cert_file
msngr.start()
for url in conf.amq_recv_addresses:
msngr.subscribe(url)
log.debug('proton.Messenger: Subscribing to address=%s' % url)
return msngr
def _amq_publish(topic, msg, conf, service):
import proton
msngr = _amq_get_messenger(conf)
message = proton.Message()
message.address = conf.amq_dest_address
message.subject = topic
message.properties['service'] = service
message.content = json.dumps(msg, ensure_ascii=False).encode('utf8')
msngr.put(message)
msngr.send()
_messaging_backends = {
'fedmsg': {
'publish': _fedmsg_publish,
'services': ['buildsys', 'mbs', 'copr']
},
'amq': {
'publish': _amq_publish,
'services': ['koji', 'mbs']
},
'in_memory': {
'publish': _in_memory_publish,
'services': []
}
_fedmsg_backend = {
'publish': _fedmsg_publish,
'services': ['buildsys', 'mbs', 'copr'],
'parser': FedmsgMessageParser(),
'topic_suffix': '.',
}
_in_memory_backend = {
'publish': _in_memory_publish,
'services': [],
'parser': FedmsgMessageParser(), # re-used. :)
'topic_suffix': '.',
}
_messaging_backends = {}
for entrypoint in pkg_resources.iter_entry_points('mbs.messaging_backends'):
_messaging_backends[entrypoint.name] = ep = entrypoint.load()
required = ['publish', 'services', 'parser']
if any([key not in ep for key in required]):
raise ValueError('messaging backend %r is malformed: %r' % (
entrypoint.name, ep))
if not _messaging_backends:
raise ValueError("No messaging plugins are installed or available.")