mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-05 03:38:12 +08:00
Remove in-memory messaging
This commit is contained in:
@@ -134,7 +134,6 @@ class TestConfiguration(BaseConfiguration):
|
||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///{0}'.format(
|
||||
path.join(dbdir, 'tests', 'test_module_build_service.db'))
|
||||
DEBUG = True
|
||||
MESSAGING = 'in_memory'
|
||||
|
||||
# Global network-related values, in seconds
|
||||
NET_TIMEOUT = 3
|
||||
|
||||
@@ -344,7 +344,7 @@ class Config(object):
|
||||
|
||||
def _setifok_messaging(self, s):
|
||||
s = str(s)
|
||||
if s not in ("fedmsg", "amq", "in_memory"):
|
||||
if s not in ("fedmsg", "amq"):
|
||||
raise ValueError("Unsupported messaging system.")
|
||||
self.messaging = s
|
||||
|
||||
|
||||
@@ -118,7 +118,6 @@ def cleardb():
|
||||
@manager.command
|
||||
def build_module_locally(url):
|
||||
conf.set_item("system", "mock")
|
||||
conf.set_item("messaging", "in_memory")
|
||||
|
||||
# Use our own local SQLite3 database.
|
||||
confdir = os.path.abspath(os.path.dirname(__file__))
|
||||
@@ -294,8 +293,6 @@ def runssl(host=conf.host, port=conf.port, debug=conf.debug):
|
||||
"""
|
||||
logging.info('Starting Module Build Service frontend')
|
||||
|
||||
module_build_service.messaging.init(conf)
|
||||
|
||||
ssl_ctx = _establish_ssl_context()
|
||||
app.run(
|
||||
host=host,
|
||||
|
||||
@@ -228,18 +228,6 @@ class RidaModule(BaseMessage):
|
||||
self.module_build_id = module_build_id
|
||||
self.module_build_state = module_build_state
|
||||
|
||||
def init(conf, **kwargs):
|
||||
"""
|
||||
Initialize the messaging backend.
|
||||
:param conf: a Config object from the class in config.py
|
||||
:param kwargs: any additional arguments to pass to the backend handler
|
||||
"""
|
||||
try:
|
||||
handler = _messaging_backends[conf.messaging]['init']
|
||||
except KeyError:
|
||||
raise KeyError("No messaging backend found for %r" % conf.messaging)
|
||||
return handler(conf, **kwargs)
|
||||
|
||||
def publish(topic, msg, conf, service):
|
||||
"""
|
||||
Publish a single message to a given backend, and return
|
||||
@@ -301,68 +289,12 @@ def _amq_publish(topic, msg, conf, service):
|
||||
msngr.put(message)
|
||||
msngr.send()
|
||||
|
||||
# Queue for "in_memory" messaging.
|
||||
_in_memory_work_queue = queue.Queue()
|
||||
|
||||
# Message id for "in_memory" messaging.
|
||||
_in_memory_msg_id = 0
|
||||
|
||||
def _in_memory_init(conf, **kwargs):
|
||||
"""
|
||||
Initializes the In Memory messaging backend.
|
||||
"""
|
||||
global _in_memory_work_queue
|
||||
global _in_memory_msg_id
|
||||
_in_memory_msg_id = 0
|
||||
_in_memory_work_queue = queue.Queue()
|
||||
|
||||
# TODO: Ralph to the rescue
|
||||
def _in_memory_publish(topic, msg, conf, service):
|
||||
"""
|
||||
Puts the message to _in_memory_work_queue".
|
||||
"""
|
||||
|
||||
# Increment the message ID.
|
||||
global _in_memory_msg_id
|
||||
_in_memory_msg_id += 1
|
||||
|
||||
# Create fake fedmsg from the message so we can reuse
|
||||
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
|
||||
# class instance.
|
||||
topic = service + "." + topic
|
||||
wrapped_msg = {}
|
||||
wrapped_msg["msg_id"] = str(_in_memory_msg_id)
|
||||
wrapped_msg["msg"] = msg
|
||||
wrapped_msg = BaseMessage.from_fedmsg(topic, wrapped_msg)
|
||||
|
||||
# Put the message to queue.
|
||||
_in_memory_work_queue.put(wrapped_msg)
|
||||
|
||||
# TODO: Ralph to the rescue
|
||||
def _in_memory_listen(conf, **kwargs):
|
||||
"""
|
||||
Yields the message from the _in_memory_work_queue when ready.
|
||||
"""
|
||||
while True:
|
||||
yield _in_memory_work_queue.get(True)
|
||||
|
||||
def _no_op(conf, **kwargs):
|
||||
"""
|
||||
No operation.
|
||||
"""
|
||||
pass
|
||||
|
||||
_messaging_backends = {
|
||||
'fedmsg': {
|
||||
'init': _no_op,
|
||||
'publish': _fedmsg_publish,
|
||||
},
|
||||
'amq': {
|
||||
'init': _no_op,
|
||||
'publish': _amq_publish,
|
||||
},
|
||||
'in_memory': {
|
||||
'init': _in_memory_init,
|
||||
'publish': _in_memory_publish,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user