mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-05 11:48:33 +08:00
Some structure for event handling in ridad.py.
This commit is contained in:
46
rida/scheduler/main.py
Executable file → Normal file
46
rida/scheduler/main.py
Executable file → Normal file
@@ -29,6 +29,8 @@ This is the main component of the orchestrator and is responsible for
|
||||
proper scheduling component builds in the supported build systems.
|
||||
"""
|
||||
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
@@ -36,6 +38,8 @@ import threading
|
||||
import rida.config
|
||||
import rida.messaging
|
||||
|
||||
import koji
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
# TODO: Load the config file from environment
|
||||
@@ -47,22 +51,50 @@ config = rida.config.from_file("rida.conf")
|
||||
# TODO: Set the build state to failed if the module build fails.
|
||||
|
||||
class Messaging(threading.Thread):
|
||||
on_build_change = {
|
||||
koji.BUILD_STATES["BUILDING"]: lambda x: x
|
||||
}
|
||||
on_module_change = {
|
||||
rida.BUILD_STATES["new"]: rida.scheduler.handlers.modules.new,
|
||||
}
|
||||
def sanity_check(self):
|
||||
""" On startup, make sure our implementation is sane. """
|
||||
# Ensure we have every state covered
|
||||
for state in rida.BUILD_STATES:
|
||||
if state not in self.on_module_change:
|
||||
raise KeyError("Module build states %r not handled." % state)
|
||||
for state in koji.BUILD_STATES:
|
||||
if state not in self.on_build_change:
|
||||
raise KeyError("Koji build states %r not handled." % state)
|
||||
|
||||
all_fns = self.on_build_change.items() + self.on_module_change.items()
|
||||
for key, callback in all_fns:
|
||||
expected = ['conf', 'db', 'msg']
|
||||
argspec = inspect.getargspec(callback)
|
||||
if argspec != expected:
|
||||
raise ValueError("Callback %r, state %r has argspec %r!=%r" % (
|
||||
callback, key, argspec, expected))
|
||||
|
||||
def run(self):
|
||||
# TODO: Listen for bus messages from rida about module builds
|
||||
# entering the wait state
|
||||
# TODO: Listen for bus messages from the buildsystem about
|
||||
# component builds changing state
|
||||
self.sanity_check()
|
||||
# TODO: Check for modules that can be set to done/failed
|
||||
# TODO: Act on these things somehow
|
||||
# TODO: Emit messages about doing so
|
||||
for msg in rida.messaging.listen(backend=config.messaging):
|
||||
log.debug("Saw %r, %r" % (msg['msg_id'], msg['topic']))
|
||||
|
||||
# Choose a handler for this message
|
||||
if '.buildsys.build.state.change' in msg['topic']:
|
||||
self.handle_build_change(msg)
|
||||
handler = self.on_build_change[msg['msg']['new']]
|
||||
elif '.rida.module.state.change' in msg['topic']:
|
||||
self.handle_module_change(msg)
|
||||
handler = self.on_module_change[msg['msg']['state']]
|
||||
else:
|
||||
pass
|
||||
log.debug("Unhandled message...")
|
||||
continue
|
||||
|
||||
# Execute our chosen handler
|
||||
with rida.Database(config) as session:
|
||||
handler(config, session, msg)
|
||||
|
||||
class Polling(threading.Thread):
|
||||
def run(self):
|
||||
|
||||
Reference in New Issue
Block a user