mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-05-16 13:56:11 +08:00
Add 'in_memory' messaging backend and use it during build_module_locally to drop dependency on fedmsg-relay for local builds.
This commit is contained in:
@@ -33,7 +33,7 @@ except ImportError:
|
||||
from funcsigs import signature
|
||||
|
||||
from module_build_service import log
|
||||
|
||||
import queue
|
||||
|
||||
class BaseMessage(object):
|
||||
def __init__(self, msg_id):
|
||||
@@ -330,6 +330,39 @@ def _amq_publish(topic, msg, conf, service):
|
||||
msngr.put(message)
|
||||
msngr.send()
|
||||
|
||||
# Queue for "in_memory" messaging.
|
||||
_in_memory_work_queue = queue.Queue()
|
||||
|
||||
# Message id for "in_memory" messaging.
|
||||
_in_memory_msg_id = 0
|
||||
|
||||
def _in_memory_publish(topic, msg, conf, service):
|
||||
"""
|
||||
Puts the message to _in_memory_work_queue".
|
||||
"""
|
||||
|
||||
# Increment the message ID.
|
||||
global _in_memory_msg_id
|
||||
_in_memory_msg_id += 1
|
||||
|
||||
# Create fake fedmsg from the message so we can reuse
|
||||
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
|
||||
# class instance.
|
||||
topic = service + "." + topic
|
||||
wrapped_msg = {}
|
||||
wrapped_msg["msg_id"] = str(_in_memory_msg_id)
|
||||
wrapped_msg["msg"] = msg
|
||||
wrapped_msg = BaseMessage.from_fedmsg(topic, wrapped_msg)
|
||||
|
||||
# Put the message to queue.
|
||||
_in_memory_work_queue.put(wrapped_msg)
|
||||
|
||||
def _in_memory_listen(conf, **kwargs):
|
||||
"""
|
||||
Yields the message from the _in_memory_work_queue when ready.
|
||||
"""
|
||||
while True:
|
||||
yield _in_memory_work_queue.get(True)
|
||||
|
||||
_messaging_backends = {
|
||||
'fedmsg': {
|
||||
@@ -340,4 +373,8 @@ _messaging_backends = {
|
||||
'publish': _amq_publish,
|
||||
'listen': _amq_listen,
|
||||
},
|
||||
'in_memory': {
|
||||
'publish': _in_memory_publish,
|
||||
'listen': _in_memory_listen,
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user