diff --git a/conf/config.py b/conf/config.py index 319667b1..6182d9a2 100644 --- a/conf/config.py +++ b/conf/config.py @@ -125,6 +125,7 @@ class TestConfiguration(BaseConfiguration): LOG_LEVEL = 'debug' SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:' DEBUG = True + MESSAGING = 'in_memory' # Global network-related values, in seconds NET_TIMEOUT = 3 diff --git a/module_build_service/builder.py b/module_build_service/builder.py index ed431a5f..efb83fd2 100644 --- a/module_build_service/builder.py +++ b/module_build_service/builder.py @@ -1149,7 +1149,7 @@ $repos # what RPMs are output of particular SRPM build yet. for artifact in artifacts: if artifact.startswith("module-build-macros"): - self._execute_cmd(["mock", "-r", self.mock_config, "-i", + _execute_cmd(["mock", "-r", self.mock_config, "-i", "module-build-macros"]) def buildroot_add_repos(self, dependencies): diff --git a/module_build_service/config.py b/module_build_service/config.py index 60a4f68a..3a08991c 100644 --- a/module_build_service/config.py +++ b/module_build_service/config.py @@ -334,7 +334,7 @@ class Config(object): def _setifok_messaging(self, s): s = str(s) - if s not in ("fedmsg", "amq"): + if s not in ("fedmsg", "amq", "in_memory"): raise ValueError("Unsupported messaging system.") self.messaging = s diff --git a/module_build_service/manage.py b/module_build_service/manage.py index 18215ac3..3c8c1233 100644 --- a/module_build_service/manage.py +++ b/module_build_service/manage.py @@ -117,6 +117,7 @@ def cleardb(): @manager.command def build_module_locally(url): conf.set_item("system", "mock") + conf.set_item("messaging", "in_memory") # Use our own local SQLite3 database. confdir = os.path.abspath(os.path.dirname(__file__)) diff --git a/module_build_service/messaging.py b/module_build_service/messaging.py index 3e2e1dad..b8fc6449 100644 --- a/module_build_service/messaging.py +++ b/module_build_service/messaging.py @@ -33,7 +33,7 @@ except ImportError: from funcsigs import signature from module_build_service import log - +import queue class BaseMessage(object): def __init__(self, msg_id): @@ -330,6 +330,39 @@ def _amq_publish(topic, msg, conf, service): msngr.put(message) msngr.send() +# Queue for "in_memory" messaging. +_in_memory_work_queue = queue.Queue() + +# Message id for "in_memory" messaging. +_in_memory_msg_id = 0 + +def _in_memory_publish(topic, msg, conf, service): + """ + Puts the message to _in_memory_work_queue". + """ + + # Increment the message ID. + global _in_memory_msg_id + _in_memory_msg_id += 1 + + # Create fake fedmsg from the message so we can reuse + # the BaseMessage.from_fedmsg code to get the particular BaseMessage + # class instance. + topic = service + "." + topic + wrapped_msg = {} + wrapped_msg["msg_id"] = str(_in_memory_msg_id) + wrapped_msg["msg"] = msg + wrapped_msg = BaseMessage.from_fedmsg(topic, wrapped_msg) + + # Put the message to queue. + _in_memory_work_queue.put(wrapped_msg) + +def _in_memory_listen(conf, **kwargs): + """ + Yields the message from the _in_memory_work_queue when ready. + """ + while True: + yield _in_memory_work_queue.get(True) _messaging_backends = { 'fedmsg': { @@ -340,4 +373,8 @@ _messaging_backends = { 'publish': _amq_publish, 'listen': _amq_listen, }, + 'in_memory': { + 'publish': _in_memory_publish, + 'listen': _in_memory_listen, + }, }