Get the basic framework for local module builds working again with hub/consumer.

Author: Ralph Bean
Date: 2016-12-14 14:41:08 -05:00
Committed by: Matt Prahl
parent c4091aff92
commit f11caf32e5
3 changed files with 114 additions and 24 deletions

View File

@@ -55,6 +55,7 @@ from module_build_service.models import ModuleBuild
import module_build_service.scm
import module_build_service.utils
import module_build_service.scheduler
import module_build_service.scheduler.consumer
logging.basicConfig(level=logging.DEBUG)
@@ -1165,7 +1166,7 @@ $repos
msg_id='a faked internal message',
repo_tag=self.tag_name + "-build",
)
module_build_service.scheduler.work_queue.put(msg)
module_build_service.scheduler.consumer.work_queue_put(msg)
def _send_build_change(self, state, source, build_id):
nvr = kobo.rpmlib.parse_nvr(source)
@@ -1181,7 +1182,7 @@ $repos
build_release=nvr["release"],
build_version=nvr["version"]
)
module_build_service.scheduler.main.outgoing_work_queue_put(msg)
module_build_service.scheduler.consumer.work_queue_put(msg)
def _save_log(self, log_name, artifact_name):
old_log = os.path.join(self.resultsdir, log_name)
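With this change, faked internal messages from the builder no longer touch a scheduler-level queue directly; they go through the new module-level helper in the consumer, which looks up the live MBSConsumer for you. A minimal sketch of that call path, reusing the RidaModule message class that appears elsewhere in this commit (the constructor arguments here are illustrative):

import module_build_service.scheduler.consumer
from module_build_service.messaging import RidaModule

# Build any abstracted message and hand it to the running consumer;
# work_queue_put() finds the MBSConsumer through the global moksha hub.
msg = RidaModule("a faked internal message", 2, 1)
module_build_service.scheduler.consumer.work_queue_put(msg)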

View File

@@ -29,6 +29,11 @@ import ssl
from shutil import rmtree
import getpass
import fedmsg.config
import moksha.hub
import moksha.hub.hub
import moksha.hub.reactor
from module_build_service import app, conf, db
from module_build_service import models
from module_build_service.pdc import (
@@ -40,6 +45,7 @@ from module_build_service.utils import (
)
from module_build_service.messaging import RidaModule
import module_build_service.messaging
import module_build_service.scheduler.consumer
manager = Manager(app)
@@ -144,10 +150,54 @@ def build_module_locally(url):
username = getpass.getuser()
submit_module_build(username, url, allow_local_url=True)
# TODO: Ralph to the rescue
# msgs = []
# msgs.append(RidaModule("local module build", 2, 1))
# module_build_service.scheduler.main.main(msgs, True)
def stop_condition(message):
# XXX - We ignore the message here and instead just query the DB.
# Grab the latest module build.
module = db.session.query(models.ModuleBuild)\
.order_by(models.ModuleBuild.id.desc())\
.first()
done = (
module_build_service.models.BUILD_STATES["failed"],
module_build_service.models.BUILD_STATES["ready"],
# XXX should this one be removed?
module_build_service.models.BUILD_STATES["done"],
)
return module.state in done
config = fedmsg.config.load_config()
config['mbsconsumer'] = True
config['mbsconsumer.stop_condition'] = stop_condition
config['mbsconsumer.initial_messages'] = [
RidaModule("local module build", 2, 1)
]
consumers = [module_build_service.scheduler.consumer.MBSConsumer]
# Rephrase the fedmsg-config.py config as moksha *.ini format for
# zeromq. If we're not using zeromq (say, we're using STOMP), then just
# assume that the moksha configuration is specified correctly already
# in /etc/fedmsg.d/
if config.get('zmq_enabled', True):
moksha_options = dict(
# XXX - replace this with a /dev/null endpoint.
zmq_subscribe_endpoints=','.join(
','.join(bunch) for bunch in config['endpoints'].values()
),
)
config.update(moksha_options)
# Note that the hub we kick off here cannot send any messages; you
# should still use fedmsg.publish(...) for that.
from moksha.hub import main
main(
# Pass in our config dict
options=config,
# Only run the specified consumers if any are so specified.
consumers=consumers,
# Tell moksha to quiet its logging.
framework=False,
)
@manager.command
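The endpoint handling above is the easiest part to misread: fedmsg stores endpoints as a dict mapping names to lists of zeromq URLs, while the moksha hub wants one flat comma-separated string. A small illustration with made-up names and addresses:

# fedmsg-style endpoints: {name: [list of zmq URLs]} -- values invented here.
endpoints = {
    'fedmsg-relay': ['tcp://127.0.0.1:4001'],
    'fedmsg-hub': ['tcp://127.0.0.1:3000', 'tcp://127.0.0.1:3001'],
}
# Flattened the same way build_module_locally() does it.
zmq_subscribe_endpoints = ','.join(
    ','.join(bunch) for bunch in endpoints.values()
)
# e.g. 'tcp://127.0.0.1:4001,tcp://127.0.0.1:3000,tcp://127.0.0.1:3001'
# (the exact order depends on dict iteration order)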

View File

@@ -28,6 +28,7 @@ to use.
import koji
import inspect
import fedmsg.consumers
import moksha.hub
from module_build_service.utils import module_build_state_from_msg
import module_build_service.messaging
@@ -45,11 +46,15 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
topic = '*'
config_key = 'mbsconsumer'
def __init__(self, hub, initial_msgs=[]):
def __init__(self, hub):
super(MBSConsumer, self).__init__(hub)
for msg in initial_msgs:
work_queue.put(msg)
# These two values are typically provided either by the unit tests or
# by the local build command. They are empty in the production environment.
self.stop_condition = hub.config.get('mbsconsumer.stop_condition')
initial_messages = hub.config.get('mbsconsumer.initial_messages', [])
for msg in initial_messages:
self.incoming.put(msg)
# These are our main lookup tables for figuring out what to run in
# response to what messaging events.
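Since the initial_msgs constructor argument is gone, callers that used to hand messages in at construction time now go through the hub configuration instead. A sketch of the config shape the consumer expects, mirroring what build_module_locally() sets up (the key names come from this commit; the values are placeholders):

from module_build_service.messaging import RidaModule

hub_config = {
    'mbsconsumer': True,
    # Called with every consumed message; a truthy return shuts the hub down.
    'mbsconsumer.stop_condition': lambda message: False,
    # Pre-seeded messages, drained into self.incoming during __init__.
    'mbsconsumer.initial_messages': [RidaModule("local module build", 2, 1)],
}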
@@ -80,19 +85,35 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
self.on_repo_change = module_build_service.scheduler.handlers.repos.done
self.sanity_check()
def shutdown(self):
log.info("Shutting down..")
self.hub.stop()
from moksha.hub.reactor import reactor
reactor.callFromThread(reactor.stop)
def consume(self, message):
# Add the message to the work queue
work_queue.put(self.get_abstracted_msg(message['body']))
# Process all the messages in the work queue
while not work_queue.empty():
msg = work_queue.get()
try:
with models.make_session(conf) as session:
self.process_message(session, msg)
except Exception:
log.exception('Failed while handling {0!r}'.format(
msg.msg_id))
log.info(msg)
log.info("Received %r" % message)
if self.stop_condition and self.stop_condition(message):
return self.shutdown()
# Sometimes, the messages put into our queue are artificially put there
# by other parts of our own codebase. If they are already abstracted
# messages, then just use them as-is. If they are not already
# instances of our message abstraction base class, then first transform
# them before proceeding.
if isinstance(message, module_build_service.messaging.BaseMessage):
msg = message
else:
msg = self.get_abstracted_msg(message['body'])
# Primary work is done here.
try:
with models.make_session(conf) as session:
self.process_message(session, msg)
except Exception:
log.exception('Failed while handling {0!r}'.format(msg.msg_id))
log.info(msg)
def get_abstracted_msg(self, message):
# Convert the message to an abstracted message
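The subtle piece of this hunk is shutdown(): consume() runs outside the reactor thread, and the Twisted reactor may only be stopped from its own thread, which is why the code goes through reactor.callFromThread rather than calling reactor.stop() directly (the consumer reaches that reactor via moksha.hub.reactor). A minimal standalone illustration of the pattern, using Twisted directly:

import threading
import time
from twisted.internet import reactor

def stop_from_worker():
    time.sleep(1)  # stand-in for work happening off the reactor thread
    # Calling reactor.stop() here directly would be unsafe; callFromThread
    # schedules it on the reactor's own thread instead.
    reactor.callFromThread(reactor.stop)

threading.Thread(target=stop_from_worker).start()
reactor.run()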
@@ -142,8 +163,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
return
# Execute our chosen handler
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__,
msg.msg_id)
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, msg.msg_id)
if handler is self.NO_OP:
log.debug("Handler is NO_OP: %s" % idx)
else:
@@ -159,4 +179,23 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
# was submitted for real and koji announced its completion.
for event in further_work:
log.info(" Scheduling faked event %r" % event)
work_queue.put(event)
self.incoming.put(event)
def get_global_consumer():
""" Return a handle to the active consumer object, if it exists. """
hub = moksha.hub._hub
if not hub:
raise ValueError("No global moksha-hub obj found.")
for consumer in hub.consumers:
if isinstance(consumer, MBSConsumer):
return consumer
raise ValueError("No MBSConsumer found.")
def work_queue_put(msg):
""" Artificially put a message into the work queue of the consumer. """
consumer = get_global_consumer()
consumer.incoming.put(msg)
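One practical note on these helpers: both failure paths raise ValueError, so work_queue_put() is only meaningful while a moksha hub with an MBSConsumer attached is actually running. A hedged sketch of defensive use (the logger setup here is illustrative, not from this repository):

import logging
import module_build_service.scheduler.consumer

log = logging.getLogger(__name__)

def inject(msg):
    try:
        module_build_service.scheduler.consumer.work_queue_put(msg)
    except ValueError:
        # No global moksha hub yet, or no MBSConsumer attached to it.
        log.warning("No running MBSConsumer; dropping %r", msg)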