From 23f057b75018c4eb45bda79140464f0eb44311b2 Mon Sep 17 00:00:00 2001 From: Matt Prahl Date: Wed, 21 Sep 2016 13:43:41 +0200 Subject: [PATCH] Use abstract message classes instead of legacy code --- rida/messaging.py | 7 +++- rida/models.py | 17 ++++++---- rida/scheduler/handlers/components.py | 6 ++-- rida/scheduler/handlers/modules.py | 8 ++--- rida/scheduler/handlers/repos.py | 2 +- rida/scheduler/main.py | 42 +++++++++++------------- tests/test_scheduler/test_module_wait.py | 12 +++---- tests/test_scheduler/test_repo_done.py | 13 +++----- 8 files changed, 55 insertions(+), 52 deletions(-) diff --git a/rida/messaging.py b/rida/messaging.py index 6d373171..8b54a2d6 100644 --- a/rida/messaging.py +++ b/rida/messaging.py @@ -176,9 +176,14 @@ def _fedmsg_publish(topic, msg, modname): def _fedmsg_listen(**kwargs): + """ + Parses a fedmsg event and constructs it into the appropriate message object + """ import fedmsg for name, endpoint, topic, msg in fedmsg.tail_messages(**kwargs): - yield msg + msg_obj = BaseMessage.from_fedmsg(topic, msg) + if msg_obj: + yield msg_obj _messaging_backends = { 'fedmsg': { diff --git a/rida/models.py b/rida/models.py index 6bb66ec6..c3fc9558 100644 --- a/rida/models.py +++ b/rida/models.py @@ -123,9 +123,12 @@ class ModuleBuild(RidaBase): @classmethod def from_module_event(cls, session, event): - if '.module.' not in event['topic']: - raise ValueError("%r is not a module message." % event['topic']) - return session.query(cls).filter(cls.id==event['msg']['id']).first() + if type(event) == rida.messaging.RidaModule: + return session.query(cls).filter( + cls.id == event.msg.module_build_id).first() + else: + raise ValueError("%r is not a module message." + % type(event).__name__) @classmethod def create(cls, session, conf, name, version, release, modulemd, scmurl, username): @@ -179,7 +182,7 @@ class ModuleBuild(RidaBase): There should be at most one. """ - tag = event['msg']['tag'].strip('-build') + tag = event.repo_tag.strip('-build') query = session.query(cls)\ .filter(cls.koji_tag==tag)\ .filter(cls.state==BUILD_STATES["build"]) @@ -276,9 +279,11 @@ class ComponentBuild(RidaBase): @classmethod def from_component_event(cls, session, event): - if 'component.state.change' not in event['topic'] and '.buildsys.build.state.change' not in event['topic']: + if type(event) == rida.messaging.KojiBuildChange: + return session.query(cls).filter( + cls.task_id == event.build_id).first() + else: raise ValueError("%r is not a koji message." % event['topic']) - return session.query(cls).filter(cls.task_id==event['msg']['task_id']).first() def json(self): retval = { diff --git a/rida/scheduler/handlers/components.py b/rida/scheduler/handlers/components.py index 9fb22f04..d391e584 100644 --- a/rida/scheduler/handlers/components.py +++ b/rida/scheduler/handlers/components.py @@ -41,7 +41,8 @@ def _finalize(config, session, msg, state): # First, find our ModuleBuild associated with this repo, if any. component_build = models.ComponentBuild.from_component_event(session, msg) try: - nvr = "{name}-{version}-{release}".format(**msg['msg']) + nvr = "{}-{}-{}".format(msg.build_name, msg.build_version, + msg.build_release) except KeyError: nvr = None @@ -72,7 +73,8 @@ def _finalize(config, session, msg, state): builder = rida.builder.KojiModuleBuilder(module_name, config, tag_name=tag) builder.buildroot_connect() # tag && add to srpm-build group - nvr = "{name}-{version}-{release}".format(**msg['msg']) + nvr = "{}-{}-{}".format(msg.build_name, msg.build_version, + msg.build_release) install = bool(component_build.package == 'module-build-macros') builder.buildroot_add_artifacts([nvr,], install=install) session.commit() diff --git a/rida/scheduler/handlers/modules.py b/rida/scheduler/handlers/modules.py index 14e496df..d23f56ab 100644 --- a/rida/scheduler/handlers/modules.py +++ b/rida/scheduler/handlers/modules.py @@ -52,10 +52,10 @@ def done(config, session, msg): """ build = models.ModuleBuild.from_module_event(db.session, msg) module_info = build.json() - if module_info['state'] != msg['msg']['state']: + if module_info['state'] != msg.module_build_state: log.warn("Note that retrieved module state %r " "doesn't match message module state %r" % ( - module_info['state'], msg['msg']['state'])) + module_info['state'], msg.module_build_state)) # This is ok.. it's a race condition we can ignore. pass @@ -75,10 +75,10 @@ def wait(config, session, msg): log.info("Found build=%r from message" % build) module_info = build.json() - if module_info['state'] != msg['msg']['state']: + if module_info['state'] != msg.module_build_state: log.warn("Note that retrieved module state %r " "doesn't match message module state %r" % ( - module_info['state'], msg['msg']['state'])) + module_info['state'], msg.module_build_state)) # This is ok.. it's a race condition we can ignore. pass diff --git a/rida/scheduler/handlers/repos.py b/rida/scheduler/handlers/repos.py index 100c055b..da6f8e45 100644 --- a/rida/scheduler/handlers/repos.py +++ b/rida/scheduler/handlers/repos.py @@ -36,7 +36,7 @@ def done(config, session, msg): """ Called whenever koji rebuilds a repo, any repo. """ # First, find our ModuleBuild associated with this repo, if any. - tag = msg['msg']['tag'].strip('-build') + tag = msg.repo_tag.strip('-build') module_build = models.ModuleBuild.from_repo_done_event(session, msg) if not module_build: log.info("No module build found associated with koji tag %r" % tag) diff --git a/rida/scheduler/main.py b/rida/scheduler/main.py index caa87a72..27623c30 100644 --- a/rida/scheduler/main.py +++ b/rida/scheduler/main.py @@ -62,7 +62,7 @@ class STOP_WORK(object): def module_build_state_from_msg(msg): - state = int(msg['msg']['state']) + state = int(msg.module_build_state) # TODO better handling assert state in models.BUILD_STATES.values(), "state=%s(%s) is not in %s" % (state, type(state), models.BUILD_STATES.values()) return state @@ -75,7 +75,7 @@ class MessageIngest(threading.Thread): def run(self): - for msg in rida.messaging.listen(backend=conf.messaging): + for msg in rida.messaging.listen(conf.messaging): self.outgoing_work_queue.put(msg) @@ -137,29 +137,27 @@ class MessageWorker(threading.Thread): try: self.process_message(msg) except Exception: - log.exception("Failed while handling %r" % msg['msg_id']) - # Log the body of the message too, but clear out some spammy - # fields that are of no use to a human reader. - msg.pop('certificate', None) - msg.pop('signature', None) + log.exception("Failed while handling %r" % msg.msg_id) log.info(pprint.pformat(msg)) def process_message(self, msg): - log.debug("received %r, %r" % (msg['msg_id'], msg['topic'])) + log.debug('Received a message with an ID of "{0}" and of type "{1}"' + .format(msg.msg_id, type(msg).__name__)) # Choose a handler for this message - if '.buildsys.repo.done' in msg['topic']: + if type(msg) == rida.messaging.KojiBuildChange: + handler = self.on_build_change[msg.build_new_state] + elif type(msg) == rida.messaging.KojiRepoChange: handler = self.on_repo_change - elif '.buildsys.build.state.change' in msg['topic']: - handler = self.on_build_change[msg['msg']['new']] - elif '.rida.module.state.change' in msg['topic']: + elif type(msg) == rida.messaging.RidaModule: handler = self.on_module_change[module_build_state_from_msg(msg)] else: log.debug("Unhandled message...") return # Execute our chosen handler - idx = "%s: %s, %s" % (handler.__name__, msg['topic'], msg['msg_id']) + idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__, + msg.msg_id) if handler is self.NO_OP: log.debug("Handler is NO_OP: %s" % idx) else: @@ -213,15 +211,15 @@ class Poller(threading.Thread): log.info(" task %r is in state %r" % (component_build.task_id, task_info['state'])) if task_info['state'] in dead_states: # Fake a fedmsg message on our internal queue - self.outgoing_work_queue.put({ - 'msg_id': 'a faked internal message', - 'topic': 'org.fedoraproject.prod.buildsys.build.state.change', - 'msg': { - 'msg_id': 'a faked internal message', - 'task_id': component_build.task_id, - 'new': koji.BUILD_STATES['FAILED'], - }, - }) + msg = rida.messaging.KojiBuildChange( + msg_id='a faked internal message', + build_id=component_build.task_id, + build_name=component_build.package, + build_new_state=koji.BUILD_STATES['FAILED'], + build_release=None, + build_version=None + ) + self.outgoing_work_queue.put(msg) else: raise NotImplementedError("Buildsystem %r is not supported." % conf.system) diff --git a/tests/test_scheduler/test_module_wait.py b/tests/test_scheduler/test_module_wait.py index a8ed4ba7..05338db6 100644 --- a/tests/test_scheduler/test_module_wait.py +++ b/tests/test_scheduler/test_module_wait.py @@ -22,7 +22,7 @@ import unittest import mock - +import rida.messaging import rida.scheduler.handlers.modules @@ -47,13 +47,9 @@ class TestModuleWait(unittest.TestCase): 'release': 1, 'state': 'some state', } + from_module_event.return_value = mocked_module_build - msg = { - 'topic': 'org.fedoraproject.prod.rida.module.state.change', - 'msg': { - 'id': 1, - 'state': 'some state', - }, - } + msg = rida.messaging.RidaModule(msg_id=None, module_build_id=1, + module_build_state='some state') self.fn(config=self.config, session=self.session, msg=msg) diff --git a/tests/test_scheduler/test_repo_done.py b/tests/test_scheduler/test_repo_done.py index 4940a251..1de319b7 100644 --- a/tests/test_scheduler/test_repo_done.py +++ b/tests/test_scheduler/test_repo_done.py @@ -23,6 +23,7 @@ import unittest import mock +import rida.messaging import rida.scheduler.handlers.repos @@ -43,10 +44,8 @@ class TestRepoDone(unittest.TestCase): that we do nothing gracefully. """ from_repo_done_event.return_value = None - msg = { - 'topic': 'org.fedoraproject.prod.buildsys.repo.done', - 'msg': {'tag': 'no matches for this...'}, - } + msg = rida.messaging.KojiRepoChange( + 'no matches for this...', '2016-some-guid') self.fn(config=self.config, session=self.session, msg=msg) @mock.patch('rida.builder.KojiModuleBuilder.buildroot_ready') @@ -75,9 +74,7 @@ class TestRepoDone(unittest.TestCase): ready.return_value = True - msg = { - 'topic': 'org.fedoraproject.prod.buildsys.repo.done', - 'msg': {'tag': 'no matches for this...'}, - } + msg = rida.messaging.KojiRepoChange( + 'no matches for this...', '2016-some-guid') self.fn(config=self.config, session=self.session, msg=msg) build_fn.assert_called_once_with(artifact_name='foo', source='full_scm_url')