mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-02-07 23:33:19 +08:00
This also removes the outdated comments around authorship of each file. If there is still interest in this information, one can just look at the git history.
378 lines
13 KiB
Python
378 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
# SPDX-License-Identifier: MIT
|
|
"""Generic messaging functions."""
|
|
|
|
import re
|
|
import pkg_resources
|
|
|
|
try:
|
|
from inspect import signature
|
|
except ImportError:
|
|
from funcsigs import signature
|
|
|
|
from module_build_service import log
|
|
|
|
|
|
class IgnoreMessage(Exception):
|
|
pass
|
|
|
|
|
|
class BaseMessage(object):
|
|
def __init__(self, msg_id):
|
|
"""
|
|
A base class to abstract messages from different backends
|
|
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
|
"""
|
|
self.msg_id = msg_id
|
|
|
|
# Moksha calls `consumer.validate` on messages that it receives, and
|
|
# even though we have validation turned off in the config there's still
|
|
# a step that tries to access `msg['body']`, `msg['topic']` and
|
|
# `msg.get('topic')`.
|
|
# These are here just so that the `validate` method won't raise an
|
|
# exception when we push our fake messages through.
|
|
# Note that, our fake message pushing has worked for a while... but the
|
|
# *latest* version of fedmsg has some code that exercises the bug. I
|
|
# didn't hit this until I went to test in jenkins.
|
|
self.body = {}
|
|
self.topic = None
|
|
|
|
def __repr__(self):
|
|
init_sig = signature(self.__init__)
|
|
|
|
args_strs = (
|
|
"{}={!r}".format(name, getattr(self, name))
|
|
if param.default != param.empty
|
|
else repr(getattr(self, name))
|
|
for name, param in init_sig.parameters.items()
|
|
)
|
|
|
|
return "{}({})".format(type(self).__name__, ", ".join(args_strs))
|
|
|
|
def __getitem__(self, key):
|
|
""" Used to trick moksha into thinking we are a dict. """
|
|
return getattr(self, key)
|
|
|
|
def __setitem__(self, key, value):
|
|
""" Used to trick moksha into thinking we are a dict. """
|
|
return setattr(self, key, value)
|
|
|
|
def get(self, key, value=None):
|
|
""" Used to trick moksha into thinking we are a dict. """
|
|
return getattr(self, key, value)
|
|
|
|
def __json__(self):
|
|
return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)
|
|
|
|
|
|
class MessageParser(object):
|
|
def parse(self, msg):
|
|
raise NotImplementedError()
|
|
|
|
|
|
class FedmsgMessageParser(MessageParser):
|
|
def parse(self, msg):
|
|
"""
|
|
Takes a fedmsg topic and message and converts it to a message object
|
|
:param msg: the message contents from the fedmsg message
|
|
:return: an object of BaseMessage descent if the message is a type
|
|
that the app looks for, otherwise None is returned
|
|
"""
|
|
if "body" in msg:
|
|
msg = msg["body"]
|
|
topic = msg["topic"]
|
|
topic_categories = _messaging_backends["fedmsg"]["services"]
|
|
categories_re = "|".join(map(re.escape, topic_categories))
|
|
regex_pattern = re.compile(
|
|
r"(?P<category>" + categories_re + r")"
|
|
r"(?:(?:\.)(?P<object>build|repo|module|decision))?"
|
|
r"(?:(?:\.)(?P<subobject>state|build))?"
|
|
r"(?:\.)(?P<event>change|done|end|tag|update)$"
|
|
)
|
|
regex_results = re.search(regex_pattern, topic)
|
|
|
|
if regex_results:
|
|
category = regex_results.group("category")
|
|
object = regex_results.group("object")
|
|
subobject = regex_results.group("subobject")
|
|
event = regex_results.group("event")
|
|
|
|
msg_id = msg.get("msg_id")
|
|
msg_inner_msg = msg.get("msg")
|
|
|
|
# If there isn't a msg dict in msg then this message can be skipped
|
|
if not msg_inner_msg:
|
|
log.debug(
|
|
"Skipping message without any content with the " 'topic "{0}"'.format(topic))
|
|
return None
|
|
|
|
msg_obj = None
|
|
|
|
# Ignore all messages from the secondary koji instances.
|
|
if category == "buildsys":
|
|
instance = msg_inner_msg.get("instance", "primary")
|
|
if instance != "primary":
|
|
log.debug("Ignoring message from %r koji hub." % instance)
|
|
return
|
|
|
|
if (
|
|
category == "buildsys"
|
|
and object == "build"
|
|
and subobject == "state"
|
|
and event == "change"
|
|
):
|
|
build_id = msg_inner_msg.get("build_id")
|
|
task_id = msg_inner_msg.get("task_id")
|
|
build_new_state = msg_inner_msg.get("new")
|
|
build_name = msg_inner_msg.get("name")
|
|
build_version = msg_inner_msg.get("version")
|
|
build_release = msg_inner_msg.get("release")
|
|
|
|
msg_obj = KojiBuildChange(
|
|
msg_id,
|
|
build_id,
|
|
task_id,
|
|
build_new_state,
|
|
build_name,
|
|
build_version,
|
|
build_release,
|
|
)
|
|
|
|
elif (
|
|
category == "buildsys"
|
|
and object == "repo"
|
|
and subobject is None
|
|
and event == "done"
|
|
):
|
|
repo_tag = msg_inner_msg.get("tag")
|
|
msg_obj = KojiRepoChange(msg_id, repo_tag)
|
|
|
|
elif category == "buildsys" and event == "tag":
|
|
tag = msg_inner_msg.get("tag")
|
|
name = msg_inner_msg.get("name")
|
|
version = msg_inner_msg.get("version")
|
|
release = msg_inner_msg.get("release")
|
|
nvr = None
|
|
if name and version and release:
|
|
nvr = "-".join((name, version, release))
|
|
msg_obj = KojiTagChange(msg_id, tag, name, nvr)
|
|
|
|
elif (
|
|
category == "mbs"
|
|
and object == "module"
|
|
and subobject == "state"
|
|
and event == "change"
|
|
):
|
|
msg_obj = MBSModule(msg_id, msg_inner_msg.get("id"), msg_inner_msg.get("state"))
|
|
|
|
elif (
|
|
category == "greenwave"
|
|
and object == "decision"
|
|
and subobject is None
|
|
and event == "update"
|
|
):
|
|
msg_obj = GreenwaveDecisionUpdate(
|
|
msg_id=msg_id,
|
|
decision_context=msg_inner_msg.get("decision_context"),
|
|
policies_satisfied=msg_inner_msg.get("policies_satisfied"),
|
|
subject_identifier=msg_inner_msg.get("subject_identifier"),
|
|
)
|
|
|
|
# If the message matched the regex and is important to the app,
|
|
# it will be returned
|
|
if msg_obj:
|
|
return msg_obj
|
|
|
|
return None
|
|
|
|
|
|
class KojiBuildChange(BaseMessage):
|
|
""" A class that inherits from BaseMessage to provide a message
|
|
object for a build's info (in fedmsg this replaces the msg dictionary)
|
|
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
|
:param build_id: the id of the build (e.g. 264382)
|
|
:param build_new_state: the new build state, this is currently a Koji
|
|
integer
|
|
:param build_name: the name of what is being built
|
|
(e.g. golang-googlecode-tools)
|
|
:param build_version: the version of the build (e.g. 6.06.06)
|
|
:param build_release: the release of the build (e.g. 4.fc25)
|
|
:param module_build_id: the optional id of the module_build in the database
|
|
:param state_reason: the optional reason as to why the state changed
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
msg_id,
|
|
build_id,
|
|
task_id,
|
|
build_new_state,
|
|
build_name,
|
|
build_version,
|
|
build_release,
|
|
module_build_id=None,
|
|
state_reason=None,
|
|
):
|
|
if task_id is None:
|
|
raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.")
|
|
super(KojiBuildChange, self).__init__(msg_id)
|
|
self.build_id = build_id
|
|
self.task_id = task_id
|
|
self.build_new_state = build_new_state
|
|
self.build_name = build_name
|
|
self.build_version = build_version
|
|
self.build_release = build_release
|
|
self.module_build_id = module_build_id
|
|
self.state_reason = state_reason
|
|
|
|
|
|
class KojiTagChange(BaseMessage):
|
|
"""
|
|
A class that inherits from BaseMessage to provide a message
|
|
object for a buildsys.tag info (in fedmsg this replaces the msg dictionary)
|
|
:param tag: the name of tag (e.g. module-123456789-build)
|
|
:param artifact: the name of tagged artifact (e.g. module-build-macros)
|
|
:param nvr: the nvr of the tagged artifact
|
|
"""
|
|
|
|
def __init__(self, msg_id, tag, artifact, nvr):
|
|
super(KojiTagChange, self).__init__(msg_id)
|
|
self.tag = tag
|
|
self.artifact = artifact
|
|
self.nvr = nvr
|
|
|
|
|
|
class KojiRepoChange(BaseMessage):
|
|
""" A class that inherits from BaseMessage to provide a message
|
|
object for a repo's info (in fedmsg this replaces the msg dictionary)
|
|
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
|
:param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build)
|
|
"""
|
|
|
|
def __init__(self, msg_id, repo_tag):
|
|
super(KojiRepoChange, self).__init__(msg_id)
|
|
self.repo_tag = repo_tag
|
|
|
|
|
|
class MBSModule(BaseMessage):
|
|
""" A class that inherits from BaseMessage to provide a message
|
|
object for a module event generated by module_build_service
|
|
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
|
:param module_build_id: the id of the module build
|
|
:param module_build_state: the state of the module build
|
|
"""
|
|
|
|
def __init__(self, msg_id, module_build_id, module_build_state):
|
|
super(MBSModule, self).__init__(msg_id)
|
|
self.module_build_id = module_build_id
|
|
self.module_build_state = module_build_state
|
|
|
|
|
|
class GreenwaveDecisionUpdate(BaseMessage):
|
|
"""A class representing message send to topic greenwave.decision.update"""
|
|
|
|
def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier):
|
|
super(GreenwaveDecisionUpdate, self).__init__(msg_id)
|
|
self.decision_context = decision_context
|
|
self.policies_satisfied = policies_satisfied
|
|
self.subject_identifier = subject_identifier
|
|
|
|
|
|
def publish(topic, msg, conf, service):
|
|
"""
|
|
Publish a single message to a given backend, and return
|
|
:param topic: the topic of the message (e.g. module.state.change)
|
|
:param msg: the message contents of the message (typically JSON)
|
|
:param conf: a Config object from the class in config.py
|
|
:param service: the system that is publishing the message (e.g. mbs)
|
|
:return:
|
|
"""
|
|
try:
|
|
handler = _messaging_backends[conf.messaging]["publish"]
|
|
except KeyError:
|
|
raise KeyError(
|
|
"No messaging backend found for %r in %r" % (conf.messaging, _messaging_backends.keys())
|
|
)
|
|
|
|
from module_build_service.monitor import (
|
|
messaging_tx_to_send_counter,
|
|
messaging_tx_sent_ok_counter,
|
|
messaging_tx_failed_counter,
|
|
)
|
|
|
|
messaging_tx_to_send_counter.inc()
|
|
try:
|
|
rv = handler(topic, msg, conf, service)
|
|
messaging_tx_sent_ok_counter.inc()
|
|
return rv
|
|
except Exception:
|
|
messaging_tx_failed_counter.inc()
|
|
raise
|
|
|
|
|
|
def _fedmsg_publish(topic, msg, conf, service):
|
|
# fedmsg doesn't really need access to conf, however other backends do
|
|
import fedmsg
|
|
|
|
return fedmsg.publish(topic, msg=msg, modname=service)
|
|
|
|
|
|
# A counter used for in-memory messages.
|
|
_in_memory_msg_id = 0
|
|
_initial_messages = []
|
|
|
|
|
|
def _in_memory_publish(topic, msg, conf, service):
|
|
""" Puts the message into the in memory work queue. """
|
|
# Increment the message ID.
|
|
global _in_memory_msg_id
|
|
_in_memory_msg_id += 1
|
|
|
|
# Create fake fedmsg from the message so we can reuse
|
|
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
|
|
# class instance.
|
|
wrapped_msg = FedmsgMessageParser().parse({
|
|
"msg_id": str(_in_memory_msg_id),
|
|
"topic": service + "." + topic,
|
|
"msg": msg
|
|
})
|
|
|
|
# Put the message to queue.
|
|
from module_build_service.scheduler.consumer import work_queue_put
|
|
|
|
try:
|
|
work_queue_put(wrapped_msg)
|
|
except ValueError as e:
|
|
log.warning("No MBSConsumer found. Shutting down? %r" % e)
|
|
except AttributeError:
|
|
# In the event that `moksha.hub._hub` hasn't yet been initialized, we
|
|
# need to store messages on the side until it becomes available.
|
|
# As a last-ditch effort, try to hang initial messages in the config.
|
|
log.warning("Hub not initialized. Queueing on the side.")
|
|
_initial_messages.append(wrapped_msg)
|
|
|
|
|
|
_fedmsg_backend = {
|
|
"publish": _fedmsg_publish,
|
|
"services": ["buildsys", "mbs", "greenwave"],
|
|
"parser": FedmsgMessageParser(),
|
|
"topic_suffix": ".",
|
|
}
|
|
_in_memory_backend = {
|
|
"publish": _in_memory_publish,
|
|
"services": [],
|
|
"parser": FedmsgMessageParser(), # re-used. :)
|
|
"topic_suffix": ".",
|
|
}
|
|
|
|
|
|
_messaging_backends = {}
|
|
for entrypoint in pkg_resources.iter_entry_points("mbs.messaging_backends"):
|
|
_messaging_backends[entrypoint.name] = ep = entrypoint.load()
|
|
required = ["publish", "services", "parser", "topic_suffix"]
|
|
if any([key not in ep for key in required]):
|
|
raise ValueError("messaging backend %r is malformed: %r" % (entrypoint.name, ep))
|
|
|
|
if not _messaging_backends:
|
|
raise ValueError("No messaging plugins are installed or available.")
|