Merge #234 Add 'internal' messaging backend and use it during build_module_locally to drop dependency on fedmsg-relay for local builds.

This commit is contained in:
Jan Kaluža
2016-12-06 09:37:42 +00:00
5 changed files with 42 additions and 3 deletions

View File

@@ -125,6 +125,7 @@ class TestConfiguration(BaseConfiguration):
LOG_LEVEL = 'debug'
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
DEBUG = True
MESSAGING = 'in_memory'
# Global network-related values, in seconds
NET_TIMEOUT = 3

View File

@@ -1149,7 +1149,7 @@ $repos
# what RPMs are output of particular SRPM build yet.
for artifact in artifacts:
if artifact.startswith("module-build-macros"):
self._execute_cmd(["mock", "-r", self.mock_config, "-i",
_execute_cmd(["mock", "-r", self.mock_config, "-i",
"module-build-macros"])
def buildroot_add_repos(self, dependencies):

View File

@@ -334,7 +334,7 @@ class Config(object):
def _setifok_messaging(self, s):
s = str(s)
if s not in ("fedmsg", "amq"):
if s not in ("fedmsg", "amq", "in_memory"):
raise ValueError("Unsupported messaging system.")
self.messaging = s

View File

@@ -117,6 +117,7 @@ def cleardb():
@manager.command
def build_module_locally(url):
conf.set_item("system", "mock")
conf.set_item("messaging", "in_memory")
# Use our own local SQLite3 database.
confdir = os.path.abspath(os.path.dirname(__file__))

View File

@@ -33,7 +33,7 @@ except ImportError:
from funcsigs import signature
from module_build_service import log
import queue
class BaseMessage(object):
def __init__(self, msg_id):
@@ -330,6 +330,39 @@ def _amq_publish(topic, msg, conf, service):
msngr.put(message)
msngr.send()
# Queue for "in_memory" messaging.
_in_memory_work_queue = queue.Queue()
# Message id for "in_memory" messaging.
_in_memory_msg_id = 0
def _in_memory_publish(topic, msg, conf, service):
"""
Puts the message to _in_memory_work_queue".
"""
# Increment the message ID.
global _in_memory_msg_id
_in_memory_msg_id += 1
# Create fake fedmsg from the message so we can reuse
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
# class instance.
topic = service + "." + topic
wrapped_msg = {}
wrapped_msg["msg_id"] = str(_in_memory_msg_id)
wrapped_msg["msg"] = msg
wrapped_msg = BaseMessage.from_fedmsg(topic, wrapped_msg)
# Put the message to queue.
_in_memory_work_queue.put(wrapped_msg)
def _in_memory_listen(conf, **kwargs):
"""
Yields the message from the _in_memory_work_queue when ready.
"""
while True:
yield _in_memory_work_queue.get(True)
_messaging_backends = {
'fedmsg': {
@@ -340,4 +373,8 @@ _messaging_backends = {
'publish': _amq_publish,
'listen': _amq_listen,
},
'in_memory': {
'publish': _in_memory_publish,
'listen': _in_memory_listen,
},
}