diff --git a/rida/scheduler/main.py b/rida/scheduler/main.py old mode 100755 new mode 100644 index 96294669..57c4bc52 --- a/rida/scheduler/main.py +++ b/rida/scheduler/main.py @@ -29,6 +29,8 @@ This is the main component of the orchestrator and is responsible for proper scheduling component builds in the supported build systems. """ + +import inspect import logging import os import threading @@ -36,6 +38,8 @@ import threading import rida.config import rida.messaging +import koji + log = logging.getLogger() # TODO: Load the config file from environment @@ -47,22 +51,50 @@ config = rida.config.from_file("rida.conf") # TODO: Set the build state to failed if the module build fails. class Messaging(threading.Thread): + on_build_change = { + koji.BUILD_STATES["BUILDING"]: lambda x: x + } + on_module_change = { + rida.BUILD_STATES["new"]: rida.scheduler.handlers.modules.new, + } + def sanity_check(self): + """ On startup, make sure our implementation is sane. """ + # Ensure we have every state covered + for state in rida.BUILD_STATES: + if state not in self.on_module_change: + raise KeyError("Module build states %r not handled." % state) + for state in koji.BUILD_STATES: + if state not in self.on_build_change: + raise KeyError("Koji build states %r not handled." % state) + + all_fns = self.on_build_change.items() + self.on_module_change.items() + for key, callback in all_fns: + expected = ['conf', 'db', 'msg'] + argspec = inspect.getargspec(callback) + if argspec != expected: + raise ValueError("Callback %r, state %r has argspec %r!=%r" % ( + callback, key, argspec, expected)) + def run(self): - # TODO: Listen for bus messages from rida about module builds - # entering the wait state - # TODO: Listen for bus messages from the buildsystem about - # component builds changing state + self.sanity_check() # TODO: Check for modules that can be set to done/failed # TODO: Act on these things somehow # TODO: Emit messages about doing so for msg in rida.messaging.listen(backend=config.messaging): log.debug("Saw %r, %r" % (msg['msg_id'], msg['topic'])) + + # Choose a handler for this message if '.buildsys.build.state.change' in msg['topic']: - self.handle_build_change(msg) + handler = self.on_build_change[msg['msg']['new']] elif '.rida.module.state.change' in msg['topic']: - self.handle_module_change(msg) + handler = self.on_module_change[msg['msg']['state']] else: - pass + log.debug("Unhandled message...") + continue + + # Execute our chosen handler + with rida.Database(config) as session: + handler(config, session, msg) class Polling(threading.Thread): def run(self):