mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-02-07 23:33:19 +08:00
Some start to the polling thread.
This commit is contained in:
@@ -34,12 +34,14 @@ import inspect
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
import rida.config
|
||||
import rida.logging
|
||||
import rida.logger
|
||||
import rida.messaging
|
||||
import rida.scheduler.handlers.components
|
||||
import rida.scheduler.handlers.modules
|
||||
#import rida.scheduler.handlers.builds
|
||||
import rida.scheduler.handlers.repos
|
||||
import sys
|
||||
|
||||
import koji
|
||||
@@ -62,7 +64,6 @@ else:
|
||||
# TODO: Set the build state to failed if the module build fails.
|
||||
|
||||
def module_build_state_from_msg(msg):
|
||||
|
||||
state = int(msg['msg']['state'])
|
||||
# TODO better handling
|
||||
assert state in rida.BUILD_STATES.values(), "state=%s(%s) is not in %s" % (state, type(state), rida.BUILD_STATES.values())
|
||||
@@ -70,16 +71,19 @@ def module_build_state_from_msg(msg):
|
||||
|
||||
class Messaging(threading.Thread):
|
||||
|
||||
# These are our main lookup tables for figuring out what to run in response
|
||||
# to what messaging events.
|
||||
on_build_change = {
|
||||
koji.BUILD_STATES["BUILDING"]: lambda x: x
|
||||
}
|
||||
on_module_change = {
|
||||
rida.BUILD_STATES["wait"]: rida.scheduler.handlers.modules.wait,
|
||||
}
|
||||
# Only one kind of repo change event...
|
||||
on_repo_change = rida.scheduler.handlers.repos.done,
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Messaging, self).__init__(*args, **kwargs)
|
||||
|
||||
# These are our main lookup tables for figuring out what to run in response
|
||||
# to what messaging events.
|
||||
self.on_build_change = {
|
||||
koji.BUILD_STATES["BUILDING"]: lambda x: x,
|
||||
}
|
||||
self.on_module_change = {
|
||||
rida.BUILD_STATES["wait"]: rida.scheduler.handlers.modules.wait,
|
||||
}
|
||||
# Only one kind of repo change event, though...
|
||||
self.on_repo_change = rida.scheduler.handlers.repos.done
|
||||
|
||||
def sanity_check(self):
|
||||
""" On startup, make sure our implementation is sane. """
|
||||
@@ -105,8 +109,7 @@ class Messaging(threading.Thread):
|
||||
# TODO: Act on these things somehow
|
||||
# TODO: Emit messages about doing so
|
||||
for msg in rida.messaging.listen(backend=config.messaging):
|
||||
log.debug("Saw %r, %r" % (msg['msg_id'], msg['topic']))
|
||||
log.debug(msg)
|
||||
log.debug("received %r, %r" % (msg['msg_id'], msg['topic']))
|
||||
|
||||
# Choose a handler for this message
|
||||
if '.buildsys.repo.done' in msg['topic']:
|
||||
@@ -121,22 +124,43 @@ class Messaging(threading.Thread):
|
||||
|
||||
# Execute our chosen handler
|
||||
with rida.database.Database(config) as session:
|
||||
log.info("Executing handler %r" % handler)
|
||||
handler(config, session, msg)
|
||||
|
||||
class Polling(threading.Thread):
|
||||
def run(self):
|
||||
while True:
|
||||
# TODO: Check for module builds in the wait state
|
||||
# TODO: Check component builds in the open state
|
||||
# TODO: Check for modules that can be set to done/failed
|
||||
# TODO: Act on these things somehow
|
||||
# TODO: Emit messages about doing so
|
||||
# TODO: Sleep for a configuration-determined interval
|
||||
pass
|
||||
log.info("Polling thread sleeping, %rs" % config.polling_interval)
|
||||
time.sleep(config.polling_interval)
|
||||
with rida.database.Database(config) as session:
|
||||
self.process_waiting_module_builds(session)
|
||||
with rida.database.Database(config) as session:
|
||||
self.process_open_component_builds(session)
|
||||
with rida.database.Database(config) as session:
|
||||
self.process_lingering_module_builds(session)
|
||||
|
||||
def process_waiting_module_builds(self, session):
|
||||
log.info("Looking for module builds stuck in the wait state.")
|
||||
builds = rida.database.ModuleBuild.by_state(session, "wait")
|
||||
# TODO -- do throttling calculation here...
|
||||
log.info(" %r module builds in the wait state..." % len(builds))
|
||||
for build in builds:
|
||||
# Fake a message to kickstart the build anew
|
||||
msg = {
|
||||
'topic': '.module.build.state.change',
|
||||
'msg': build.json(),
|
||||
}
|
||||
rida.scheduler.handlers.modules.wait(config, session, msg)
|
||||
|
||||
def process_open_component_builds(self, session):
|
||||
log.warning("process_open_component_builds is not yet implemented...")
|
||||
|
||||
def process_lingering_module_builds(self, session):
|
||||
log.warning("process_lingering_module_builds is not yet implemented...")
|
||||
|
||||
|
||||
def main():
|
||||
rida.logging.init_logging(config)
|
||||
rida.logger.init_logging(config)
|
||||
log.info("Starting ridad.")
|
||||
try:
|
||||
messaging_thread = Messaging()
|
||||
|
||||
Reference in New Issue
Block a user