diff --git a/module_build_service/builder.py b/module_build_service/builder.py index 9c1941cd..10f6e770 100644 --- a/module_build_service/builder.py +++ b/module_build_service/builder.py @@ -55,6 +55,7 @@ from module_build_service.models import ModuleBuild import module_build_service.scm import module_build_service.utils import module_build_service.scheduler +import module_build_service.scheduler.consumer logging.basicConfig(level=logging.DEBUG) @@ -1165,7 +1166,7 @@ $repos msg_id='a faked internal message', repo_tag=self.tag_name + "-build", ) - module_build_service.scheduler.work_queue.put(msg) + module_build_service.scheduler.consumer.work_queue_put(msg) def _send_build_change(self, state, source, build_id): nvr = kobo.rpmlib.parse_nvr(source) @@ -1181,7 +1182,7 @@ $repos build_release=nvr["release"], build_version=nvr["version"] ) - module_build_service.scheduler.main.outgoing_work_queue_put(msg) + module_build_service.scheduler.consumer.work_queue_put(msg) def _save_log(self, log_name, artifact_name): old_log = os.path.join(self.resultsdir, log_name) diff --git a/module_build_service/manage.py b/module_build_service/manage.py index 8fd9fec1..c93303d8 100644 --- a/module_build_service/manage.py +++ b/module_build_service/manage.py @@ -29,6 +29,11 @@ import ssl from shutil import rmtree import getpass +import fedmsg.config +import moksha.hub +import moksha.hub.hub +import moksha.hub.reactor + from module_build_service import app, conf, db from module_build_service import models from module_build_service.pdc import ( @@ -40,6 +45,7 @@ from module_build_service.utils import ( ) from module_build_service.messaging import RidaModule import module_build_service.messaging +import module_build_service.scheduler.consumer manager = Manager(app) @@ -144,10 +150,54 @@ def build_module_locally(url): username = getpass.getuser() submit_module_build(username, url, allow_local_url=True) - # TODO: Ralph to the rescue - # msgs = [] - # msgs.append(RidaModule("local module build", 2, 1)) - # module_build_service.scheduler.main.main(msgs, True) + def stop_condition(message): + # XXX - We ignore the message here and instead just query the DB. + + # Grab the latest module build. + module = db.session.query(models.ModuleBuild)\ + .order_by(models.ModuleBuild.id.desc())\ + .first() + done = ( + module_build_service.models.BUILD_STATES["failed"], + module_build_service.models.BUILD_STATES["ready"], + # XXX should this one be removed? + module_build_service.models.BUILD_STATES["done"], + ) + return module.state in done + + config = fedmsg.config.load_config() + config['mbsconsumer'] = True + config['mbsconsumer.stopcondition'] = stop_condition + config['mbsconsumer.initial_messages'] = [ + RidaModule("local module build", 2, 1) + ] + + consumers = [module_build_service.scheduler.consumer.MBSConsumer] + + # Rephrase the fedmsg-config.py config as moksha *.ini format for + # zeromq. If we're not using zeromq (say, we're using STOMP), then just + # assume that the moksha configuration is specified correctly already + # in /etc/fedmsg.d/ + if config.get('zmq_enabled', True): + moksha_options = dict( + # XXX - replace this with a /dev/null endpoint. + zmq_subscribe_endpoints=','.join( + ','.join(bunch) for bunch in config['endpoints'].values() + ), + ) + config.update(moksha_options) + + # Note that the hub we kick off here cannot send any message. You + # should use fedmsg.publish(...) still for that. + from moksha.hub import main + main( + # Pass in our config dict + options=config, + # Only run the specified consumers if any are so specified. + consumers=consumers, + # Tell moksha to quiet its logging. + framework=False, + ) @manager.command diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index 761c20d2..d3485f6b 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -28,6 +28,7 @@ to use. import koji import inspect import fedmsg.consumers +import moksha.hub from module_build_service.utils import module_build_state_from_msg import module_build_service.messaging @@ -45,11 +46,15 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): topic = '*' config_key = 'mbsconsumer' - def __init__(self, hub, initial_msgs=[]): + def __init__(self, hub): super(MBSConsumer, self).__init__(hub) - for msg in initial_msgs: - work_queue.put(msg) + # These two values are typically provided either by the unit tests or + # by the local build command. They are empty in the production environ + self.stop_condition = hub.config.get('mbsconsumer.stop_condition') + initial_messages = hub.config.get('mbsconsumer.initial_messages', []) + for msg in initial_messages: + self.incoming.put(msg) # These are our main lookup tables for figuring out what to run in # response to what messaging events. @@ -80,19 +85,35 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): self.on_repo_change = module_build_service.scheduler.handlers.repos.done self.sanity_check() + def shutdown(self): + log.info("Shutting down..") + self.hub.stop() + from moksha.hub.reactor import reactor + reactor.callFromThread(reactor.stop) + def consume(self, message): - # Add the message to the work queue - work_queue.put(self.get_abstracted_msg(message['body'])) - # Process all the messages in the work queue - while not work_queue.empty(): - msg = work_queue.get() - try: - with models.make_session(conf) as session: - self.process_message(session, msg) - except Exception: - log.exception('Failed while handling {0!r}'.format( - msg.msg_id)) - log.info(msg) + log.info("Received %r" % message) + + if self.stop_condition and self.stop_condition(message): + return self.shutdown() + + # Sometimes, the messages put into our queue are artificially put there + # by other parts of our own codebase. If they are already abstracted + # messages, then just use them as-is. If they are not already + # instances of our message abstraction base class, then first transform + # them before proceeding. + if isinstance(message, module_build_service.messaging.BaseMessage): + msg = message + else: + msg = self.get_abstracted_msg(message['body']) + + # Primary work is done here. + try: + with models.make_session(conf) as session: + self.process_message(session, msg) + except Exception: + log.exception('Failed while handling {0!r}'.format(msg.msg_id)) + log.info(msg) def get_abstracted_msg(self, message): # Convert the message to an abstracted message @@ -142,8 +163,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): return # Execute our chosen handler - idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, - msg.msg_id) + idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id) if handler is self.NO_OP: log.debug("Handler is NO_OP: %s" % idx) else: @@ -159,4 +179,23 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): # was submitted for real and koji announced its completion. for event in further_work: log.info(" Scheduling faked event %r" % event) - work_queue.put(event) + self.incoming.put(event) + + +def get_global_consumer(): + """ Return a handle to the active consumer object, if it exists. """ + hub = moksha.hub._hub + if not hub: + raise ValueError("No global moksha-hub obj found.") + + for consumer in hub.consumers: + if isinstance(consumer, MBSConsumer): + return consumer + + raise ValueError("No MBSConsumer found.") + + +def work_queue_put(msg): + """ Artificially put a message into the work queue of the consumer. """ + consumer = get_global_consumer() + consumer.incoming.put(msg)