From 2f2688dc0129d1aaac95d1ed29ce8ac729f6dde8 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Mon, 26 Sep 2016 07:29:09 -0400 Subject: [PATCH 01/16] rida.config: add Apache MQ config options Signed-off-by: Lubos Kocman --- rida/config.py | 59 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/rida/config.py b/rida/config.py index e8ab9c19..48a0883a 100644 --- a/rida/config.py +++ b/rida/config.py @@ -52,7 +52,6 @@ class Config(object): def __init__(self): """Initialize the Config object.""" self._system = "" - self._messaging = "" self._db = "" self._polling_interval = 0 self._pdc_url = "" @@ -78,6 +77,12 @@ class Config(object): self._krb_keytab = None self._krb_principal = None self._krb_ccache = "/tmp/krb5cc_rida" + self._messaging = "" + self._amq_recv_addresses = [] + self._amq_dest_address = "" + self._amq_cert_file = "" + self._amq_private_key_file = "" + self._amq_trusted_cert_file = "" @property def system(self): @@ -99,10 +104,60 @@ class Config(object): @messaging.setter def messaging(self, s): s = str(s) - if s not in ("fedmsg"): + if s not in ("fedmsg" , "amq"): raise ValueError("Unsupported messaging system.") self._messaging = s + @property + def amq_recv_addresses(self): + """Apache MQ broker url to receive messages.""" + return self._amq_recv_addresses + + @amq_recv_addresses.setter + def amq_recv_addresses(self, l): + assert isinstance(l, list) or isinstance(l, tuple) + self._amq_recv_addresses = list(l) + + @property + def amq_dest_address(self): + """Apache MQ broker address to send messages""" + return self._amq_dest_address + + @amq_dest_address.setter + def amq_dest_address(self, s): + self._amq_dest_address = str(s) + + @property + def amq_cert_file(self): + """Certificate for Apache MQ broker auth.""" + return self._amq_cert_file + + @amq_cert_file.setter + def amq_cert_file(self, s): + s = str(s) + self._amq_cert_file = s + + @property + def amq_private_key_file(self): + """Private key for Apache MQ broker auth.""" + return self._amq_private_key_file + + @amq_private_key_file.setter + def amq_private_key_file(self, s): + s = str(s) + self._amq_private_key_file = s + + @property + def amq_trusted_cert_file(self): + """Trusted certificate for ssl connection.""" + return self._amq_trusted_cert_file + + @amq_trusted_cert_file.setter + def amq_trusted_cert_file(self, s): + s = str(s) + self._amq_trusted_cert_file = s + + @property def db(self): """RDB URL.""" From 0787abd1b50d421bdcd5fa82d1cd7396bead89f0 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Mon, 26 Sep 2016 14:12:39 +0200 Subject: [PATCH 02/16] rida.messaging: few changes: - pass over conf as param (helps amq implementation) - modname -> service to be less fedmsg centric Signed-off-by: Lubos Kocman --- rida/messaging.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index 44b2a859..663109ef 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -162,20 +162,20 @@ class RidaModule(BaseMessage): self.module_build_state = module_build_state -def publish(topic, msg, conf, modname='rida'): +def publish(topic, msg, conf, service='rida'): """ Publish a single message to a given backend, and return :param topic: the topic of the message (e.g. module.state.change) :param msg: the message contents of the message (typically JSON) :param conf: a Config object from the class in config.py - :param modname: the system that is publishing the message (e.g. rida) + :param service: the system that is publishing the message (e.g. rida) :return: """ try: handler = _messaging_backends[conf.messaging]['publish'] except KeyError: raise KeyError("No messaging backend found for %r" % conf.messaging) - return handler(topic, msg, modname=modname) + return handler(topic, msg, service=service) def listen(conf, **kwargs): @@ -190,16 +190,15 @@ def listen(conf, **kwargs): except KeyError: raise KeyError("No messaging backend found for %r" % conf.messaging) - for event in handler(**kwargs): + for event in handler(conf, **kwargs): yield event -def _fedmsg_publish(topic, msg, modname): +def _fedmsg_publish(conf, topic, msg, service): import fedmsg - return fedmsg.publish(topic=topic, msg=msg, modname=modname) + return fedmsg.publish(topic, msg=msg, modname=service) - -def _fedmsg_listen(**kwargs): +def _fedmsg_listen(conf, **kwargs): # XXX: should we keep conf? """ Parses a fedmsg event and constructs it into the appropriate message object """ From 823f851d29be7a8ad799bbd4aec49e571269d998 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Mon, 26 Sep 2016 14:13:24 +0200 Subject: [PATCH 03/16] rida.messaging add initial amq support Signed-off-by: Lubos Kocman --- rida/messaging.py | 125 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/rida/messaging.py b/rida/messaging.py index 663109ef..d8bd787a 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -24,6 +24,8 @@ """Generic messaging functions.""" +import json +import os import re try: from inspect import signature @@ -52,6 +54,66 @@ class BaseMessage(object): return "{}({})".format(type(self).__name__, ', '.join(args_strs)) + @staticmethod + def from_amq(topic, msg): + msg_obj = None + properties = None + body = None + if hasattr('properties'): + properties = json.loads(msg.properties, encoding='utf8') + if not ((properties.get('service') == 'koji' or properties.get('service') == 'rida')): + log.debug("Skipping msg: %s" % msg) + return None + + service = properties.get('service') + # This probably appies only for brew + # Also wouldn't be easier to use properties? + if service == 'koji': + content = json.loads(msg.body, encoding='utf8')['content'] + log.debug("Found koji related msg: %s" % msg) + method = content['info']['method'] + msg_type = content['info']['type'] + + if method == 'newRepo': + attr = content['attribute'] + state = content['info']['new'] + if attr == "state" and state == "CLOSED": + repo_tag = content['info']['request'] + assert len(repo_tag) == 1 + msg_obj = KojiRepoChange(msg.id, repo_tag[0]) + + elif method == 'build' and msg_type == 'TaskStateChange': + attr = content['attribute'] + if attr == "state": + build_id = content['info']['id'] + build_state = content['new'] + # These are not available before build is assigned + build_name = None + build_version = None + build_release = None + nvr_req = set(['name', 'version', 'release']) + if nvr_req.issubset(set(content['info'].keys())): + build_name = content['info']['name'] + build_version = content['info']['version'] + build_release = content['info']['release'] + + msg_obj = KojiBuildChange( + msg.id, build_id, build_state, build_name, + build_version,build_release) + + elif service == 'rida': + log.debug("Found rida related msg: %s" % msg) + body = json.loads(msg.body, encoding='utf8') + if topic == 'module.state.change': + msg_obj = RidaModule( + msg.id, body['id'], body['state'] ) + + if msg_obj: + return msg_obj + + log.debug('Skipping unrecognized message: %s' % msg) + return None + @staticmethod def from_fedmsg(topic, msg): """ @@ -208,9 +270,72 @@ def _fedmsg_listen(conf, **kwargs): # XXX: should we keep conf? if msg_obj: yield msg_obj +def _amq_get_messenger(conf): + import proton + for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'): + val = getattr(conf, attr) + log.debug('Checking config.%s=%s' % (attr, val)) + assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val) + + for attr in ('amq_recv_addresses', 'amq_dest_address'): + val = getattr(conf, attr) + log.debug('Checking config.%s=%s' % (attr, val)) + # list values + if isinstance(val, list) or isinstance(val, tuple): + assert val, 'config.%s is not supposed to be empty' % attr + # individual urls + for v in val: + assert v and '://' in v, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val) + # string values + else: + assert val and '://' in val, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val) + + mng = proton.Messenger() + mng.certificate=conf.amq_cert_file + mng.private_key=conf.amq_private_key_file + mng.trusted_certificates=conf.amq_trusted_cert_file + mng.start() + for url in conf.amq_recv_addresses: + mng.subscribe(url) + log.debug('proton.Messenger: Subscribing to address=%s' % url) + return mng + +def _amq_listen(conf, **kwargs): + import proton + mng = _amq_get_messenger(conf) + msg = proton.Message() + while True: + mng.recv() + + while mng.incoming: + mng.get(msg) + msg_obj = BaseMessage.from_amq(msg.address, msg) + if msg_obj: + yield msg_obj + +def _amq_get_dst_address(conf, topic, msg, service): + return conf.amq_broker_dest_url + +def _amq_publish(conf, topic, msg, service): + import proton + mng = _amq_get_messenger(conf) + message = proton.Message() + message.address = _amq_get_dst_address(conf, topic, msg, service) + message.subject = topic + message.properties['service'] = service + message.address = conf.amq_dest_address + message.content = json.dumps(msg, ensure_ascii=False).encode('utf8') + mng.put(message) + mng.send() + + _messaging_backends = { 'fedmsg': { 'publish': _fedmsg_publish, 'listen': _fedmsg_listen, }, + 'amq': { + 'publish': _amq_publish, + 'listen': _amq_listen, + }, } From c83eb15194bbf26256b370edc4430c8844aec221 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Mon, 26 Sep 2016 07:58:28 -0400 Subject: [PATCH 04/16] config.py: example amq config Signed-off-by: Lubos Kocman --- config.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/config.py b/config.py index 9a225c3d..2c5548eb 100644 --- a/config.py +++ b/config.py @@ -12,7 +12,7 @@ class BaseConfiguration(object): PORT = 5000 SYSTEM = 'koji' - MESSAGING = 'fedmsg' + MESSAGING = 'fedmsg' # or amq KOJI_CONFIG = '/etc/rida/koji.conf' KOJI_PROFILE = 'koji' KOJI_ARCHES = ['i686', 'armv7hl', 'x86_64'] @@ -53,6 +53,15 @@ class BaseConfiguration(object): KRB_PRINCIPAL = None KRB_CCACHE = None + # AMQ prefixed variables are required only while using 'amq' as messaging backend + # Addresses to listen to + AMQ_RECV_ADDRESSES = ['amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.koji', + 'amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.rida',] + # Address for sending messages + AMQ_DEST_ADDRESS = 'amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.rida' + AMQ_CERT_FILE = '/etc/rida/msg-m8y-client.crt' + AMQ_PRIVATE_KEY_FILE = '/etc/rida/msg-m8y-client.key' + AMQ_TRUSTED_CERT_FILE = '/etci/rida/Root-CA.crt' class DevConfiguration(BaseConfiguration): LOG_BACKEND = 'console' From 9783fe6c04dfcd5e46267355c8e2961bfdc48754 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Mon, 26 Sep 2016 14:34:10 +0200 Subject: [PATCH 05/16] Fixed hasattr --- rida/messaging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rida/messaging.py b/rida/messaging.py index d8bd787a..a2680dc6 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -59,7 +59,7 @@ class BaseMessage(object): msg_obj = None properties = None body = None - if hasattr('properties'): + if hasattr(msg, 'properties'): properties = json.loads(msg.properties, encoding='utf8') if not ((properties.get('service') == 'koji' or properties.get('service') == 'rida')): log.debug("Skipping msg: %s" % msg) From 11703ccb41a9c821eab62c1c91ddc0a5305a45a3 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Mon, 26 Sep 2016 15:24:43 +0200 Subject: [PATCH 06/16] Improved conditions for message filtering --- rida/messaging.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index a2680dc6..05a7c33d 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -57,15 +57,16 @@ class BaseMessage(object): @staticmethod def from_amq(topic, msg): msg_obj = None - properties = None - body = None - if hasattr(msg, 'properties'): - properties = json.loads(msg.properties, encoding='utf8') - if not ((properties.get('service') == 'koji' or properties.get('service') == 'rida')): - log.debug("Skipping msg: %s" % msg) - return None + if not hasattr(msg, 'properties'): + return None # Unrelated message not identifying service origin + properties = json.loads(msg.properties, encoding='utf8') service = properties.get('service') + + if service not in ('koji', 'rida'): + log.debug('Skipping msg due service=%s which is not related (msg=%r): ' % (service, msg)) + return None + # This probably appies only for brew # Also wouldn't be easier to use properties? if service == 'koji': From b576dd70d75661e785df8001e8cca7caf8d522f3 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Mon, 26 Sep 2016 15:28:05 +0200 Subject: [PATCH 07/16] config: fixed typo in path --- config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.py b/config.py index 2c5548eb..5264a78a 100644 --- a/config.py +++ b/config.py @@ -61,7 +61,7 @@ class BaseConfiguration(object): AMQ_DEST_ADDRESS = 'amqps://messaging.mydomain.com/Consumer.m8y.VirtualTopic.eng.rida' AMQ_CERT_FILE = '/etc/rida/msg-m8y-client.crt' AMQ_PRIVATE_KEY_FILE = '/etc/rida/msg-m8y-client.key' - AMQ_TRUSTED_CERT_FILE = '/etci/rida/Root-CA.crt' + AMQ_TRUSTED_CERT_FILE = '/etc/rida/Root-CA.crt' class DevConfiguration(BaseConfiguration): LOG_BACKEND = 'console' From 372367bb9c17f985133a4157c4fdc79af6bea510 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 14:00:46 +0200 Subject: [PATCH 08/16] messaging: use single isinstance() --- rida/messaging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rida/messaging.py b/rida/messaging.py index 05a7c33d..af6e2e2a 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -282,7 +282,7 @@ def _amq_get_messenger(conf): val = getattr(conf, attr) log.debug('Checking config.%s=%s' % (attr, val)) # list values - if isinstance(val, list) or isinstance(val, tuple): + if isinstance(val, (list, tuple)): assert val, 'config.%s is not supposed to be empty' % attr # individual urls for v in val: From 3daef241581d7a908f1dba50314f83eac23e5fa0 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 14:04:10 +0200 Subject: [PATCH 09/16] config.py: get rid of meaningless str() oneliners --- rida/config.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rida/config.py b/rida/config.py index 48a0883a..71b8f48f 100644 --- a/rida/config.py +++ b/rida/config.py @@ -134,8 +134,7 @@ class Config(object): @amq_cert_file.setter def amq_cert_file(self, s): - s = str(s) - self._amq_cert_file = s + self._amq_cert_file = str(s) @property def amq_private_key_file(self): @@ -144,8 +143,7 @@ class Config(object): @amq_private_key_file.setter def amq_private_key_file(self, s): - s = str(s) - self._amq_private_key_file = s + self._amq_private_key_file = str(s) @property def amq_trusted_cert_file(self): From b090a8d7f88275113894421f1727a8ce6679c4fd Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 14:08:33 +0200 Subject: [PATCH 10/16] messaging: add space after comma --- rida/messaging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rida/messaging.py b/rida/messaging.py index af6e2e2a..2dc298c8 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -100,7 +100,7 @@ class BaseMessage(object): msg_obj = KojiBuildChange( msg.id, build_id, build_state, build_name, - build_version,build_release) + build_version, build_release) elif service == 'rida': log.debug("Found rida related msg: %s" % msg) From bb1110758e5d7621112bbac1827f152b3ed30eb8 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 14:09:13 +0200 Subject: [PATCH 11/16] remove unuseful/obsolete get_amq_addr call --- rida/messaging.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index 2dc298c8..70193416 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -314,17 +314,13 @@ def _amq_listen(conf, **kwargs): if msg_obj: yield msg_obj -def _amq_get_dst_address(conf, topic, msg, service): - return conf.amq_broker_dest_url - def _amq_publish(conf, topic, msg, service): import proton mng = _amq_get_messenger(conf) message = proton.Message() - message.address = _amq_get_dst_address(conf, topic, msg, service) + message.address = conf.amq_dest_address message.subject = topic message.properties['service'] = service - message.address = conf.amq_dest_address message.content = json.dumps(msg, ensure_ascii=False).encode('utf8') mng.put(message) mng.send() From 6ec05fd05832401b2a54540b11b238c80d75b447 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 14:12:34 +0200 Subject: [PATCH 12/16] messaging.py: mng -> msngr --- rida/messaging.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index 70193416..2f531965 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -291,39 +291,39 @@ def _amq_get_messenger(conf): else: assert val and '://' in val, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val) - mng = proton.Messenger() - mng.certificate=conf.amq_cert_file - mng.private_key=conf.amq_private_key_file - mng.trusted_certificates=conf.amq_trusted_cert_file - mng.start() + msngr = proton.Messenger() + msngr.certificate=conf.amq_cert_file + msngr.private_key=conf.amq_private_key_file + msngr.trusted_certificates=conf.amq_trusted_cert_file + msngr.start() for url in conf.amq_recv_addresses: - mng.subscribe(url) + msngr.subscribe(url) log.debug('proton.Messenger: Subscribing to address=%s' % url) - return mng + return msngr def _amq_listen(conf, **kwargs): import proton - mng = _amq_get_messenger(conf) + msngr = _amq_get_messenger(conf) msg = proton.Message() while True: - mng.recv() + msngr.recv() - while mng.incoming: - mng.get(msg) + while msngr.incoming: + msngr.get(msg) msg_obj = BaseMessage.from_amq(msg.address, msg) if msg_obj: yield msg_obj def _amq_publish(conf, topic, msg, service): import proton - mng = _amq_get_messenger(conf) + msngr = _amq_get_messenger(conf) message = proton.Message() message.address = conf.amq_dest_address message.subject = topic message.properties['service'] = service message.content = json.dumps(msg, ensure_ascii=False).encode('utf8') - mng.put(message) - mng.send() + msngr.put(message) + msngr.send() _messaging_backends = { From b00ea5ac958300a34d6145e8f7ce4d8fe375f466 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 14:14:06 +0200 Subject: [PATCH 13/16] messaging: fixed typo --- rida/messaging.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index 2f531965..f98db0c3 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -286,10 +286,10 @@ def _amq_get_messenger(conf): assert val, 'config.%s is not supposed to be empty' % attr # individual urls for v in val: - assert v and '://' in v, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val) + assert v and '://' in v, 'config.%s: value "%s" does not seem like a valid url' % (attr, val) # string values else: - assert val and '://' in val, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val) + assert val and '://' in val, 'config.%s: value "%s" does not seem like a valid url' % (attr, val) msngr = proton.Messenger() msngr.certificate=conf.amq_cert_file From 452aba0373f34e76ac39596392e0901f7379ae63 Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 15:00:19 +0200 Subject: [PATCH 14/16] messaging: publish*, unify order of args, remove positional arg --- rida/messaging.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index f98db0c3..b7e526f2 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -225,7 +225,7 @@ class RidaModule(BaseMessage): self.module_build_state = module_build_state -def publish(topic, msg, conf, service='rida'): +def publish(topic, msg, conf, service): """ Publish a single message to a given backend, and return :param topic: the topic of the message (e.g. module.state.change) @@ -238,7 +238,7 @@ def publish(topic, msg, conf, service='rida'): handler = _messaging_backends[conf.messaging]['publish'] except KeyError: raise KeyError("No messaging backend found for %r" % conf.messaging) - return handler(topic, msg, service=service) + return handler(topic, msg, conf, service) def listen(conf, **kwargs): @@ -257,7 +257,8 @@ def listen(conf, **kwargs): yield event -def _fedmsg_publish(conf, topic, msg, service): +def _fedmsg_publish(topic, msg, conf, service): + # fedmsg doesn't really need access to conf, however other backends do import fedmsg return fedmsg.publish(topic, msg=msg, modname=service) @@ -314,7 +315,7 @@ def _amq_listen(conf, **kwargs): if msg_obj: yield msg_obj -def _amq_publish(conf, topic, msg, service): +def _amq_publish(topic, msg, conf, service): import proton msngr = _amq_get_messenger(conf) message = proton.Message() From 151d4ebadb6d056554f7c5e36f639cce3bc66e5c Mon Sep 17 00:00:00 2001 From: Lubos Kocman Date: Tue, 4 Oct 2016 15:05:09 +0200 Subject: [PATCH 15/16] messaging: amq PEP improvements --- rida/messaging.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index b7e526f2..53b00a5c 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -72,11 +72,11 @@ class BaseMessage(object): if service == 'koji': content = json.loads(msg.body, encoding='utf8')['content'] log.debug("Found koji related msg: %s" % msg) - method = content['info']['method'] - msg_type = content['info']['type'] + method = content['info']['method'] + msg_type = content['info']['type'] if method == 'newRepo': - attr = content['attribute'] + attr = content['attribute'] state = content['info']['new'] if attr == "state" and state == "CLOSED": repo_tag = content['info']['request'] @@ -106,7 +106,7 @@ class BaseMessage(object): log.debug("Found rida related msg: %s" % msg) body = json.loads(msg.body, encoding='utf8') if topic == 'module.state.change': - msg_obj = RidaModule( + msg_obj = RidaModule( msg.id, body['id'], body['state'] ) if msg_obj: @@ -142,7 +142,7 @@ class BaseMessage(object): # If there isn't a msg dict in msg then this message can be skipped if not msg_inner_msg: log.debug(('Skipping message without any content with the ' - 'topic "{0}"').format(topic)) + 'topic "{0}"').format(topic)) return None msg_obj = None @@ -262,7 +262,7 @@ def _fedmsg_publish(topic, msg, conf, service): import fedmsg return fedmsg.publish(topic, msg=msg, modname=service) -def _fedmsg_listen(conf, **kwargs): # XXX: should we keep conf? +def _fedmsg_listen(conf, **kwargs): """ Parses a fedmsg event and constructs it into the appropriate message object """ @@ -293,9 +293,9 @@ def _amq_get_messenger(conf): assert val and '://' in val, 'config.%s: value "%s" does not seem like a valid url' % (attr, val) msngr = proton.Messenger() - msngr.certificate=conf.amq_cert_file - msngr.private_key=conf.amq_private_key_file - msngr.trusted_certificates=conf.amq_trusted_cert_file + msngr.certificate = conf.amq_cert_file + msngr.private_key = conf.amq_private_key_file + msngr.trusted_certificates = conf.amq_trusted_cert_file msngr.start() for url in conf.amq_recv_addresses: msngr.subscribe(url) From 04d773b3fbbead9c868e08b4bbee61646e372d14 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Tue, 18 Oct 2016 10:33:04 -0400 Subject: [PATCH 16/16] Get the test suite working agian for https://pagure.io/fm-orchestrator/pull-request/73 --- rida/models.py | 4 ++-- tests/test_messaging/test_messaging_functions.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rida/models.py b/rida/models.py index c7f8d631..e571a468 100644 --- a/rida/models.py +++ b/rida/models.py @@ -146,7 +146,7 @@ class ModuleBuild(RidaBase): session.add(module) session.commit() rida.messaging.publish( - modname='rida', + service='rida', topic='module.state.change', msg=module.json(), # Note the state is "init" here... conf=conf, @@ -165,7 +165,7 @@ class ModuleBuild(RidaBase): log.debug("%r, state %r->%r" % (self, old_state, self.state)) rida.messaging.publish( - modname='rida', + service='rida', topic='module.state.change', msg=self.json(), # Note the state is "init" here... conf=conf, diff --git a/tests/test_messaging/test_messaging_functions.py b/tests/test_messaging/test_messaging_functions.py index 30ca1afa..0783ea94 100644 --- a/tests/test_messaging/test_messaging_functions.py +++ b/tests/test_messaging/test_messaging_functions.py @@ -69,7 +69,7 @@ class TestUtilFunctions(unittest.TestCase): } mock_tail_messages.side_effect = \ lambda: [('fedora-infrastructure', endpoint, topic, msg)] - msg_obj = next(rida.messaging._fedmsg_listen()) + msg_obj = next(rida.messaging._fedmsg_listen(None)) self.assertEquals(type(msg_obj), rida.messaging.KojiBuildChange) self.assertEquals(msg_obj.build_id, 2345678) self.assertEquals(msg_obj.build_new_state, 0) @@ -99,7 +99,7 @@ class TestUtilFunctions(unittest.TestCase): } mock_tail_messages.side_effect = \ lambda: [('fedora-infrastructure', endpoint, topic, msg)] - msg_obj = next(rida.messaging._fedmsg_listen()) + msg_obj = next(rida.messaging._fedmsg_listen(None)) self.assertEquals(type(msg_obj), rida.messaging.KojiRepoChange) self.assertEquals(msg_obj.repo_tag, 'f23-build') self.assertEquals(msg_obj.msg_id, @@ -116,7 +116,7 @@ class TestUtilFunctions(unittest.TestCase): } mock_tail_messages.side_effect = \ lambda: [('fedora-infrastructure', endpoint, topic, msg)] - msg_obj = next(rida.messaging._fedmsg_listen()) + msg_obj = next(rida.messaging._fedmsg_listen(None)) self.assertEquals(msg_obj.module_build_id, msg['msg']['id']) self.assertEquals(msg_obj.module_build_state, msg['msg']['state']) self.assertEquals(msg_obj.msg_id,