diff --git a/config.py b/config.py index 966f1504..a13b9ffb 100644 --- a/config.py +++ b/config.py @@ -12,7 +12,7 @@ class BaseConfiguration(object): PORT = 5000 SYSTEM = 'koji' - MESSAGING = 'fedmsg' + MESSAGING = 'fedmsg' # or amq KOJI_CONFIG = '/etc/rida/koji.conf' KOJI_PROFILE = 'koji' KOJI_ARCHES = ['i686', 'armv7hl', 'x86_64'] @@ -53,6 +53,15 @@ class BaseConfiguration(object): KRB_PRINCIPAL = None KRB_CCACHE = None + # AMQ prefixed variables are required only while using 'amq' as messaging backend + # Addresses to listen to + AMQ_RECV_ADDRESSES = ['amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.koji', + 'amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.rida',] + # Address for sending messages + AMQ_DEST_ADDRESS = 'amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.rida' + AMQ_CERT_FILE = '/etc/rida/msg-m8y-client.crt' + AMQ_PRIVATE_KEY_FILE = '/etc/rida/msg-m8y-client.key' + AMQ_TRUSTED_CERT_FILE = '/etc/rida/Root-CA.crt' class DevConfiguration(BaseConfiguration): LOG_BACKEND = 'console' diff --git a/rida/config.py b/rida/config.py index e8ab9c19..71b8f48f 100644 --- a/rida/config.py +++ b/rida/config.py @@ -52,7 +52,6 @@ class Config(object): def __init__(self): """Initialize the Config object.""" self._system = "" - self._messaging = "" self._db = "" self._polling_interval = 0 self._pdc_url = "" @@ -78,6 +77,12 @@ class Config(object): self._krb_keytab = None self._krb_principal = None self._krb_ccache = "/tmp/krb5cc_rida" + self._messaging = "" + self._amq_recv_addresses = [] + self._amq_dest_address = "" + self._amq_cert_file = "" + self._amq_private_key_file = "" + self._amq_trusted_cert_file = "" @property def system(self): @@ -99,10 +104,58 @@ class Config(object): @messaging.setter def messaging(self, s): s = str(s) - if s not in ("fedmsg"): + if s not in ("fedmsg" , "amq"): raise ValueError("Unsupported messaging system.") self._messaging = s + @property + def amq_recv_addresses(self): + """Apache MQ broker url to receive messages.""" + return self._amq_recv_addresses + + @amq_recv_addresses.setter + def amq_recv_addresses(self, l): + assert isinstance(l, list) or isinstance(l, tuple) + self._amq_recv_addresses = list(l) + + @property + def amq_dest_address(self): + """Apache MQ broker address to send messages""" + return self._amq_dest_address + + @amq_dest_address.setter + def amq_dest_address(self, s): + self._amq_dest_address = str(s) + + @property + def amq_cert_file(self): + """Certificate for Apache MQ broker auth.""" + return self._amq_cert_file + + @amq_cert_file.setter + def amq_cert_file(self, s): + self._amq_cert_file = str(s) + + @property + def amq_private_key_file(self): + """Private key for Apache MQ broker auth.""" + return self._amq_private_key_file + + @amq_private_key_file.setter + def amq_private_key_file(self, s): + self._amq_private_key_file = str(s) + + @property + def amq_trusted_cert_file(self): + """Trusted certificate for ssl connection.""" + return self._amq_trusted_cert_file + + @amq_trusted_cert_file.setter + def amq_trusted_cert_file(self, s): + s = str(s) + self._amq_trusted_cert_file = s + + @property def db(self): """RDB URL.""" diff --git a/rida/messaging.py b/rida/messaging.py index 51e16175..dd154a9b 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -24,6 +24,8 @@ """Generic messaging functions.""" +import json +import os import re try: from inspect import signature @@ -52,6 +54,67 @@ class BaseMessage(object): return "{}({})".format(type(self).__name__, ', '.join(args_strs)) + @staticmethod + def from_amq(topic, msg): + msg_obj = None + + if not hasattr(msg, 'properties'): + return None # Unrelated message not identifying service origin + properties = json.loads(msg.properties, encoding='utf8') + service = properties.get('service') + + if service not in ('koji', 'rida'): + log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg)) + return None + + # This probably appies only for brew + # Also wouldn't be easier to use properties? + if service == 'koji': + content = json.loads(msg.body, encoding='utf8')['content'] + log.debug("Found koji related msg: %s" % msg) + method = content['info']['method'] + msg_type = content['info']['type'] + + if method == 'newRepo': + attr = content['attribute'] + state = content['info']['new'] + if attr == "state" and state == "CLOSED": + repo_tag = content['info']['request'] + assert len(repo_tag) == 1 + msg_obj = KojiRepoChange(msg.id, repo_tag[0]) + + elif method == 'build' and msg_type == 'TaskStateChange': + attr = content['attribute'] + if attr == "state": + build_id = content['info']['id'] + build_state = content['new'] + # These are not available before build is assigned + build_name = None + build_version = None + build_release = None + nvr_req = set(['name', 'version', 'release']) + if nvr_req.issubset(set(content['info'].keys())): + build_name = content['info']['name'] + build_version = content['info']['version'] + build_release = content['info']['release'] + + msg_obj = KojiBuildChange( + msg.id, build_id, build_state, build_name, + build_version, build_release) + + elif service == 'rida': + log.debug("Found rida related msg: %s" % msg) + body = json.loads(msg.body, encoding='utf8') + if topic == 'module.state.change': + msg_obj = RidaModule( + msg.id, body['id'], body['state'] ) + + if msg_obj: + return msg_obj + + log.debug('Skipping unrecognized message: %s' % msg) + return None + @staticmethod def from_fedmsg(topic, msg): """ @@ -79,7 +142,7 @@ class BaseMessage(object): # If there isn't a msg dict in msg then this message can be skipped if not msg_inner_msg: log.debug(('Skipping message without any content with the ' - 'topic "{0}"').format(topic)) + 'topic "{0}"').format(topic)) return None msg_obj = None @@ -160,20 +223,20 @@ class RidaModule(BaseMessage): self.module_build_state = module_build_state -def publish(topic, msg, conf, modname='rida'): +def publish(topic, msg, conf, service): """ Publish a single message to a given backend, and return :param topic: the topic of the message (e.g. module.state.change) :param msg: the message contents of the message (typically JSON) :param conf: a Config object from the class in config.py - :param modname: the system that is publishing the message (e.g. rida) + :param service: the system that is publishing the message (e.g. rida) :return: """ try: handler = _messaging_backends[conf.messaging]['publish'] except KeyError: raise KeyError("No messaging backend found for %r" % conf.messaging) - return handler(topic, msg, modname=modname) + return handler(topic, msg, conf, service) def listen(conf, **kwargs): @@ -188,16 +251,16 @@ def listen(conf, **kwargs): except KeyError: raise KeyError("No messaging backend found for %r" % conf.messaging) - for event in handler(**kwargs): + for event in handler(conf, **kwargs): yield event -def _fedmsg_publish(topic, msg, modname): +def _fedmsg_publish(topic, msg, conf, service): + # fedmsg doesn't really need access to conf, however other backends do import fedmsg - return fedmsg.publish(topic=topic, msg=msg, modname=modname) + return fedmsg.publish(topic, msg=msg, modname=service) - -def _fedmsg_listen(**kwargs): +def _fedmsg_listen(conf, **kwargs): """ Parses a fedmsg event and constructs it into the appropriate message object """ @@ -207,9 +270,68 @@ def _fedmsg_listen(**kwargs): if msg_obj: yield msg_obj +def _amq_get_messenger(conf): + import proton + for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'): + val = getattr(conf, attr) + log.debug('Checking config.%s=%s' % (attr, val)) + assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val) + + for attr in ('amq_recv_addresses', 'amq_dest_address'): + val = getattr(conf, attr) + log.debug('Checking config.%s=%s' % (attr, val)) + # list values + if isinstance(val, (list, tuple)): + assert val, 'config.%s is not supposed to be empty' % attr + # individual urls + for v in val: + assert v and '://' in v, 'config.%s: value "%s" does not seem like a valid url' % (attr, val) + # string values + else: + assert val and '://' in val, 'config.%s: value "%s" does not seem like a valid url' % (attr, val) + + msngr = proton.Messenger() + msngr.certificate = conf.amq_cert_file + msngr.private_key = conf.amq_private_key_file + msngr.trusted_certificates = conf.amq_trusted_cert_file + msngr.start() + for url in conf.amq_recv_addresses: + msngr.subscribe(url) + log.debug('proton.Messenger: Subscribing to address=%s' % url) + return msngr + +def _amq_listen(conf, **kwargs): + import proton + msngr = _amq_get_messenger(conf) + msg = proton.Message() + while True: + msngr.recv() + + while msngr.incoming: + msngr.get(msg) + msg_obj = BaseMessage.from_amq(msg.address, msg) + if msg_obj: + yield msg_obj + +def _amq_publish(topic, msg, conf, service): + import proton + msngr = _amq_get_messenger(conf) + message = proton.Message() + message.address = conf.amq_dest_address + message.subject = topic + message.properties['service'] = service + message.content = json.dumps(msg, ensure_ascii=False).encode('utf8') + msngr.put(message) + msngr.send() + + _messaging_backends = { 'fedmsg': { 'publish': _fedmsg_publish, 'listen': _fedmsg_listen, }, + 'amq': { + 'publish': _amq_publish, + 'listen': _amq_listen, + }, } diff --git a/rida/models.py b/rida/models.py index c7f8d631..e571a468 100644 --- a/rida/models.py +++ b/rida/models.py @@ -146,7 +146,7 @@ class ModuleBuild(RidaBase): session.add(module) session.commit() rida.messaging.publish( - modname='rida', + service='rida', topic='module.state.change', msg=module.json(), # Note the state is "init" here... conf=conf, @@ -165,7 +165,7 @@ class ModuleBuild(RidaBase): log.debug("%r, state %r->%r" % (self, old_state, self.state)) rida.messaging.publish( - modname='rida', + service='rida', topic='module.state.change', msg=self.json(), # Note the state is "init" here... conf=conf, diff --git a/tests/test_messaging/test_messaging_functions.py b/tests/test_messaging/test_messaging_functions.py index 30ca1afa..0783ea94 100644 --- a/tests/test_messaging/test_messaging_functions.py +++ b/tests/test_messaging/test_messaging_functions.py @@ -69,7 +69,7 @@ class TestUtilFunctions(unittest.TestCase): } mock_tail_messages.side_effect = \ lambda: [('fedora-infrastructure', endpoint, topic, msg)] - msg_obj = next(rida.messaging._fedmsg_listen()) + msg_obj = next(rida.messaging._fedmsg_listen(None)) self.assertEquals(type(msg_obj), rida.messaging.KojiBuildChange) self.assertEquals(msg_obj.build_id, 2345678) self.assertEquals(msg_obj.build_new_state, 0) @@ -99,7 +99,7 @@ class TestUtilFunctions(unittest.TestCase): } mock_tail_messages.side_effect = \ lambda: [('fedora-infrastructure', endpoint, topic, msg)] - msg_obj = next(rida.messaging._fedmsg_listen()) + msg_obj = next(rida.messaging._fedmsg_listen(None)) self.assertEquals(type(msg_obj), rida.messaging.KojiRepoChange) self.assertEquals(msg_obj.repo_tag, 'f23-build') self.assertEquals(msg_obj.msg_id, @@ -116,7 +116,7 @@ class TestUtilFunctions(unittest.TestCase): } mock_tail_messages.side_effect = \ lambda: [('fedora-infrastructure', endpoint, topic, msg)] - msg_obj = next(rida.messaging._fedmsg_listen()) + msg_obj = next(rida.messaging._fedmsg_listen(None)) self.assertEquals(msg_obj.module_build_id, msg['msg']['id']) self.assertEquals(msg_obj.module_build_state, msg['msg']['state']) self.assertEquals(msg_obj.msg_id,