Files
mprahl 8c6cfb702d Use small license headers in the Python files
This also removes the outdated comments around authorship of each
file. If there is still interest in this information, one can just
look at the git history.
2019-10-03 08:47:24 -04:00

378 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
"""Generic messaging functions."""
import re
import pkg_resources
try:
from inspect import signature
except ImportError:
from funcsigs import signature
from module_build_service import log
class IgnoreMessage(Exception):
pass
class BaseMessage(object):
def __init__(self, msg_id):
"""
A base class to abstract messages from different backends
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
"""
self.msg_id = msg_id
# Moksha calls `consumer.validate` on messages that it receives, and
# even though we have validation turned off in the config there's still
# a step that tries to access `msg['body']`, `msg['topic']` and
# `msg.get('topic')`.
# These are here just so that the `validate` method won't raise an
# exception when we push our fake messages through.
# Note that, our fake message pushing has worked for a while... but the
# *latest* version of fedmsg has some code that exercises the bug. I
# didn't hit this until I went to test in jenkins.
self.body = {}
self.topic = None
def __repr__(self):
init_sig = signature(self.__init__)
args_strs = (
"{}={!r}".format(name, getattr(self, name))
if param.default != param.empty
else repr(getattr(self, name))
for name, param in init_sig.parameters.items()
)
return "{}({})".format(type(self).__name__, ", ".join(args_strs))
def __getitem__(self, key):
""" Used to trick moksha into thinking we are a dict. """
return getattr(self, key)
def __setitem__(self, key, value):
""" Used to trick moksha into thinking we are a dict. """
return setattr(self, key, value)
def get(self, key, value=None):
""" Used to trick moksha into thinking we are a dict. """
return getattr(self, key, value)
def __json__(self):
return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)
class MessageParser(object):
def parse(self, msg):
raise NotImplementedError()
class FedmsgMessageParser(MessageParser):
def parse(self, msg):
"""
Takes a fedmsg topic and message and converts it to a message object
:param msg: the message contents from the fedmsg message
:return: an object of BaseMessage descent if the message is a type
that the app looks for, otherwise None is returned
"""
if "body" in msg:
msg = msg["body"]
topic = msg["topic"]
topic_categories = _messaging_backends["fedmsg"]["services"]
categories_re = "|".join(map(re.escape, topic_categories))
regex_pattern = re.compile(
r"(?P<category>" + categories_re + r")"
r"(?:(?:\.)(?P<object>build|repo|module|decision))?"
r"(?:(?:\.)(?P<subobject>state|build))?"
r"(?:\.)(?P<event>change|done|end|tag|update)$"
)
regex_results = re.search(regex_pattern, topic)
if regex_results:
category = regex_results.group("category")
object = regex_results.group("object")
subobject = regex_results.group("subobject")
event = regex_results.group("event")
msg_id = msg.get("msg_id")
msg_inner_msg = msg.get("msg")
# If there isn't a msg dict in msg then this message can be skipped
if not msg_inner_msg:
log.debug(
"Skipping message without any content with the " 'topic "{0}"'.format(topic))
return None
msg_obj = None
# Ignore all messages from the secondary koji instances.
if category == "buildsys":
instance = msg_inner_msg.get("instance", "primary")
if instance != "primary":
log.debug("Ignoring message from %r koji hub." % instance)
return
if (
category == "buildsys"
and object == "build"
and subobject == "state"
and event == "change"
):
build_id = msg_inner_msg.get("build_id")
task_id = msg_inner_msg.get("task_id")
build_new_state = msg_inner_msg.get("new")
build_name = msg_inner_msg.get("name")
build_version = msg_inner_msg.get("version")
build_release = msg_inner_msg.get("release")
msg_obj = KojiBuildChange(
msg_id,
build_id,
task_id,
build_new_state,
build_name,
build_version,
build_release,
)
elif (
category == "buildsys"
and object == "repo"
and subobject is None
and event == "done"
):
repo_tag = msg_inner_msg.get("tag")
msg_obj = KojiRepoChange(msg_id, repo_tag)
elif category == "buildsys" and event == "tag":
tag = msg_inner_msg.get("tag")
name = msg_inner_msg.get("name")
version = msg_inner_msg.get("version")
release = msg_inner_msg.get("release")
nvr = None
if name and version and release:
nvr = "-".join((name, version, release))
msg_obj = KojiTagChange(msg_id, tag, name, nvr)
elif (
category == "mbs"
and object == "module"
and subobject == "state"
and event == "change"
):
msg_obj = MBSModule(msg_id, msg_inner_msg.get("id"), msg_inner_msg.get("state"))
elif (
category == "greenwave"
and object == "decision"
and subobject is None
and event == "update"
):
msg_obj = GreenwaveDecisionUpdate(
msg_id=msg_id,
decision_context=msg_inner_msg.get("decision_context"),
policies_satisfied=msg_inner_msg.get("policies_satisfied"),
subject_identifier=msg_inner_msg.get("subject_identifier"),
)
# If the message matched the regex and is important to the app,
# it will be returned
if msg_obj:
return msg_obj
return None
class KojiBuildChange(BaseMessage):
""" A class that inherits from BaseMessage to provide a message
object for a build's info (in fedmsg this replaces the msg dictionary)
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
:param build_id: the id of the build (e.g. 264382)
:param build_new_state: the new build state, this is currently a Koji
integer
:param build_name: the name of what is being built
(e.g. golang-googlecode-tools)
:param build_version: the version of the build (e.g. 6.06.06)
:param build_release: the release of the build (e.g. 4.fc25)
:param module_build_id: the optional id of the module_build in the database
:param state_reason: the optional reason as to why the state changed
"""
def __init__(
self,
msg_id,
build_id,
task_id,
build_new_state,
build_name,
build_version,
build_release,
module_build_id=None,
state_reason=None,
):
if task_id is None:
raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.")
super(KojiBuildChange, self).__init__(msg_id)
self.build_id = build_id
self.task_id = task_id
self.build_new_state = build_new_state
self.build_name = build_name
self.build_version = build_version
self.build_release = build_release
self.module_build_id = module_build_id
self.state_reason = state_reason
class KojiTagChange(BaseMessage):
"""
A class that inherits from BaseMessage to provide a message
object for a buildsys.tag info (in fedmsg this replaces the msg dictionary)
:param tag: the name of tag (e.g. module-123456789-build)
:param artifact: the name of tagged artifact (e.g. module-build-macros)
:param nvr: the nvr of the tagged artifact
"""
def __init__(self, msg_id, tag, artifact, nvr):
super(KojiTagChange, self).__init__(msg_id)
self.tag = tag
self.artifact = artifact
self.nvr = nvr
class KojiRepoChange(BaseMessage):
""" A class that inherits from BaseMessage to provide a message
object for a repo's info (in fedmsg this replaces the msg dictionary)
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
:param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build)
"""
def __init__(self, msg_id, repo_tag):
super(KojiRepoChange, self).__init__(msg_id)
self.repo_tag = repo_tag
class MBSModule(BaseMessage):
""" A class that inherits from BaseMessage to provide a message
object for a module event generated by module_build_service
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
:param module_build_id: the id of the module build
:param module_build_state: the state of the module build
"""
def __init__(self, msg_id, module_build_id, module_build_state):
super(MBSModule, self).__init__(msg_id)
self.module_build_id = module_build_id
self.module_build_state = module_build_state
class GreenwaveDecisionUpdate(BaseMessage):
"""A class representing message send to topic greenwave.decision.update"""
def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier):
super(GreenwaveDecisionUpdate, self).__init__(msg_id)
self.decision_context = decision_context
self.policies_satisfied = policies_satisfied
self.subject_identifier = subject_identifier
def publish(topic, msg, conf, service):
"""
Publish a single message to a given backend, and return
:param topic: the topic of the message (e.g. module.state.change)
:param msg: the message contents of the message (typically JSON)
:param conf: a Config object from the class in config.py
:param service: the system that is publishing the message (e.g. mbs)
:return:
"""
try:
handler = _messaging_backends[conf.messaging]["publish"]
except KeyError:
raise KeyError(
"No messaging backend found for %r in %r" % (conf.messaging, _messaging_backends.keys())
)
from module_build_service.monitor import (
messaging_tx_to_send_counter,
messaging_tx_sent_ok_counter,
messaging_tx_failed_counter,
)
messaging_tx_to_send_counter.inc()
try:
rv = handler(topic, msg, conf, service)
messaging_tx_sent_ok_counter.inc()
return rv
except Exception:
messaging_tx_failed_counter.inc()
raise
def _fedmsg_publish(topic, msg, conf, service):
# fedmsg doesn't really need access to conf, however other backends do
import fedmsg
return fedmsg.publish(topic, msg=msg, modname=service)
# A counter used for in-memory messages.
_in_memory_msg_id = 0
_initial_messages = []
def _in_memory_publish(topic, msg, conf, service):
""" Puts the message into the in memory work queue. """
# Increment the message ID.
global _in_memory_msg_id
_in_memory_msg_id += 1
# Create fake fedmsg from the message so we can reuse
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
# class instance.
wrapped_msg = FedmsgMessageParser().parse({
"msg_id": str(_in_memory_msg_id),
"topic": service + "." + topic,
"msg": msg
})
# Put the message to queue.
from module_build_service.scheduler.consumer import work_queue_put
try:
work_queue_put(wrapped_msg)
except ValueError as e:
log.warning("No MBSConsumer found. Shutting down? %r" % e)
except AttributeError:
# In the event that `moksha.hub._hub` hasn't yet been initialized, we
# need to store messages on the side until it becomes available.
# As a last-ditch effort, try to hang initial messages in the config.
log.warning("Hub not initialized. Queueing on the side.")
_initial_messages.append(wrapped_msg)
_fedmsg_backend = {
"publish": _fedmsg_publish,
"services": ["buildsys", "mbs", "greenwave"],
"parser": FedmsgMessageParser(),
"topic_suffix": ".",
}
_in_memory_backend = {
"publish": _in_memory_publish,
"services": [],
"parser": FedmsgMessageParser(), # re-used. :)
"topic_suffix": ".",
}
_messaging_backends = {}
for entrypoint in pkg_resources.iter_entry_points("mbs.messaging_backends"):
_messaging_backends[entrypoint.name] = ep = entrypoint.load()
required = ["publish", "services", "parser", "topic_suffix"]
if any([key not in ep for key in required]):
raise ValueError("messaging backend %r is malformed: %r" % (entrypoint.name, ep))
if not _messaging_backends:
raise ValueError("No messaging plugins are installed or available.")