diff --git a/rida/scheduler/main.py b/rida/scheduler/main.py index ea0cf1eb..f46459be 100644 --- a/rida/scheduler/main.py +++ b/rida/scheduler/main.py @@ -34,12 +34,14 @@ import inspect import logging import os import threading +import time import rida.config -import rida.logging +import rida.logger import rida.messaging +import rida.scheduler.handlers.components import rida.scheduler.handlers.modules -#import rida.scheduler.handlers.builds +import rida.scheduler.handlers.repos import sys import koji @@ -62,7 +64,6 @@ else: # TODO: Set the build state to failed if the module build fails. def module_build_state_from_msg(msg): - state = int(msg['msg']['state']) # TODO better handling assert state in rida.BUILD_STATES.values(), "state=%s(%s) is not in %s" % (state, type(state), rida.BUILD_STATES.values()) @@ -70,16 +71,19 @@ def module_build_state_from_msg(msg): class Messaging(threading.Thread): - # These are our main lookup tables for figuring out what to run in response - # to what messaging events. - on_build_change = { - koji.BUILD_STATES["BUILDING"]: lambda x: x - } - on_module_change = { - rida.BUILD_STATES["wait"]: rida.scheduler.handlers.modules.wait, - } - # Only one kind of repo change event... - on_repo_change = rida.scheduler.handlers.repos.done, + def __init__(self, *args, **kwargs): + super(Messaging, self).__init__(*args, **kwargs) + + # These are our main lookup tables for figuring out what to run in response + # to what messaging events. + self.on_build_change = { + koji.BUILD_STATES["BUILDING"]: lambda x: x, + } + self.on_module_change = { + rida.BUILD_STATES["wait"]: rida.scheduler.handlers.modules.wait, + } + # Only one kind of repo change event, though... + self.on_repo_change = rida.scheduler.handlers.repos.done def sanity_check(self): """ On startup, make sure our implementation is sane. """ @@ -105,8 +109,7 @@ class Messaging(threading.Thread): # TODO: Act on these things somehow # TODO: Emit messages about doing so for msg in rida.messaging.listen(backend=config.messaging): - log.debug("Saw %r, %r" % (msg['msg_id'], msg['topic'])) - log.debug(msg) + log.debug("received %r, %r" % (msg['msg_id'], msg['topic'])) # Choose a handler for this message if '.buildsys.repo.done' in msg['topic']: @@ -121,22 +124,43 @@ class Messaging(threading.Thread): # Execute our chosen handler with rida.database.Database(config) as session: + log.info("Executing handler %r" % handler) handler(config, session, msg) class Polling(threading.Thread): def run(self): while True: - # TODO: Check for module builds in the wait state - # TODO: Check component builds in the open state - # TODO: Check for modules that can be set to done/failed - # TODO: Act on these things somehow - # TODO: Emit messages about doing so - # TODO: Sleep for a configuration-determined interval - pass + log.info("Polling thread sleeping, %rs" % config.polling_interval) + time.sleep(config.polling_interval) + with rida.database.Database(config) as session: + self.process_waiting_module_builds(session) + with rida.database.Database(config) as session: + self.process_open_component_builds(session) + with rida.database.Database(config) as session: + self.process_lingering_module_builds(session) + + def process_waiting_module_builds(self, session): + log.info("Looking for module builds stuck in the wait state.") + builds = rida.database.ModuleBuild.by_state(session, "wait") + # TODO -- do throttling calculation here... + log.info(" %r module builds in the wait state..." % len(builds)) + for build in builds: + # Fake a message to kickstart the build anew + msg = { + 'topic': '.module.build.state.change', + 'msg': build.json(), + } + rida.scheduler.handlers.modules.wait(config, session, msg) + + def process_open_component_builds(self, session): + log.warning("process_open_component_builds is not yet implemented...") + + def process_lingering_module_builds(self, session): + log.warning("process_lingering_module_builds is not yet implemented...") def main(): - rida.logging.init_logging(config) + rida.logger.init_logging(config) log.info("Starting ridad.") try: messaging_thread = Messaging()