Modify message functions to accept conf instead of backend

This commit is contained in:
Matt Prahl
2016-09-20 08:49:14 -04:00
committed by Nils Philippsen
parent a3ecd4180d
commit d37659afd8
3 changed files with 22 additions and 14 deletions

View File

@@ -146,25 +146,33 @@ class RidaModule(BaseMessage):
self.module_build_state = module_build_state
def publish(topic, msg, backend, modname='rida'):
""" Publish a single message to a given backend, and return. """
def publish(topic, msg, conf, modname='rida'):
"""
Publish a single message to a given backend, and return
:param topic: the topic of the message (e.g. module.state.change)
:param msg: the message contents of the message (typically JSON)
:param conf: a Config object from the class in config.py
:param modname: the system that is publishing the message (e.g. rida)
:return:
"""
try:
handler = _messaging_backends[backend]['publish']
handler = _messaging_backends[conf.messaging]['publish']
except KeyError:
raise KeyError("No messaging backend found for %r" % backend)
raise KeyError("No messaging backend found for %r" % conf.messaging)
return handler(topic, msg, modname=modname)
def listen(backend, **kwargs):
""" Yield messages from a given messaging backend.
The ``**kwargs`` arguments will be passed on to the backend to do some
backend-specific connection handling, throttling, or filtering.
def listen(conf, **kwargs):
"""
Yield messages from the messaging backend in conf.messaging.
:param conf: a Config object from the class in config.py
:param kwargs: any additional arguments to pass to the backend handler
:return: yields a message object (child class from BaseMessage)
"""
try:
handler = _messaging_backends[backend]['listen']
handler = _messaging_backends[conf.messaging]['listen']
except KeyError:
raise KeyError("No messaging backend found for %r" % backend)
raise KeyError("No messaging backend found for %r" % conf.messaging)
for event in handler(**kwargs):
yield event

View File

@@ -149,7 +149,7 @@ class ModuleBuild(RidaBase):
modname='rida',
topic='module.state.change',
msg=module.json(), # Note the state is "init" here...
backend=conf.messaging,
conf=conf,
)
return module
@@ -168,7 +168,7 @@ class ModuleBuild(RidaBase):
modname='rida',
topic='module.state.change',
msg=self.json(), # Note the state is "init" here...
backend=conf.messaging,
conf=conf,
)
@classmethod

View File

@@ -75,7 +75,7 @@ class MessageIngest(threading.Thread):
def run(self):
for msg in rida.messaging.listen(conf.messaging):
for msg in rida.messaging.listen(conf):
self.outgoing_work_queue.put(msg)