mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-13 16:59:52 +08:00
Use abstract message classes instead of legacy code
This commit is contained in:
committed by
Nils Philippsen
parent
03831262e4
commit
23f057b750
@@ -176,9 +176,14 @@ def _fedmsg_publish(topic, msg, modname):
|
||||
|
||||
|
||||
def _fedmsg_listen(**kwargs):
|
||||
"""
|
||||
Parses a fedmsg event and constructs it into the appropriate message object
|
||||
"""
|
||||
import fedmsg
|
||||
for name, endpoint, topic, msg in fedmsg.tail_messages(**kwargs):
|
||||
yield msg
|
||||
msg_obj = BaseMessage.from_fedmsg(topic, msg)
|
||||
if msg_obj:
|
||||
yield msg_obj
|
||||
|
||||
_messaging_backends = {
|
||||
'fedmsg': {
|
||||
|
||||
@@ -123,9 +123,12 @@ class ModuleBuild(RidaBase):
|
||||
|
||||
@classmethod
|
||||
def from_module_event(cls, session, event):
|
||||
if '.module.' not in event['topic']:
|
||||
raise ValueError("%r is not a module message." % event['topic'])
|
||||
return session.query(cls).filter(cls.id==event['msg']['id']).first()
|
||||
if type(event) == rida.messaging.RidaModule:
|
||||
return session.query(cls).filter(
|
||||
cls.id == event.msg.module_build_id).first()
|
||||
else:
|
||||
raise ValueError("%r is not a module message."
|
||||
% type(event).__name__)
|
||||
|
||||
@classmethod
|
||||
def create(cls, session, conf, name, version, release, modulemd, scmurl, username):
|
||||
@@ -179,7 +182,7 @@ class ModuleBuild(RidaBase):
|
||||
|
||||
There should be at most one.
|
||||
"""
|
||||
tag = event['msg']['tag'].strip('-build')
|
||||
tag = event.repo_tag.strip('-build')
|
||||
query = session.query(cls)\
|
||||
.filter(cls.koji_tag==tag)\
|
||||
.filter(cls.state==BUILD_STATES["build"])
|
||||
@@ -276,9 +279,11 @@ class ComponentBuild(RidaBase):
|
||||
|
||||
@classmethod
|
||||
def from_component_event(cls, session, event):
|
||||
if 'component.state.change' not in event['topic'] and '.buildsys.build.state.change' not in event['topic']:
|
||||
if type(event) == rida.messaging.KojiBuildChange:
|
||||
return session.query(cls).filter(
|
||||
cls.task_id == event.build_id).first()
|
||||
else:
|
||||
raise ValueError("%r is not a koji message." % event['topic'])
|
||||
return session.query(cls).filter(cls.task_id==event['msg']['task_id']).first()
|
||||
|
||||
def json(self):
|
||||
retval = {
|
||||
|
||||
@@ -41,7 +41,8 @@ def _finalize(config, session, msg, state):
|
||||
# First, find our ModuleBuild associated with this repo, if any.
|
||||
component_build = models.ComponentBuild.from_component_event(session, msg)
|
||||
try:
|
||||
nvr = "{name}-{version}-{release}".format(**msg['msg'])
|
||||
nvr = "{}-{}-{}".format(msg.build_name, msg.build_version,
|
||||
msg.build_release)
|
||||
except KeyError:
|
||||
nvr = None
|
||||
|
||||
@@ -72,7 +73,8 @@ def _finalize(config, session, msg, state):
|
||||
builder = rida.builder.KojiModuleBuilder(module_name, config, tag_name=tag)
|
||||
builder.buildroot_connect()
|
||||
# tag && add to srpm-build group
|
||||
nvr = "{name}-{version}-{release}".format(**msg['msg'])
|
||||
nvr = "{}-{}-{}".format(msg.build_name, msg.build_version,
|
||||
msg.build_release)
|
||||
install = bool(component_build.package == 'module-build-macros')
|
||||
builder.buildroot_add_artifacts([nvr,], install=install)
|
||||
session.commit()
|
||||
|
||||
@@ -52,10 +52,10 @@ def done(config, session, msg):
|
||||
"""
|
||||
build = models.ModuleBuild.from_module_event(db.session, msg)
|
||||
module_info = build.json()
|
||||
if module_info['state'] != msg['msg']['state']:
|
||||
if module_info['state'] != msg.module_build_state:
|
||||
log.warn("Note that retrieved module state %r "
|
||||
"doesn't match message module state %r" % (
|
||||
module_info['state'], msg['msg']['state']))
|
||||
module_info['state'], msg.module_build_state))
|
||||
# This is ok.. it's a race condition we can ignore.
|
||||
pass
|
||||
|
||||
@@ -75,10 +75,10 @@ def wait(config, session, msg):
|
||||
log.info("Found build=%r from message" % build)
|
||||
|
||||
module_info = build.json()
|
||||
if module_info['state'] != msg['msg']['state']:
|
||||
if module_info['state'] != msg.module_build_state:
|
||||
log.warn("Note that retrieved module state %r "
|
||||
"doesn't match message module state %r" % (
|
||||
module_info['state'], msg['msg']['state']))
|
||||
module_info['state'], msg.module_build_state))
|
||||
# This is ok.. it's a race condition we can ignore.
|
||||
pass
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ def done(config, session, msg):
|
||||
""" Called whenever koji rebuilds a repo, any repo. """
|
||||
|
||||
# First, find our ModuleBuild associated with this repo, if any.
|
||||
tag = msg['msg']['tag'].strip('-build')
|
||||
tag = msg.repo_tag.strip('-build')
|
||||
module_build = models.ModuleBuild.from_repo_done_event(session, msg)
|
||||
if not module_build:
|
||||
log.info("No module build found associated with koji tag %r" % tag)
|
||||
|
||||
@@ -62,7 +62,7 @@ class STOP_WORK(object):
|
||||
|
||||
|
||||
def module_build_state_from_msg(msg):
|
||||
state = int(msg['msg']['state'])
|
||||
state = int(msg.module_build_state)
|
||||
# TODO better handling
|
||||
assert state in models.BUILD_STATES.values(), "state=%s(%s) is not in %s" % (state, type(state), models.BUILD_STATES.values())
|
||||
return state
|
||||
@@ -75,7 +75,7 @@ class MessageIngest(threading.Thread):
|
||||
|
||||
|
||||
def run(self):
|
||||
for msg in rida.messaging.listen(backend=conf.messaging):
|
||||
for msg in rida.messaging.listen(conf.messaging):
|
||||
self.outgoing_work_queue.put(msg)
|
||||
|
||||
|
||||
@@ -137,29 +137,27 @@ class MessageWorker(threading.Thread):
|
||||
try:
|
||||
self.process_message(msg)
|
||||
except Exception:
|
||||
log.exception("Failed while handling %r" % msg['msg_id'])
|
||||
# Log the body of the message too, but clear out some spammy
|
||||
# fields that are of no use to a human reader.
|
||||
msg.pop('certificate', None)
|
||||
msg.pop('signature', None)
|
||||
log.exception("Failed while handling %r" % msg.msg_id)
|
||||
log.info(pprint.pformat(msg))
|
||||
|
||||
def process_message(self, msg):
|
||||
log.debug("received %r, %r" % (msg['msg_id'], msg['topic']))
|
||||
log.debug('Received a message with an ID of "{0}" and of type "{1}"'
|
||||
.format(msg.msg_id, type(msg).__name__))
|
||||
|
||||
# Choose a handler for this message
|
||||
if '.buildsys.repo.done' in msg['topic']:
|
||||
if type(msg) == rida.messaging.KojiBuildChange:
|
||||
handler = self.on_build_change[msg.build_new_state]
|
||||
elif type(msg) == rida.messaging.KojiRepoChange:
|
||||
handler = self.on_repo_change
|
||||
elif '.buildsys.build.state.change' in msg['topic']:
|
||||
handler = self.on_build_change[msg['msg']['new']]
|
||||
elif '.rida.module.state.change' in msg['topic']:
|
||||
elif type(msg) == rida.messaging.RidaModule:
|
||||
handler = self.on_module_change[module_build_state_from_msg(msg)]
|
||||
else:
|
||||
log.debug("Unhandled message...")
|
||||
return
|
||||
|
||||
# Execute our chosen handler
|
||||
idx = "%s: %s, %s" % (handler.__name__, msg['topic'], msg['msg_id'])
|
||||
idx = "%s: %s, %s" % (handler.__name__, type(msg).__name__,
|
||||
msg.msg_id)
|
||||
if handler is self.NO_OP:
|
||||
log.debug("Handler is NO_OP: %s" % idx)
|
||||
else:
|
||||
@@ -213,15 +211,15 @@ class Poller(threading.Thread):
|
||||
log.info(" task %r is in state %r" % (component_build.task_id, task_info['state']))
|
||||
if task_info['state'] in dead_states:
|
||||
# Fake a fedmsg message on our internal queue
|
||||
self.outgoing_work_queue.put({
|
||||
'msg_id': 'a faked internal message',
|
||||
'topic': 'org.fedoraproject.prod.buildsys.build.state.change',
|
||||
'msg': {
|
||||
'msg_id': 'a faked internal message',
|
||||
'task_id': component_build.task_id,
|
||||
'new': koji.BUILD_STATES['FAILED'],
|
||||
},
|
||||
})
|
||||
msg = rida.messaging.KojiBuildChange(
|
||||
msg_id='a faked internal message',
|
||||
build_id=component_build.task_id,
|
||||
build_name=component_build.package,
|
||||
build_new_state=koji.BUILD_STATES['FAILED'],
|
||||
build_release=None,
|
||||
build_version=None
|
||||
)
|
||||
self.outgoing_work_queue.put(msg)
|
||||
|
||||
else:
|
||||
raise NotImplementedError("Buildsystem %r is not supported." % conf.system)
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
import rida.messaging
|
||||
import rida.scheduler.handlers.modules
|
||||
|
||||
|
||||
@@ -47,13 +47,9 @@ class TestModuleWait(unittest.TestCase):
|
||||
'release': 1,
|
||||
'state': 'some state',
|
||||
}
|
||||
|
||||
from_module_event.return_value = mocked_module_build
|
||||
|
||||
msg = {
|
||||
'topic': 'org.fedoraproject.prod.rida.module.state.change',
|
||||
'msg': {
|
||||
'id': 1,
|
||||
'state': 'some state',
|
||||
},
|
||||
}
|
||||
msg = rida.messaging.RidaModule(msg_id=None, module_build_id=1,
|
||||
module_build_state='some state')
|
||||
self.fn(config=self.config, session=self.session, msg=msg)
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
import rida.messaging
|
||||
import rida.scheduler.handlers.repos
|
||||
|
||||
|
||||
@@ -43,10 +44,8 @@ class TestRepoDone(unittest.TestCase):
|
||||
that we do nothing gracefully.
|
||||
"""
|
||||
from_repo_done_event.return_value = None
|
||||
msg = {
|
||||
'topic': 'org.fedoraproject.prod.buildsys.repo.done',
|
||||
'msg': {'tag': 'no matches for this...'},
|
||||
}
|
||||
msg = rida.messaging.KojiRepoChange(
|
||||
'no matches for this...', '2016-some-guid')
|
||||
self.fn(config=self.config, session=self.session, msg=msg)
|
||||
|
||||
@mock.patch('rida.builder.KojiModuleBuilder.buildroot_ready')
|
||||
@@ -75,9 +74,7 @@ class TestRepoDone(unittest.TestCase):
|
||||
|
||||
ready.return_value = True
|
||||
|
||||
msg = {
|
||||
'topic': 'org.fedoraproject.prod.buildsys.repo.done',
|
||||
'msg': {'tag': 'no matches for this...'},
|
||||
}
|
||||
msg = rida.messaging.KojiRepoChange(
|
||||
'no matches for this...', '2016-some-guid')
|
||||
self.fn(config=self.config, session=self.session, msg=msg)
|
||||
build_fn.assert_called_once_with(artifact_name='foo', source='full_scm_url')
|
||||
|
||||
Reference in New Issue
Block a user