mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-13 16:29:49 +08:00
rida.messaging add initial amq support
Signed-off-by: Lubos Kocman <lkocman@redhat.com>
This commit is contained in:
@@ -24,6 +24,8 @@
|
||||
|
||||
"""Generic messaging functions."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
try:
|
||||
from inspect import signature
|
||||
@@ -52,6 +54,66 @@ class BaseMessage(object):
|
||||
|
||||
return "{}({})".format(type(self).__name__, ', '.join(args_strs))
|
||||
|
||||
@staticmethod
|
||||
def from_amq(topic, msg):
|
||||
msg_obj = None
|
||||
properties = None
|
||||
body = None
|
||||
if hasattr('properties'):
|
||||
properties = json.loads(msg.properties, encoding='utf8')
|
||||
if not ((properties.get('service') == 'koji' or properties.get('service') == 'rida')):
|
||||
log.debug("Skipping msg: %s" % msg)
|
||||
return None
|
||||
|
||||
service = properties.get('service')
|
||||
# This probably appies only for brew
|
||||
# Also wouldn't be easier to use properties?
|
||||
if service == 'koji':
|
||||
content = json.loads(msg.body, encoding='utf8')['content']
|
||||
log.debug("Found koji related msg: %s" % msg)
|
||||
method = content['info']['method']
|
||||
msg_type = content['info']['type']
|
||||
|
||||
if method == 'newRepo':
|
||||
attr = content['attribute']
|
||||
state = content['info']['new']
|
||||
if attr == "state" and state == "CLOSED":
|
||||
repo_tag = content['info']['request']
|
||||
assert len(repo_tag) == 1
|
||||
msg_obj = KojiRepoChange(msg.id, repo_tag[0])
|
||||
|
||||
elif method == 'build' and msg_type == 'TaskStateChange':
|
||||
attr = content['attribute']
|
||||
if attr == "state":
|
||||
build_id = content['info']['id']
|
||||
build_state = content['new']
|
||||
# These are not available before build is assigned
|
||||
build_name = None
|
||||
build_version = None
|
||||
build_release = None
|
||||
nvr_req = set(['name', 'version', 'release'])
|
||||
if nvr_req.issubset(set(content['info'].keys())):
|
||||
build_name = content['info']['name']
|
||||
build_version = content['info']['version']
|
||||
build_release = content['info']['release']
|
||||
|
||||
msg_obj = KojiBuildChange(
|
||||
msg.id, build_id, build_state, build_name,
|
||||
build_version,build_release)
|
||||
|
||||
elif service == 'rida':
|
||||
log.debug("Found rida related msg: %s" % msg)
|
||||
body = json.loads(msg.body, encoding='utf8')
|
||||
if topic == 'module.state.change':
|
||||
msg_obj = RidaModule(
|
||||
msg.id, body['id'], body['state'] )
|
||||
|
||||
if msg_obj:
|
||||
return msg_obj
|
||||
|
||||
log.debug('Skipping unrecognized message: %s' % msg)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def from_fedmsg(topic, msg):
|
||||
"""
|
||||
@@ -208,9 +270,72 @@ def _fedmsg_listen(conf, **kwargs): # XXX: should we keep conf?
|
||||
if msg_obj:
|
||||
yield msg_obj
|
||||
|
||||
def _amq_get_messenger(conf):
|
||||
import proton
|
||||
for attr in ('amq_private_key_file', 'amq_trusted_cert_file', 'amq_cert_file'):
|
||||
val = getattr(conf, attr)
|
||||
log.debug('Checking config.%s=%s' % (attr, val))
|
||||
assert os.path.exists(val), 'config.%s=%s file does not exist' % (attr, val)
|
||||
|
||||
for attr in ('amq_recv_addresses', 'amq_dest_address'):
|
||||
val = getattr(conf, attr)
|
||||
log.debug('Checking config.%s=%s' % (attr, val))
|
||||
# list values
|
||||
if isinstance(val, list) or isinstance(val, tuple):
|
||||
assert val, 'config.%s is not supposed to be empty' % attr
|
||||
# individual urls
|
||||
for v in val:
|
||||
assert v and '://' in v, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val)
|
||||
# string values
|
||||
else:
|
||||
assert val and '://' in val, 'config.%s: value "%s" doesn not seem like a valid url' % (attr, val)
|
||||
|
||||
mng = proton.Messenger()
|
||||
mng.certificate=conf.amq_cert_file
|
||||
mng.private_key=conf.amq_private_key_file
|
||||
mng.trusted_certificates=conf.amq_trusted_cert_file
|
||||
mng.start()
|
||||
for url in conf.amq_recv_addresses:
|
||||
mng.subscribe(url)
|
||||
log.debug('proton.Messenger: Subscribing to address=%s' % url)
|
||||
return mng
|
||||
|
||||
def _amq_listen(conf, **kwargs):
|
||||
import proton
|
||||
mng = _amq_get_messenger(conf)
|
||||
msg = proton.Message()
|
||||
while True:
|
||||
mng.recv()
|
||||
|
||||
while mng.incoming:
|
||||
mng.get(msg)
|
||||
msg_obj = BaseMessage.from_amq(msg.address, msg)
|
||||
if msg_obj:
|
||||
yield msg_obj
|
||||
|
||||
def _amq_get_dst_address(conf, topic, msg, service):
|
||||
return conf.amq_broker_dest_url
|
||||
|
||||
def _amq_publish(conf, topic, msg, service):
|
||||
import proton
|
||||
mng = _amq_get_messenger(conf)
|
||||
message = proton.Message()
|
||||
message.address = _amq_get_dst_address(conf, topic, msg, service)
|
||||
message.subject = topic
|
||||
message.properties['service'] = service
|
||||
message.address = conf.amq_dest_address
|
||||
message.content = json.dumps(msg, ensure_ascii=False).encode('utf8')
|
||||
mng.put(message)
|
||||
mng.send()
|
||||
|
||||
|
||||
_messaging_backends = {
|
||||
'fedmsg': {
|
||||
'publish': _fedmsg_publish,
|
||||
'listen': _fedmsg_listen,
|
||||
},
|
||||
'amq': {
|
||||
'publish': _amq_publish,
|
||||
'listen': _amq_listen,
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user