Merge branch 'amq-messaging'

This commit is contained in:
Ralph Bean
2016-10-18 10:45:59 -04:00
5 changed files with 201 additions and 17 deletions

View File

@@ -12,7 +12,7 @@ class BaseConfiguration(object):
PORT = 5000
SYSTEM = 'koji'
MESSAGING = 'fedmsg'
MESSAGING = 'fedmsg' # or amq
KOJI_CONFIG = '/etc/rida/koji.conf'
KOJI_PROFILE = 'koji'
KOJI_ARCHES = ['i686', 'armv7hl', 'x86_64']
@@ -53,6 +53,15 @@ class BaseConfiguration(object):
KRB_PRINCIPAL = None
KRB_CCACHE = None
# AMQ prefixed variables are required only while using 'amq' as messaging backend
# Addresses to listen to
AMQ_RECV_ADDRESSES = ['amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.koji',
'amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.rida',]
# Address for sending messages
AMQ_DEST_ADDRESS = 'amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.rida'
AMQ_CERT_FILE = '/etc/rida/msg-m8y-client.crt'
AMQ_PRIVATE_KEY_FILE = '/etc/rida/msg-m8y-client.key'
AMQ_TRUSTED_CERT_FILE = '/etc/rida/Root-CA.crt'
class DevConfiguration(BaseConfiguration):
LOG_BACKEND = 'console'

View File

@@ -52,7 +52,6 @@ class Config(object):
def __init__(self):
"""Initialize the Config object."""
self._system = ""
self._messaging = ""
self._db = ""
self._polling_interval = 0
self._pdc_url = ""
@@ -78,6 +77,12 @@ class Config(object):
self._krb_keytab = None
self._krb_principal = None
self._krb_ccache = "/tmp/krb5cc_rida"
self._messaging = ""
self._amq_recv_addresses = []
self._amq_dest_address = ""
self._amq_cert_file = ""
self._amq_private_key_file = ""
self._amq_trusted_cert_file = ""
@property
def system(self):
@@ -99,10 +104,58 @@ class Config(object):
@messaging.setter
def messaging(self, s):
s = str(s)
if s not in ("fedmsg"):
if s not in ("fedmsg" , "amq"):
raise ValueError("Unsupported messaging system.")
self._messaging = s
@property
def amq_recv_addresses(self):
"""Apache MQ broker url to receive messages."""
return self._amq_recv_addresses
@amq_recv_addresses.setter
def amq_recv_addresses(self, l):
assert isinstance(l, list) or isinstance(l, tuple)
self._amq_recv_addresses = list(l)
@property
def amq_dest_address(self):
"""Apache MQ broker address to send messages"""
return self._amq_dest_address
@amq_dest_address.setter
def amq_dest_address(self, s):
self._amq_dest_address = str(s)
@property
def amq_cert_file(self):
"""Certificate for Apache MQ broker auth."""
return self._amq_cert_file
@amq_cert_file.setter
def amq_cert_file(self, s):
self._amq_cert_file = str(s)
@property
def amq_private_key_file(self):
"""Private key for Apache MQ broker auth."""
return self._amq_private_key_file
@amq_private_key_file.setter
def amq_private_key_file(self, s):
self._amq_private_key_file = str(s)
@property
def amq_trusted_cert_file(self):
"""Trusted certificate for ssl connection."""
return self._amq_trusted_cert_file
@amq_trusted_cert_file.setter
def amq_trusted_cert_file(self, s):
s = str(s)
self._amq_trusted_cert_file = s
@property
def db(self):
"""RDB URL."""

View File

@@ -24,6 +24,8 @@
"""Generic messaging functions."""
import json
import os
import re
try:
from inspect import signature
@@ -52,6 +54,67 @@ class BaseMessage(object):
return "{}({})".format(type(self).__name__, ', '.join(args_strs))
@staticmethod
def from_amq(topic, msg):
msg_obj = None
if not hasattr(msg, 'properties'):
return None # Unrelated message not identifying service origin
properties = json.loads(msg.properties, encoding='utf8')
service = properties.get('service')
if service not in ('koji', 'rida'):
log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg))
return None
# This probably appies only for brew
# Also wouldn't be easier to use properties?
if service == 'koji':
content = json.loads(msg.body, encoding='utf8')['content']
log.debug("Found koji related msg: %s" % msg)
method = content['info']['method']
msg_type = content['info']['type']
if method == 'newRepo':
attr = content['attribute']
state = content['info']['new']
if attr == "state" and state == "CLOSED":
repo_tag = content['info']['request']
assert len(repo_tag) == 1
msg_obj = KojiRepoChange(msg.id, repo_tag[0])
elif method == 'build' and msg_type == 'TaskStateChange':
attr = content['attribute']
if attr == "state":
build_id = content['info']['id']
build_state = content['new']
# These are not available before build is assigned
build_name = None
build_version = None
build_release = None
nvr_req = set(['name', 'version', 'release'])
if nvr_req.issubset(set(content['info'].keys())):
build_name = content['info']['name']
build_version = content['info']['version']
build_release = content['info']['release']
msg_obj = KojiBuildChange(
msg.id, build_id, build_state, build_name,
build_version, build_release)
elif service == 'rida':
log.debug("Found rida related msg: %s" % msg)
body = json.loads(msg.body, encoding='utf8')
if topic == 'module.state.change':
msg_obj = RidaModule(
msg.id, body['id'], body['state'] )
if msg_obj:
return msg_obj
log.debug('Skipping unrecognized message: %s' % msg)
return None
@staticmethod
def from_fedmsg(topic, msg):
"""
@@ -79,7 +142,7 @@ class BaseMessage(object):
# If there isn't a msg dict in msg then this message can be skipped
if not msg_inner_msg:
log.debug(('Skipping message without any content with the '
'topic "{0}"').format(topic))
'topic "{0}"').format(topic))
return None
msg_obj = None
@@ -160,20 +223,20 @@ class RidaModule(BaseMessage):
self.module_build_state = module_build_state
def publish(topic, msg, conf, modname='rida'):
def publish(topic, msg, conf, service):
"""
Publish a single message to a given backend, and return
:param topic: the topic of the message (e.g. module.state.change)
:param msg: the message contents of the message (typically JSON)
:param conf: a Config object from the class in config.py
:param modname: the system that is publishing the message (e.g. rida)
:param service: the system that is publishing the message (e.g. rida)
:return:
"""
try:
handler = _messaging_backends[conf.messaging]['publish']
except KeyError:
raise KeyError("No messaging backend found for %r" % conf.messaging)
return handler(topic, msg, modname=modname)
return handler(topic, msg, conf, service)
def listen(conf, **kwargs):
@@ -188,16 +251,16 @@ def listen(conf, **kwargs):
except KeyError:
raise KeyError("No messaging backend found for %r" % conf.messaging)
for event in handler(**kwargs):
for event in handler(conf, **kwargs):
yield event
def _fedmsg_publish(topic, msg, modname):
def _fedmsg_publish(topic, msg, conf, service):
# fedmsg doesn't really need access to conf, however other backends do
import fedmsg
return fedmsg.publish(topic=topic, msg=msg, modname=modname)
return fedmsg.publish(topic, msg=msg, modname=service)
def _fedmsg_listen(**kwargs):
def _fedmsg_listen(conf, **kwargs):
"""
Parses a fedmsg event and constructs it into the appropriate message object
"""
@@ -207,9 +270,68 @@ def _fedmsg_listen(**kwargs):
if msg_obj:
yield msg_obj
def _amq_get_messenger(conf):
import proton
for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'):
val = getattr(conf, attr)
log.debug('Checking config.%s=%s' % (attr, val))
assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val)
for attr in ('amq_recv_addresses', 'amq_dest_address'):
val = getattr(conf, attr)
log.debug('Checking config.%s=%s' % (attr, val))
# list values
if isinstance(val, (list, tuple)):
assert val, 'config.%s is not supposed to be empty' % attr
# individual urls
for v in val:
assert v and '://' in v, 'config.%s: value "%s" does not seem like a valid url' % (attr, val)
# string values
else:
assert val and '://' in val, 'config.%s: value "%s" does not seem like a valid url' % (attr, val)
msngr = proton.Messenger()
msngr.certificate = conf.amq_cert_file
msngr.private_key = conf.amq_private_key_file
msngr.trusted_certificates = conf.amq_trusted_cert_file
msngr.start()
for url in conf.amq_recv_addresses:
msngr.subscribe(url)
log.debug('proton.Messenger: Subscribing to address=%s' % url)
return msngr
def _amq_listen(conf, **kwargs):
import proton
msngr = _amq_get_messenger(conf)
msg = proton.Message()
while True:
msngr.recv()
while msngr.incoming:
msngr.get(msg)
msg_obj = BaseMessage.from_amq(msg.address, msg)
if msg_obj:
yield msg_obj
def _amq_publish(topic, msg, conf, service):
import proton
msngr = _amq_get_messenger(conf)
message = proton.Message()
message.address = conf.amq_dest_address
message.subject = topic
message.properties['service'] = service
message.content = json.dumps(msg, ensure_ascii=False).encode('utf8')
msngr.put(message)
msngr.send()
_messaging_backends = {
'fedmsg': {
'publish': _fedmsg_publish,
'listen': _fedmsg_listen,
},
'amq': {
'publish': _amq_publish,
'listen': _amq_listen,
},
}

View File

@@ -146,7 +146,7 @@ class ModuleBuild(RidaBase):
session.add(module)
session.commit()
rida.messaging.publish(
modname='rida',
service='rida',
topic='module.state.change',
msg=module.json(), # Note the state is "init" here...
conf=conf,
@@ -165,7 +165,7 @@ class ModuleBuild(RidaBase):
log.debug("%r, state %r->%r" % (self, old_state, self.state))
rida.messaging.publish(
modname='rida',
service='rida',
topic='module.state.change',
msg=self.json(), # Note the state is "init" here...
conf=conf,

View File

@@ -69,7 +69,7 @@ class TestUtilFunctions(unittest.TestCase):
}
mock_tail_messages.side_effect = \
lambda: [('fedora-infrastructure', endpoint, topic, msg)]
msg_obj = next(rida.messaging._fedmsg_listen())
msg_obj = next(rida.messaging._fedmsg_listen(None))
self.assertEquals(type(msg_obj), rida.messaging.KojiBuildChange)
self.assertEquals(msg_obj.build_id, 2345678)
self.assertEquals(msg_obj.build_new_state, 0)
@@ -99,7 +99,7 @@ class TestUtilFunctions(unittest.TestCase):
}
mock_tail_messages.side_effect = \
lambda: [('fedora-infrastructure', endpoint, topic, msg)]
msg_obj = next(rida.messaging._fedmsg_listen())
msg_obj = next(rida.messaging._fedmsg_listen(None))
self.assertEquals(type(msg_obj), rida.messaging.KojiRepoChange)
self.assertEquals(msg_obj.repo_tag, 'f23-build')
self.assertEquals(msg_obj.msg_id,
@@ -116,7 +116,7 @@ class TestUtilFunctions(unittest.TestCase):
}
mock_tail_messages.side_effect = \
lambda: [('fedora-infrastructure', endpoint, topic, msg)]
msg_obj = next(rida.messaging._fedmsg_listen())
msg_obj = next(rida.messaging._fedmsg_listen(None))
self.assertEquals(msg_obj.module_build_id, msg['msg']['id'])
self.assertEquals(msg_obj.module_build_state, msg['msg']['state'])
self.assertEquals(msg_obj.msg_id,