diff --git a/rida/messaging.py b/rida/messaging.py index 663109ef..d8bd787a 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -24,6 +24,8 @@ """Generic messaging functions.""" +import json +import os import re try: from inspect import signature @@ -52,6 +54,66 @@ class BaseMessage(object): return "{}({})".format(type(self).__name__, ', '.join(args_strs)) + @staticmethod + def from_amq(topic, msg): + msg_obj = None + properties = None + body = None + if hasattr('properties'): + properties = json.loads(msg.properties, encoding='utf8') + if not ((properties.get('service') == 'koji' or properties.get('service') == 'rida')): + log.debug("Skipping msg: %s" % msg) + return None + + service = properties.get('service') + # This probably appies only for brew + # Also wouldn't be easier to use properties? + if service == 'koji': + content = json.loads(msg.body, encoding='utf8')['content'] + log.debug("Found koji related msg: %s" % msg) + method = content['info']['method'] + msg_type = content['info']['type'] + + if method == 'newRepo': + attr = content['attribute'] + state = content['info']['new'] + if attr == "state" and state == "CLOSED": + repo_tag = content['info']['request'] + assert len(repo_tag) == 1 + msg_obj = KojiRepoChange(msg.id, repo_tag[0]) + + elif method == 'build' and msg_type == 'TaskStateChange': + attr = content['attribute'] + if attr == "state": + build_id = content['info']['id'] + build_state = content['new'] + # These are not available before build is assigned + build_name = None + build_version = None + build_release = None + nvr_req = set(['name', 'version', 'release']) + if nvr_req.issubset(set(content['info'].keys())): + build_name = content['info']['name'] + build_version = content['info']['version'] + build_release = content['info']['release'] + + msg_obj = KojiBuildChange( + msg.id, build_id, build_state, build_name, + build_version,build_release) + + elif service == 'rida': + log.debug("Found rida related msg: %s" % msg) + body = json.loads(msg.body, encoding='utf8') + if topic == 'module.state.change': + msg_obj = RidaModule( + msg.id, body['id'], body['state'] ) + + if msg_obj: + return msg_obj + + log.debug('Skipping unrecognized message: %s' % msg) + return None + @staticmethod def from_fedmsg(topic, msg): """ @@ -208,9 +270,72 @@ def _fedmsg_listen(conf, **kwargs): # XXX: should we keep conf? if msg_obj: yield msg_obj +def _amq_get_messenger(conf): + import proton + for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'): + val = getattr(conf, attr) + log.debug('Checking config.%s=%s' % (attr, val)) + assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val) + + for attr in ('amq_recv_addresses', 'amq_dest_address'): + val = getattr(conf, attr) + log.debug('Checking config.%s=%s' % (attr, val)) + # list values + if isinstance(val, list) or isinstance(val, tuple): + assert val, 'config.%s is not supposed to be empty' % attr + # individual urls + for v in val: + assert v and '://' in v, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val) + # string values + else: + assert val and '://' in val, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val) + + mng = proton.Messenger() + mng.certificate=conf.amq_cert_file + mng.private_key=conf.amq_private_key_file + mng.trusted_certificates=conf.amq_trusted_cert_file + mng.start() + for url in conf.amq_recv_addresses: + mng.subscribe(url) + log.debug('proton.Messenger: Subscribing to address=%s' % url) + return mng + +def _amq_listen(conf, **kwargs): + import proton + mng = _amq_get_messenger(conf) + msg = proton.Message() + while True: + mng.recv() + + while mng.incoming: + mng.get(msg) + msg_obj = BaseMessage.from_amq(msg.address, msg) + if msg_obj: + yield msg_obj + +def _amq_get_dst_address(conf, topic, msg, service): + return conf.amq_broker_dest_url + +def _amq_publish(conf, topic, msg, service): + import proton + mng = _amq_get_messenger(conf) + message = proton.Message() + message.address = _amq_get_dst_address(conf, topic, msg, service) + message.subject = topic + message.properties['service'] = service + message.address = conf.amq_dest_address + message.content = json.dumps(msg, ensure_ascii=False).encode('utf8') + mng.put(message) + mng.send() + + _messaging_backends = { 'fedmsg': { 'publish': _fedmsg_publish, 'listen': _fedmsg_listen, }, + 'amq': { + 'publish': _amq_publish, + 'listen': _amq_listen, + }, }