mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-05-11 10:34:30 +08:00
Clear up module_build_service.messaging
Message classes and FedmsgMessageParser are moved into dedicated Python module under scheduler/ directory. FedmsgMessageParser is decoupled from messaging.py by initializing a parser object with known fedmsg services. This decouple avoids cycle import between parser.py and messaging.py. Signed-off-by: Chenxiong Qi <cqi@redhat.com>
This commit is contained in:
@@ -29,6 +29,7 @@ from module_build_service.errors import ProgrammingError
|
||||
|
||||
from module_build_service.builder.base import GenericBuilder
|
||||
from module_build_service.builder.KojiContentGenerator import KojiContentGenerator
|
||||
from module_build_service.scheduler import events
|
||||
from module_build_service.utils import get_reusable_components, get_reusable_module, set_locale
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
@@ -741,7 +742,7 @@ class KojiModuleBuilder(GenericBuilder):
|
||||
nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr)
|
||||
# Trigger a completed build message
|
||||
further_work.append(
|
||||
module_build_service.messaging.KojiBuildChange(
|
||||
events.KojiBuildChange(
|
||||
"recover_orphaned_artifact: fake message",
|
||||
build["build_id"],
|
||||
build["task_id"],
|
||||
@@ -772,7 +773,7 @@ class KojiModuleBuilder(GenericBuilder):
|
||||
"the tag handler".format(tag)
|
||||
)
|
||||
further_work.append(
|
||||
module_build_service.messaging.KojiTagChange(
|
||||
events.KojiTagChange(
|
||||
"recover_orphaned_artifact: fake message",
|
||||
tag,
|
||||
component_build.package,
|
||||
|
||||
@@ -23,6 +23,7 @@ from module_build_service.db_session import db_session
|
||||
from module_build_service.errors import StreamAmbigous
|
||||
import module_build_service.messaging
|
||||
import module_build_service.scheduler.consumer
|
||||
import module_build_service.scheduler.local
|
||||
|
||||
|
||||
manager = Manager(create_app)
|
||||
@@ -181,10 +182,10 @@ def build_module_locally(
|
||||
|
||||
module_build_ids = [build.id for build in module_builds]
|
||||
|
||||
stop = module_build_service.scheduler.make_simple_stop_condition()
|
||||
stop = module_build_service.scheduler.local.make_simple_stop_condition()
|
||||
|
||||
# Run the consumer until stop_condition returns True
|
||||
module_build_service.scheduler.main([], stop)
|
||||
module_build_service.scheduler.local.main([], stop)
|
||||
|
||||
has_failed_module = db_session.query(models.ModuleBuild).filter(
|
||||
models.ModuleBuild.id.in_(module_build_ids),
|
||||
|
||||
@@ -2,282 +2,13 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
"""Generic messaging functions."""
|
||||
|
||||
import re
|
||||
import pkg_resources
|
||||
|
||||
try:
|
||||
from inspect import signature
|
||||
except ImportError:
|
||||
from funcsigs import signature
|
||||
from module_build_service.scheduler.parser import FedmsgMessageParser
|
||||
|
||||
from module_build_service import log
|
||||
|
||||
|
||||
class IgnoreMessage(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BaseMessage(object):
|
||||
def __init__(self, msg_id):
|
||||
"""
|
||||
A base class to abstract messages from different backends
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
"""
|
||||
self.msg_id = msg_id
|
||||
|
||||
# Moksha calls `consumer.validate` on messages that it receives, and
|
||||
# even though we have validation turned off in the config there's still
|
||||
# a step that tries to access `msg['body']`, `msg['topic']` and
|
||||
# `msg.get('topic')`.
|
||||
# These are here just so that the `validate` method won't raise an
|
||||
# exception when we push our fake messages through.
|
||||
# Note that, our fake message pushing has worked for a while... but the
|
||||
# *latest* version of fedmsg has some code that exercises the bug. I
|
||||
# didn't hit this until I went to test in jenkins.
|
||||
self.body = {}
|
||||
self.topic = None
|
||||
|
||||
def __repr__(self):
|
||||
init_sig = signature(self.__init__)
|
||||
|
||||
args_strs = (
|
||||
"{}={!r}".format(name, getattr(self, name))
|
||||
if param.default != param.empty
|
||||
else repr(getattr(self, name))
|
||||
for name, param in init_sig.parameters.items()
|
||||
)
|
||||
|
||||
return "{}({})".format(type(self).__name__, ", ".join(args_strs))
|
||||
|
||||
def __getitem__(self, key):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return getattr(self, key)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return setattr(self, key, value)
|
||||
|
||||
def get(self, key, value=None):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return getattr(self, key, value)
|
||||
|
||||
def __json__(self):
|
||||
return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)
|
||||
|
||||
|
||||
class MessageParser(object):
|
||||
def parse(self, msg):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class FedmsgMessageParser(MessageParser):
|
||||
def parse(self, msg):
|
||||
"""
|
||||
Takes a fedmsg topic and message and converts it to a message object
|
||||
:param msg: the message contents from the fedmsg message
|
||||
:return: an object of BaseMessage descent if the message is a type
|
||||
that the app looks for, otherwise None is returned
|
||||
"""
|
||||
if "body" in msg:
|
||||
msg = msg["body"]
|
||||
topic = msg["topic"]
|
||||
topic_categories = _messaging_backends["fedmsg"]["services"]
|
||||
categories_re = "|".join(map(re.escape, topic_categories))
|
||||
regex_pattern = re.compile(
|
||||
r"(?P<category>" + categories_re + r")"
|
||||
r"(?:(?:\.)(?P<object>build|repo|module|decision))?"
|
||||
r"(?:(?:\.)(?P<subobject>state|build))?"
|
||||
r"(?:\.)(?P<event>change|done|end|tag|update)$"
|
||||
)
|
||||
regex_results = re.search(regex_pattern, topic)
|
||||
|
||||
if regex_results:
|
||||
category = regex_results.group("category")
|
||||
object = regex_results.group("object")
|
||||
subobject = regex_results.group("subobject")
|
||||
event = regex_results.group("event")
|
||||
|
||||
msg_id = msg.get("msg_id")
|
||||
msg_inner_msg = msg.get("msg")
|
||||
|
||||
# If there isn't a msg dict in msg then this message can be skipped
|
||||
if not msg_inner_msg:
|
||||
log.debug(
|
||||
"Skipping message without any content with the " 'topic "{0}"'.format(topic))
|
||||
return None
|
||||
|
||||
msg_obj = None
|
||||
|
||||
# Ignore all messages from the secondary koji instances.
|
||||
if category == "buildsys":
|
||||
instance = msg_inner_msg.get("instance", "primary")
|
||||
if instance != "primary":
|
||||
log.debug("Ignoring message from %r koji hub." % instance)
|
||||
return
|
||||
|
||||
if (
|
||||
category == "buildsys"
|
||||
and object == "build"
|
||||
and subobject == "state"
|
||||
and event == "change"
|
||||
):
|
||||
build_id = msg_inner_msg.get("build_id")
|
||||
task_id = msg_inner_msg.get("task_id")
|
||||
build_new_state = msg_inner_msg.get("new")
|
||||
build_name = msg_inner_msg.get("name")
|
||||
build_version = msg_inner_msg.get("version")
|
||||
build_release = msg_inner_msg.get("release")
|
||||
|
||||
msg_obj = KojiBuildChange(
|
||||
msg_id,
|
||||
build_id,
|
||||
task_id,
|
||||
build_new_state,
|
||||
build_name,
|
||||
build_version,
|
||||
build_release,
|
||||
)
|
||||
|
||||
elif (
|
||||
category == "buildsys"
|
||||
and object == "repo"
|
||||
and subobject is None
|
||||
and event == "done"
|
||||
):
|
||||
repo_tag = msg_inner_msg.get("tag")
|
||||
msg_obj = KojiRepoChange(msg_id, repo_tag)
|
||||
|
||||
elif category == "buildsys" and event == "tag":
|
||||
tag = msg_inner_msg.get("tag")
|
||||
name = msg_inner_msg.get("name")
|
||||
version = msg_inner_msg.get("version")
|
||||
release = msg_inner_msg.get("release")
|
||||
nvr = None
|
||||
if name and version and release:
|
||||
nvr = "-".join((name, version, release))
|
||||
msg_obj = KojiTagChange(msg_id, tag, name, nvr)
|
||||
|
||||
elif (
|
||||
category == "mbs"
|
||||
and object == "module"
|
||||
and subobject == "state"
|
||||
and event == "change"
|
||||
):
|
||||
msg_obj = MBSModule(msg_id, msg_inner_msg.get("id"), msg_inner_msg.get("state"))
|
||||
|
||||
elif (
|
||||
category == "greenwave"
|
||||
and object == "decision"
|
||||
and subobject is None
|
||||
and event == "update"
|
||||
):
|
||||
msg_obj = GreenwaveDecisionUpdate(
|
||||
msg_id=msg_id,
|
||||
decision_context=msg_inner_msg.get("decision_context"),
|
||||
policies_satisfied=msg_inner_msg.get("policies_satisfied"),
|
||||
subject_identifier=msg_inner_msg.get("subject_identifier"),
|
||||
)
|
||||
|
||||
# If the message matched the regex and is important to the app,
|
||||
# it will be returned
|
||||
if msg_obj:
|
||||
return msg_obj
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class KojiBuildChange(BaseMessage):
|
||||
""" A class that inherits from BaseMessage to provide a message
|
||||
object for a build's info (in fedmsg this replaces the msg dictionary)
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
:param build_id: the id of the build (e.g. 264382)
|
||||
:param build_new_state: the new build state, this is currently a Koji
|
||||
integer
|
||||
:param build_name: the name of what is being built
|
||||
(e.g. golang-googlecode-tools)
|
||||
:param build_version: the version of the build (e.g. 6.06.06)
|
||||
:param build_release: the release of the build (e.g. 4.fc25)
|
||||
:param module_build_id: the optional id of the module_build in the database
|
||||
:param state_reason: the optional reason as to why the state changed
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
msg_id,
|
||||
build_id,
|
||||
task_id,
|
||||
build_new_state,
|
||||
build_name,
|
||||
build_version,
|
||||
build_release,
|
||||
module_build_id=None,
|
||||
state_reason=None,
|
||||
):
|
||||
if task_id is None:
|
||||
raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.")
|
||||
super(KojiBuildChange, self).__init__(msg_id)
|
||||
self.build_id = build_id
|
||||
self.task_id = task_id
|
||||
self.build_new_state = build_new_state
|
||||
self.build_name = build_name
|
||||
self.build_version = build_version
|
||||
self.build_release = build_release
|
||||
self.module_build_id = module_build_id
|
||||
self.state_reason = state_reason
|
||||
|
||||
|
||||
class KojiTagChange(BaseMessage):
|
||||
"""
|
||||
A class that inherits from BaseMessage to provide a message
|
||||
object for a buildsys.tag info (in fedmsg this replaces the msg dictionary)
|
||||
:param tag: the name of tag (e.g. module-123456789-build)
|
||||
:param artifact: the name of tagged artifact (e.g. module-build-macros)
|
||||
:param nvr: the nvr of the tagged artifact
|
||||
"""
|
||||
|
||||
def __init__(self, msg_id, tag, artifact, nvr):
|
||||
super(KojiTagChange, self).__init__(msg_id)
|
||||
self.tag = tag
|
||||
self.artifact = artifact
|
||||
self.nvr = nvr
|
||||
|
||||
|
||||
class KojiRepoChange(BaseMessage):
|
||||
""" A class that inherits from BaseMessage to provide a message
|
||||
object for a repo's info (in fedmsg this replaces the msg dictionary)
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
:param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build)
|
||||
"""
|
||||
|
||||
def __init__(self, msg_id, repo_tag):
|
||||
super(KojiRepoChange, self).__init__(msg_id)
|
||||
self.repo_tag = repo_tag
|
||||
|
||||
|
||||
class MBSModule(BaseMessage):
|
||||
""" A class that inherits from BaseMessage to provide a message
|
||||
object for a module event generated by module_build_service
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
:param module_build_id: the id of the module build
|
||||
:param module_build_state: the state of the module build
|
||||
"""
|
||||
|
||||
def __init__(self, msg_id, module_build_id, module_build_state):
|
||||
super(MBSModule, self).__init__(msg_id)
|
||||
self.module_build_id = module_build_id
|
||||
self.module_build_state = module_build_state
|
||||
|
||||
|
||||
class GreenwaveDecisionUpdate(BaseMessage):
|
||||
"""A class representing message send to topic greenwave.decision.update"""
|
||||
|
||||
def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier):
|
||||
super(GreenwaveDecisionUpdate, self).__init__(msg_id)
|
||||
self.decision_context = decision_context
|
||||
self.policies_satisfied = policies_satisfied
|
||||
self.subject_identifier = subject_identifier
|
||||
|
||||
|
||||
def publish(topic, msg, conf, service):
|
||||
"""
|
||||
Publish a single message to a given backend, and return
|
||||
@@ -331,7 +62,7 @@ def _in_memory_publish(topic, msg, conf, service):
|
||||
# Create fake fedmsg from the message so we can reuse
|
||||
# the BaseMessage.from_fedmsg code to get the particular BaseMessage
|
||||
# class instance.
|
||||
wrapped_msg = FedmsgMessageParser().parse({
|
||||
wrapped_msg = FedmsgMessageParser(known_fedmsg_services).parse({
|
||||
"msg_id": str(_in_memory_msg_id),
|
||||
"topic": service + "." + topic,
|
||||
"msg": msg
|
||||
@@ -352,16 +83,19 @@ def _in_memory_publish(topic, msg, conf, service):
|
||||
_initial_messages.append(wrapped_msg)
|
||||
|
||||
|
||||
known_fedmsg_services = ["buildsys", "mbs", "greenwave"]
|
||||
|
||||
|
||||
_fedmsg_backend = {
|
||||
"publish": _fedmsg_publish,
|
||||
"services": ["buildsys", "mbs", "greenwave"],
|
||||
"parser": FedmsgMessageParser(),
|
||||
"parser": FedmsgMessageParser(known_fedmsg_services),
|
||||
"services": known_fedmsg_services,
|
||||
"topic_suffix": ".",
|
||||
}
|
||||
_in_memory_backend = {
|
||||
"publish": _in_memory_publish,
|
||||
"parser": FedmsgMessageParser(known_fedmsg_services), # re-used. :)
|
||||
"services": [],
|
||||
"parser": FedmsgMessageParser(), # re-used. :)
|
||||
"topic_suffix": ".",
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from sqlalchemy.orm import validates, load_only
|
||||
import module_build_service.messaging
|
||||
from module_build_service import db, log, get_url_for, conf
|
||||
from module_build_service.errors import UnprocessableEntity
|
||||
from module_build_service.scheduler import events
|
||||
|
||||
DEFAULT_MODULE_CONTEXT = "00000000"
|
||||
|
||||
@@ -484,7 +485,7 @@ class ModuleBuild(MBSBase):
|
||||
|
||||
@classmethod
|
||||
def from_module_event(cls, db_session, event):
|
||||
if type(event) == module_build_service.messaging.MBSModule:
|
||||
if type(event) == events.MBSModule:
|
||||
return db_session.query(cls).filter(cls.id == event.module_build_id).first()
|
||||
else:
|
||||
raise ValueError("%r is not a module message." % type(event).__name__)
|
||||
@@ -1128,7 +1129,7 @@ class ComponentBuild(MBSBase):
|
||||
|
||||
@classmethod
|
||||
def from_component_event(cls, db_session, event):
|
||||
if isinstance(event, module_build_service.messaging.KojiBuildChange):
|
||||
if isinstance(event, events.KojiBuildChange):
|
||||
if event.module_build_id:
|
||||
return (
|
||||
db_session.query(cls)
|
||||
|
||||
@@ -1,87 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# SPDX-License-Identifier: MIT
|
||||
""" This is a sub-module for backend/scheduler functionality. """
|
||||
|
||||
import fedmsg
|
||||
import moksha.hub
|
||||
|
||||
import module_build_service.models
|
||||
import module_build_service.scheduler.consumer
|
||||
|
||||
from module_build_service.db_session import db_session
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def main(initial_messages, stop_condition):
|
||||
""" Run the consumer until some condition is met.
|
||||
|
||||
Setting stop_condition to None will run the consumer forever.
|
||||
"""
|
||||
|
||||
config = fedmsg.config.load_config()
|
||||
config["mbsconsumer"] = True
|
||||
config["mbsconsumer.stop_condition"] = stop_condition
|
||||
config["mbsconsumer.initial_messages"] = initial_messages
|
||||
|
||||
# Moksha requires that we subscribe to *something*, so tell it /dev/null
|
||||
# since we'll just be doing in-memory queue-based messaging for this single
|
||||
# build.
|
||||
config["zmq_enabled"] = True
|
||||
config["zmq_subscribe_endpoints"] = "ipc:///dev/null"
|
||||
|
||||
consumers = [module_build_service.scheduler.consumer.MBSConsumer]
|
||||
|
||||
# Note that the hub we kick off here cannot send any message. You
|
||||
# should use fedmsg.publish(...) still for that.
|
||||
moksha.hub.main(
|
||||
# Pass in our config dict
|
||||
options=config,
|
||||
# Only run the specified consumers if any are so specified.
|
||||
consumers=consumers,
|
||||
# Do not run default producers.
|
||||
producers=[],
|
||||
# Tell moksha to quiet its logging.
|
||||
framework=False,
|
||||
)
|
||||
|
||||
|
||||
def make_simple_stop_condition():
|
||||
""" Return a simple stop_condition callable.
|
||||
|
||||
Intended to be used with the main() function here in manage.py and tests.
|
||||
|
||||
The stop_condition returns true when the latest module build enters the any
|
||||
of the finished states.
|
||||
"""
|
||||
|
||||
def stop_condition(message):
|
||||
# XXX - We ignore the message here and instead just query the DB.
|
||||
|
||||
# Grab the latest module build.
|
||||
module = (
|
||||
db_session.query(module_build_service.models.ModuleBuild)
|
||||
.order_by(module_build_service.models.ModuleBuild.id.desc())
|
||||
.first()
|
||||
)
|
||||
done = (
|
||||
module_build_service.models.BUILD_STATES["failed"],
|
||||
module_build_service.models.BUILD_STATES["ready"],
|
||||
module_build_service.models.BUILD_STATES["done"],
|
||||
)
|
||||
result = module.state in done
|
||||
log.debug("stop_condition checking %r, got %r" % (module, result))
|
||||
|
||||
# moksha.hub.main starts the hub and runs it in a separate thread. When
|
||||
# the result is True, remove the db_session from that thread local so
|
||||
# that any pending queries in the transaction will not block other
|
||||
# queries made from other threads.
|
||||
# This is useful for testing particularly.
|
||||
if result:
|
||||
db_session.remove()
|
||||
|
||||
return result
|
||||
|
||||
return stop_condition
|
||||
|
||||
@@ -33,6 +33,7 @@ from module_build_service import models, log, conf
|
||||
from module_build_service.db_session import db_session
|
||||
from module_build_service.scheduler.handlers import greenwave
|
||||
from module_build_service.utils import module_build_state_from_msg
|
||||
from module_build_service.scheduler import events
|
||||
|
||||
|
||||
class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
@@ -124,7 +125,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
def validate(self, message):
|
||||
if conf.messaging == "fedmsg":
|
||||
# If this is a faked internal message, don't bother.
|
||||
if isinstance(message, module_build_service.messaging.BaseMessage):
|
||||
if isinstance(message, events.BaseMessage):
|
||||
log.info("Skipping crypto validation for %r" % message)
|
||||
return
|
||||
# Otherwise, if it is a real message from the network, pass it
|
||||
@@ -139,7 +140,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
# messages, then just use them as-is. If they are not already
|
||||
# instances of our message abstraction base class, then first transform
|
||||
# them before proceeding.
|
||||
if isinstance(message, module_build_service.messaging.BaseMessage):
|
||||
if isinstance(message, events.BaseMessage):
|
||||
msg = message
|
||||
else:
|
||||
msg = self.get_abstracted_msg(message)
|
||||
@@ -169,7 +170,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
if parser:
|
||||
try:
|
||||
return parser.parse(message)
|
||||
except module_build_service.messaging.IgnoreMessage:
|
||||
except events.IgnoreMessage:
|
||||
pass
|
||||
else:
|
||||
raise ValueError("{0} backend does not define a message parser".format(conf.messaging))
|
||||
@@ -198,32 +199,32 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
def _map_message(self, db_session, msg):
|
||||
"""Map message to its corresponding event handler and module build"""
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.KojiBuildChange):
|
||||
if isinstance(msg, events.KojiBuildChange):
|
||||
handler = self.on_build_change[msg.build_new_state]
|
||||
build = models.ComponentBuild.from_component_event(db_session, msg)
|
||||
if build:
|
||||
build = build.module_build
|
||||
return handler, build
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.KojiRepoChange):
|
||||
if isinstance(msg, events.KojiRepoChange):
|
||||
return (
|
||||
self.on_repo_change,
|
||||
models.ModuleBuild.from_repo_done_event(db_session, msg)
|
||||
)
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.KojiTagChange):
|
||||
if isinstance(msg, events.KojiTagChange):
|
||||
return (
|
||||
self.on_tag_change,
|
||||
models.ModuleBuild.from_tag_change_event(db_session, msg)
|
||||
)
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.MBSModule):
|
||||
if isinstance(msg, events.MBSModule):
|
||||
return (
|
||||
self.on_module_change[module_build_state_from_msg(msg)],
|
||||
models.ModuleBuild.from_module_event(db_session, msg)
|
||||
)
|
||||
|
||||
if isinstance(msg, module_build_service.messaging.GreenwaveDecisionUpdate):
|
||||
if isinstance(msg, events.GreenwaveDecisionUpdate):
|
||||
return (
|
||||
self.on_decision_update,
|
||||
greenwave.get_corresponding_module_build(msg.subject_identifier)
|
||||
@@ -305,6 +306,6 @@ def work_queue_put(msg):
|
||||
|
||||
|
||||
def fake_repo_done_message(tag_name):
|
||||
msg = module_build_service.messaging.KojiRepoChange(
|
||||
msg = events.KojiRepoChange(
|
||||
msg_id="a faked internal message", repo_tag=tag_name + "-build")
|
||||
work_queue_put(msg)
|
||||
|
||||
151
module_build_service/scheduler/events.py
Normal file
151
module_build_service/scheduler/events.py
Normal file
@@ -0,0 +1,151 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
try:
|
||||
from inspect import signature
|
||||
except ImportError:
|
||||
from funcsigs import signature
|
||||
|
||||
|
||||
class IgnoreMessage(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BaseMessage(object):
|
||||
def __init__(self, msg_id):
|
||||
"""
|
||||
A base class to abstract messages from different backends
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
"""
|
||||
self.msg_id = msg_id
|
||||
|
||||
# Moksha calls `consumer.validate` on messages that it receives, and
|
||||
# even though we have validation turned off in the config there's still
|
||||
# a step that tries to access `msg['body']`, `msg['topic']` and
|
||||
# `msg.get('topic')`.
|
||||
# These are here just so that the `validate` method won't raise an
|
||||
# exception when we push our fake messages through.
|
||||
# Note that, our fake message pushing has worked for a while... but the
|
||||
# *latest* version of fedmsg has some code that exercises the bug. I
|
||||
# didn't hit this until I went to test in jenkins.
|
||||
self.body = {}
|
||||
self.topic = None
|
||||
|
||||
def __repr__(self):
|
||||
init_sig = signature(self.__init__)
|
||||
|
||||
args_strs = (
|
||||
"{}={!r}".format(name, getattr(self, name))
|
||||
if param.default != param.empty
|
||||
else repr(getattr(self, name))
|
||||
for name, param in init_sig.parameters.items()
|
||||
)
|
||||
|
||||
return "{}({})".format(type(self).__name__, ", ".join(args_strs))
|
||||
|
||||
def __getitem__(self, key):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return getattr(self, key)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return setattr(self, key, value)
|
||||
|
||||
def get(self, key, value=None):
|
||||
""" Used to trick moksha into thinking we are a dict. """
|
||||
return getattr(self, key, value)
|
||||
|
||||
def __json__(self):
|
||||
return dict(msg_id=self.msg_id, topic=self.topic, body=self.body)
|
||||
|
||||
|
||||
class KojiBuildChange(BaseMessage):
|
||||
""" A class that inherits from BaseMessage to provide a message
|
||||
object for a build's info (in fedmsg this replaces the msg dictionary)
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
:param build_id: the id of the build (e.g. 264382)
|
||||
:param build_new_state: the new build state, this is currently a Koji
|
||||
integer
|
||||
:param build_name: the name of what is being built
|
||||
(e.g. golang-googlecode-tools)
|
||||
:param build_version: the version of the build (e.g. 6.06.06)
|
||||
:param build_release: the release of the build (e.g. 4.fc25)
|
||||
:param module_build_id: the optional id of the module_build in the database
|
||||
:param state_reason: the optional reason as to why the state changed
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
msg_id,
|
||||
build_id,
|
||||
task_id,
|
||||
build_new_state,
|
||||
build_name,
|
||||
build_version,
|
||||
build_release,
|
||||
module_build_id=None,
|
||||
state_reason=None,
|
||||
):
|
||||
if task_id is None:
|
||||
raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.")
|
||||
super(KojiBuildChange, self).__init__(msg_id)
|
||||
self.build_id = build_id
|
||||
self.task_id = task_id
|
||||
self.build_new_state = build_new_state
|
||||
self.build_name = build_name
|
||||
self.build_version = build_version
|
||||
self.build_release = build_release
|
||||
self.module_build_id = module_build_id
|
||||
self.state_reason = state_reason
|
||||
|
||||
|
||||
class KojiTagChange(BaseMessage):
|
||||
"""
|
||||
A class that inherits from BaseMessage to provide a message
|
||||
object for a buildsys.tag info (in fedmsg this replaces the msg dictionary)
|
||||
:param tag: the name of tag (e.g. module-123456789-build)
|
||||
:param artifact: the name of tagged artifact (e.g. module-build-macros)
|
||||
:param nvr: the nvr of the tagged artifact
|
||||
"""
|
||||
|
||||
def __init__(self, msg_id, tag, artifact, nvr):
|
||||
super(KojiTagChange, self).__init__(msg_id)
|
||||
self.tag = tag
|
||||
self.artifact = artifact
|
||||
self.nvr = nvr
|
||||
|
||||
|
||||
class KojiRepoChange(BaseMessage):
|
||||
""" A class that inherits from BaseMessage to provide a message
|
||||
object for a repo's info (in fedmsg this replaces the msg dictionary)
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
:param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build)
|
||||
"""
|
||||
|
||||
def __init__(self, msg_id, repo_tag):
|
||||
super(KojiRepoChange, self).__init__(msg_id)
|
||||
self.repo_tag = repo_tag
|
||||
|
||||
|
||||
class MBSModule(BaseMessage):
|
||||
""" A class that inherits from BaseMessage to provide a message
|
||||
object for a module event generated by module_build_service
|
||||
:param msg_id: the id of the msg (e.g. 2016-SomeGUID)
|
||||
:param module_build_id: the id of the module build
|
||||
:param module_build_state: the state of the module build
|
||||
"""
|
||||
|
||||
def __init__(self, msg_id, module_build_id, module_build_state):
|
||||
super(MBSModule, self).__init__(msg_id)
|
||||
self.module_build_id = module_build_id
|
||||
self.module_build_state = module_build_state
|
||||
|
||||
|
||||
class GreenwaveDecisionUpdate(BaseMessage):
|
||||
"""A class representing message send to topic greenwave.decision.update"""
|
||||
|
||||
def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier):
|
||||
super(GreenwaveDecisionUpdate, self).__init__(msg_id)
|
||||
self.decision_context = decision_context
|
||||
self.policies_satisfied = policies_satisfied
|
||||
self.subject_identifier = subject_identifier
|
||||
@@ -6,9 +6,10 @@ import logging
|
||||
import koji
|
||||
import module_build_service.builder
|
||||
|
||||
from module_build_service import models, log
|
||||
from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder
|
||||
from module_build_service.scheduler import events
|
||||
from module_build_service.utils.general import mmd_to_str
|
||||
from module_build_service import models, log, messaging
|
||||
from module_build_service.db_session import db_session
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
@@ -110,7 +111,7 @@ def _finalize(config, msg, state):
|
||||
# change message here.
|
||||
log.info("Batch done. No component to tag")
|
||||
further_work += [
|
||||
messaging.KojiRepoChange(
|
||||
events.KojiRepoChange(
|
||||
"components::_finalize: fake msg", builder.module_build_tag["name"])
|
||||
]
|
||||
else:
|
||||
|
||||
@@ -6,7 +6,6 @@ from module_build_service import conf, models, log, build_logs
|
||||
import module_build_service.builder
|
||||
import module_build_service.resolver
|
||||
import module_build_service.utils
|
||||
import module_build_service.messaging
|
||||
from module_build_service.utils import (
|
||||
attempt_to_reuse_all_components,
|
||||
record_component_builds,
|
||||
@@ -17,6 +16,7 @@ from module_build_service.utils import (
|
||||
)
|
||||
from module_build_service.db_session import db_session
|
||||
from module_build_service.errors import UnprocessableEntity, Forbidden, ValidationError
|
||||
from module_build_service.scheduler import events
|
||||
from module_build_service.utils.greenwave import greenwave
|
||||
from module_build_service.scheduler.default_modules import (
|
||||
add_default_modules, handle_collisions_with_base_module_rpms)
|
||||
@@ -382,7 +382,7 @@ def wait(config, msg):
|
||||
# Return a KojiRepoChange message so that the build can be transitioned to done
|
||||
# in the repos handler
|
||||
return [
|
||||
module_build_service.messaging.KojiRepoChange(
|
||||
events.KojiRepoChange(
|
||||
"handlers.modules.wait: fake msg", builder.module_build_tag["name"])
|
||||
]
|
||||
|
||||
@@ -458,7 +458,7 @@ def wait(config, msg):
|
||||
db_session.commit()
|
||||
else:
|
||||
further_work.append(
|
||||
module_build_service.messaging.KojiRepoChange(
|
||||
events.KojiRepoChange(
|
||||
"fake msg", builder.module_build_tag["name"])
|
||||
)
|
||||
return further_work
|
||||
|
||||
@@ -5,8 +5,9 @@
|
||||
import module_build_service.builder
|
||||
import logging
|
||||
import koji
|
||||
from module_build_service import models, log, messaging
|
||||
from module_build_service import models, log
|
||||
from module_build_service.db_session import db_session
|
||||
from module_build_service.scheduler import events
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
@@ -70,7 +71,7 @@ def tagged(config, msg):
|
||||
log.info(
|
||||
"All components in module tagged and built, skipping the last repo regeneration")
|
||||
further_work += [
|
||||
messaging.KojiRepoChange(
|
||||
events.KojiRepoChange(
|
||||
"components::_finalize: fake msg", builder.module_build_tag["name"])
|
||||
]
|
||||
db_session.commit()
|
||||
|
||||
97
module_build_service/scheduler/local.py
Normal file
97
module_build_service/scheduler/local.py
Normal file
@@ -0,0 +1,97 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import fedmsg
|
||||
import logging
|
||||
import module_build_service.models
|
||||
import moksha.hub
|
||||
|
||||
from module_build_service.db_session import db_session
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
"""
|
||||
This module contains functions to control fedmsg-hub running locally for local
|
||||
module build and running tests within test_build.py particularly.
|
||||
"""
|
||||
|
||||
__all__ = ["main", "make_simple_stop_condition"]
|
||||
|
||||
|
||||
def main(initial_messages, stop_condition):
|
||||
""" Run the consumer until some condition is met.
|
||||
|
||||
Setting stop_condition to None will run the consumer forever.
|
||||
"""
|
||||
|
||||
config = fedmsg.config.load_config()
|
||||
config["mbsconsumer"] = True
|
||||
config["mbsconsumer.stop_condition"] = stop_condition
|
||||
config["mbsconsumer.initial_messages"] = initial_messages
|
||||
|
||||
# Moksha requires that we subscribe to *something*, so tell it /dev/null
|
||||
# since we'll just be doing in-memory queue-based messaging for this single
|
||||
# build.
|
||||
config["zmq_enabled"] = True
|
||||
config["zmq_subscribe_endpoints"] = "ipc:///dev/null"
|
||||
|
||||
# Lazy import consumer to avoid potential import cycle.
|
||||
# For example, in some cases, importing event message from events.py would
|
||||
# cause importing the consumer module, which then starts to import relative
|
||||
# code inside handlers module, and the original code is imported eventually.
|
||||
import module_build_service.scheduler.consumer
|
||||
|
||||
consumers = [module_build_service.scheduler.consumer.MBSConsumer]
|
||||
|
||||
# Note that the hub we kick off here cannot send any message. You
|
||||
# should use fedmsg.publish(...) still for that.
|
||||
moksha.hub.main(
|
||||
# Pass in our config dict
|
||||
options=config,
|
||||
# Only run the specified consumers if any are so specified.
|
||||
consumers=consumers,
|
||||
# Do not run default producers.
|
||||
producers=[],
|
||||
# Tell moksha to quiet its logging.
|
||||
framework=False,
|
||||
)
|
||||
|
||||
|
||||
def make_simple_stop_condition():
|
||||
""" Return a simple stop_condition callable.
|
||||
|
||||
Intended to be used with the main() function here in manage.py and tests.
|
||||
|
||||
The stop_condition returns true when the latest module build enters the any
|
||||
of the finished states.
|
||||
"""
|
||||
|
||||
def stop_condition(message):
|
||||
# XXX - We ignore the message here and instead just query the DB.
|
||||
|
||||
# Grab the latest module build.
|
||||
module = (
|
||||
db_session.query(module_build_service.models.ModuleBuild)
|
||||
.order_by(module_build_service.models.ModuleBuild.id.desc())
|
||||
.first()
|
||||
)
|
||||
done = (
|
||||
module_build_service.models.BUILD_STATES["failed"],
|
||||
module_build_service.models.BUILD_STATES["ready"],
|
||||
module_build_service.models.BUILD_STATES["done"],
|
||||
)
|
||||
result = module.state in done
|
||||
log.debug("stop_condition checking %r, got %r" % (module, result))
|
||||
|
||||
# moksha.hub.main starts the hub and runs it in a separate thread. When
|
||||
# the result is True, remove the db_session from that thread local so
|
||||
# that any pending queries in the transaction will not block other
|
||||
# queries made from other threads.
|
||||
# This is useful for testing particularly.
|
||||
if result:
|
||||
db_session.remove()
|
||||
|
||||
return result
|
||||
|
||||
return stop_condition
|
||||
118
module_build_service/scheduler/parser.py
Normal file
118
module_build_service/scheduler/parser.py
Normal file
@@ -0,0 +1,118 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import re
|
||||
|
||||
from module_build_service import log
|
||||
from module_build_service.scheduler import events
|
||||
|
||||
|
||||
class MessageParser(object):
|
||||
"""Base class for parsing messages received from a specific message bus
|
||||
|
||||
:param topic_categories: list of known services, that MBS can handle the
|
||||
messages sent from them. For example, a value could be
|
||||
``["buildsys", "mbs", "greenwave"]``.
|
||||
:type topic_categories: list[str]
|
||||
"""
|
||||
|
||||
def __init__(self, topic_categories):
|
||||
self.topic_categories = topic_categories
|
||||
|
||||
def parse(self, msg):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class FedmsgMessageParser(MessageParser):
|
||||
|
||||
def parse(self, msg):
|
||||
"""
|
||||
Parse a received message and convert it to a consistent format
|
||||
|
||||
:param dict msg: the message contents from the message bus.
|
||||
:return: a mapping representing the corresponding event.
|
||||
If the topic isn't recognized, None is returned.
|
||||
:rtype: dict or None
|
||||
"""
|
||||
|
||||
if "body" in msg:
|
||||
msg = msg["body"]
|
||||
topic = msg["topic"]
|
||||
categories_re = "|".join(map(re.escape, self.topic_categories))
|
||||
regex_pattern = re.compile(
|
||||
r"(?P<category>" + categories_re + r")"
|
||||
r"(?:(?:\.)(?P<object>build|repo|module|decision))?"
|
||||
r"(?:(?:\.)(?P<subobject>state|build))?"
|
||||
r"(?:\.)(?P<event>change|done|end|tag|update)$"
|
||||
)
|
||||
regex_results = re.search(regex_pattern, topic)
|
||||
|
||||
if regex_results:
|
||||
category = regex_results.group("category")
|
||||
object = regex_results.group("object")
|
||||
subobject = regex_results.group("subobject")
|
||||
event = regex_results.group("event")
|
||||
|
||||
msg_id = msg.get("msg_id")
|
||||
msg_inner_msg = msg.get("msg")
|
||||
|
||||
# If there isn't a msg dict in msg then this message can be skipped
|
||||
if not msg_inner_msg:
|
||||
log.debug(
|
||||
"Skipping message without any content with the " 'topic "{0}"'.format(topic))
|
||||
return None
|
||||
|
||||
# Ignore all messages from the secondary koji instances.
|
||||
if category == "buildsys":
|
||||
instance = msg_inner_msg.get("instance", "primary")
|
||||
if instance != "primary":
|
||||
log.debug("Ignoring message from %r koji hub." % instance)
|
||||
return
|
||||
|
||||
if object == "build" and subobject == "state" and event == "change":
|
||||
build_id = msg_inner_msg.get("build_id")
|
||||
task_id = msg_inner_msg.get("task_id")
|
||||
build_new_state = msg_inner_msg.get("new")
|
||||
build_name = msg_inner_msg.get("name")
|
||||
build_version = msg_inner_msg.get("version")
|
||||
build_release = msg_inner_msg.get("release")
|
||||
|
||||
return events.KojiBuildChange(
|
||||
msg_id,
|
||||
build_id,
|
||||
task_id,
|
||||
build_new_state,
|
||||
build_name,
|
||||
build_version,
|
||||
build_release,
|
||||
)
|
||||
|
||||
if object == "repo" and subobject is None and event == "done":
|
||||
repo_tag = msg_inner_msg.get("tag")
|
||||
return events.KojiRepoChange(msg_id, repo_tag)
|
||||
|
||||
if event == "tag":
|
||||
tag = msg_inner_msg.get("tag")
|
||||
name = msg_inner_msg.get("name")
|
||||
version = msg_inner_msg.get("version")
|
||||
release = msg_inner_msg.get("release")
|
||||
nvr = None
|
||||
if name and version and release:
|
||||
nvr = "-".join((name, version, release))
|
||||
return events.KojiTagChange(msg_id, tag, name, nvr)
|
||||
|
||||
if (category == "mbs"
|
||||
and object == "module" and subobject == "state" and event == "change"):
|
||||
return events.MBSModule(
|
||||
msg_id,
|
||||
msg_inner_msg.get("id"),
|
||||
msg_inner_msg.get("state"))
|
||||
|
||||
if (category == "greenwave"
|
||||
and object == "decision" and subobject is None and event == "update"):
|
||||
return events.GreenwaveDecisionUpdate(
|
||||
msg_id=msg_id,
|
||||
decision_context=msg_inner_msg.get("decision_context"),
|
||||
policies_satisfied=msg_inner_msg.get("policies_satisfied"),
|
||||
subject_identifier=msg_inner_msg.get("subject_identifier"),
|
||||
)
|
||||
@@ -16,6 +16,7 @@ import module_build_service.scheduler.consumer
|
||||
from module_build_service import conf, models, log
|
||||
from module_build_service.builder import GenericBuilder
|
||||
from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder
|
||||
from module_build_service.scheduler import events
|
||||
from module_build_service.utils.greenwave import greenwave
|
||||
from module_build_service.db_session import db_session
|
||||
|
||||
@@ -109,7 +110,7 @@ class MBSProducer(PollingProducer):
|
||||
log.info(" task {0!r} is in state {1!r}".format(task_id, task_info["state"]))
|
||||
if task_info["state"] in state_mapping:
|
||||
# Fake a fedmsg message on our internal queue
|
||||
msg = module_build_service.messaging.KojiBuildChange(
|
||||
msg = events.KojiBuildChange(
|
||||
msg_id="producer::fail_lost_builds fake msg",
|
||||
build_id=component_build.task_id,
|
||||
task_id=component_build.task_id,
|
||||
@@ -214,7 +215,7 @@ class MBSProducer(PollingProducer):
|
||||
|
||||
# Fake a message to kickstart the build anew in the consumer
|
||||
state = module_build_service.models.BUILD_STATES[state_name]
|
||||
msg = module_build_service.messaging.MBSModule(
|
||||
msg = events.MBSModule(
|
||||
"nudge_module_builds_fake_message", build.id, state)
|
||||
log.info(" Scheduling faked event %r" % msg)
|
||||
module_build_service.scheduler.consumer.work_queue_put(msg)
|
||||
@@ -436,7 +437,7 @@ class MBSProducer(PollingProducer):
|
||||
# If it is tagged in final tag, but MBS does not think so,
|
||||
# schedule fake message.
|
||||
if not c.tagged_in_final and module_build.koji_tag in tags:
|
||||
msg = module_build_service.messaging.KojiTagChange(
|
||||
msg = events.KojiTagChange(
|
||||
"sync_koji_build_tags_fake_message", module_build.koji_tag, c.package, c.nvr
|
||||
)
|
||||
log.info(" Scheduling faked event %r" % msg)
|
||||
@@ -446,7 +447,7 @@ class MBSProducer(PollingProducer):
|
||||
# schedule fake message.
|
||||
build_tag = module_build.koji_tag + "-build"
|
||||
if not c.tagged and build_tag in tags:
|
||||
msg = module_build_service.messaging.KojiTagChange(
|
||||
msg = events.KojiTagChange(
|
||||
"sync_koji_build_tags_fake_message", build_tag, c.package, c.nvr)
|
||||
log.info(" Scheduling faked event %r" % msg)
|
||||
module_build_service.scheduler.consumer.work_queue_put(msg)
|
||||
|
||||
@@ -4,8 +4,8 @@ import threading
|
||||
import concurrent.futures
|
||||
|
||||
from module_build_service import conf, log, models
|
||||
import module_build_service.messaging
|
||||
from module_build_service.db_session import db_session
|
||||
from module_build_service.scheduler.events import KojiRepoChange
|
||||
from .reuse import get_reusable_components, reuse_component
|
||||
|
||||
|
||||
@@ -275,8 +275,7 @@ def start_next_batch_build(config, module, builder, components=None):
|
||||
# message and return
|
||||
if components_reused and not unbuilt_components_after_reuse:
|
||||
further_work.append(
|
||||
module_build_service.messaging.KojiRepoChange(
|
||||
"start_build_batch: fake msg", builder.module_build_tag["name"])
|
||||
KojiRepoChange("start_build_batch: fake msg", builder.module_build_tag["name"])
|
||||
)
|
||||
return further_work
|
||||
|
||||
|
||||
@@ -2,11 +2,11 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
import kobo.rpmlib
|
||||
|
||||
import module_build_service.messaging
|
||||
from module_build_service import log, models, conf
|
||||
from module_build_service.db_session import db_session
|
||||
from module_build_service.utils.mse import get_base_module_mmds
|
||||
from module_build_service.resolver import GenericResolver
|
||||
from module_build_service.scheduler.events import KojiBuildChange
|
||||
from module_build_service.utils.mse import get_base_module_mmds
|
||||
|
||||
|
||||
def reuse_component(component, previous_component_build, change_state_now=False):
|
||||
@@ -45,7 +45,7 @@ def reuse_component(component, previous_component_build, change_state_now=False)
|
||||
# Add this message to further_work so that the reused
|
||||
# component will be tagged properly
|
||||
return [
|
||||
module_build_service.messaging.KojiBuildChange(
|
||||
KojiBuildChange(
|
||||
msg_id="reuse_component: fake msg",
|
||||
build_id=None,
|
||||
task_id=component.task_id,
|
||||
|
||||
Reference in New Issue
Block a user