diff --git a/module_build_service/builder/KojiModuleBuilder.py b/module_build_service/builder/KojiModuleBuilder.py index 37d332fb..48c8f3ed 100644 --- a/module_build_service/builder/KojiModuleBuilder.py +++ b/module_build_service/builder/KojiModuleBuilder.py @@ -29,6 +29,7 @@ from module_build_service.errors import ProgrammingError from module_build_service.builder.base import GenericBuilder from module_build_service.builder.KojiContentGenerator import KojiContentGenerator +from module_build_service.scheduler import events from module_build_service.utils import get_reusable_components, get_reusable_module, set_locale logging.basicConfig(level=logging.DEBUG) @@ -741,7 +742,7 @@ class KojiModuleBuilder(GenericBuilder): nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr) # Trigger a completed build message further_work.append( - module_build_service.messaging.KojiBuildChange( + events.KojiBuildChange( "recover_orphaned_artifact: fake message", build["build_id"], build["task_id"], @@ -772,7 +773,7 @@ class KojiModuleBuilder(GenericBuilder): "the tag handler".format(tag) ) further_work.append( - module_build_service.messaging.KojiTagChange( + events.KojiTagChange( "recover_orphaned_artifact: fake message", tag, component_build.package, diff --git a/module_build_service/manage.py b/module_build_service/manage.py index 8fc1e198..837278e8 100755 --- a/module_build_service/manage.py +++ b/module_build_service/manage.py @@ -23,6 +23,7 @@ from module_build_service.db_session import db_session from module_build_service.errors import StreamAmbigous import module_build_service.messaging import module_build_service.scheduler.consumer +import module_build_service.scheduler.local manager = Manager(create_app) @@ -181,10 +182,10 @@ def build_module_locally( module_build_ids = [build.id for build in module_builds] - stop = module_build_service.scheduler.make_simple_stop_condition() + stop = module_build_service.scheduler.local.make_simple_stop_condition() # Run the consumer until stop_condition returns True - module_build_service.scheduler.main([], stop) + module_build_service.scheduler.local.main([], stop) has_failed_module = db_session.query(models.ModuleBuild).filter( models.ModuleBuild.id.in_(module_build_ids), diff --git a/module_build_service/messaging.py b/module_build_service/messaging.py index 586719c7..89b9243c 100644 --- a/module_build_service/messaging.py +++ b/module_build_service/messaging.py @@ -2,282 +2,13 @@ # SPDX-License-Identifier: MIT """Generic messaging functions.""" -import re import pkg_resources -try: - from inspect import signature -except ImportError: - from funcsigs import signature +from module_build_service.scheduler.parser import FedmsgMessageParser from module_build_service import log -class IgnoreMessage(Exception): - pass - - -class BaseMessage(object): - def __init__(self, msg_id): - """ - A base class to abstract messages from different backends - :param msg_id: the id of the msg (e.g. 2016-SomeGUID) - """ - self.msg_id = msg_id - - # Moksha calls `consumer.validate` on messages that it receives, and - # even though we have validation turned off in the config there's still - # a step that tries to access `msg['body']`, `msg['topic']` and - # `msg.get('topic')`. - # These are here just so that the `validate` method won't raise an - # exception when we push our fake messages through. - # Note that, our fake message pushing has worked for a while... but the - # *latest* version of fedmsg has some code that exercises the bug. I - # didn't hit this until I went to test in jenkins. - self.body = {} - self.topic = None - - def __repr__(self): - init_sig = signature(self.__init__) - - args_strs = ( - "{}={!r}".format(name, getattr(self, name)) - if param.default != param.empty - else repr(getattr(self, name)) - for name, param in init_sig.parameters.items() - ) - - return "{}({})".format(type(self).__name__, ", ".join(args_strs)) - - def __getitem__(self, key): - """ Used to trick moksha into thinking we are a dict. """ - return getattr(self, key) - - def __setitem__(self, key, value): - """ Used to trick moksha into thinking we are a dict. """ - return setattr(self, key, value) - - def get(self, key, value=None): - """ Used to trick moksha into thinking we are a dict. """ - return getattr(self, key, value) - - def __json__(self): - return dict(msg_id=self.msg_id, topic=self.topic, body=self.body) - - -class MessageParser(object): - def parse(self, msg): - raise NotImplementedError() - - -class FedmsgMessageParser(MessageParser): - def parse(self, msg): - """ - Takes a fedmsg topic and message and converts it to a message object - :param msg: the message contents from the fedmsg message - :return: an object of BaseMessage descent if the message is a type - that the app looks for, otherwise None is returned - """ - if "body" in msg: - msg = msg["body"] - topic = msg["topic"] - topic_categories = _messaging_backends["fedmsg"]["services"] - categories_re = "|".join(map(re.escape, topic_categories)) - regex_pattern = re.compile( - r"(?P" + categories_re + r")" - r"(?:(?:\.)(?Pbuild|repo|module|decision))?" - r"(?:(?:\.)(?Pstate|build))?" - r"(?:\.)(?Pchange|done|end|tag|update)$" - ) - regex_results = re.search(regex_pattern, topic) - - if regex_results: - category = regex_results.group("category") - object = regex_results.group("object") - subobject = regex_results.group("subobject") - event = regex_results.group("event") - - msg_id = msg.get("msg_id") - msg_inner_msg = msg.get("msg") - - # If there isn't a msg dict in msg then this message can be skipped - if not msg_inner_msg: - log.debug( - "Skipping message without any content with the " 'topic "{0}"'.format(topic)) - return None - - msg_obj = None - - # Ignore all messages from the secondary koji instances. - if category == "buildsys": - instance = msg_inner_msg.get("instance", "primary") - if instance != "primary": - log.debug("Ignoring message from %r koji hub." % instance) - return - - if ( - category == "buildsys" - and object == "build" - and subobject == "state" - and event == "change" - ): - build_id = msg_inner_msg.get("build_id") - task_id = msg_inner_msg.get("task_id") - build_new_state = msg_inner_msg.get("new") - build_name = msg_inner_msg.get("name") - build_version = msg_inner_msg.get("version") - build_release = msg_inner_msg.get("release") - - msg_obj = KojiBuildChange( - msg_id, - build_id, - task_id, - build_new_state, - build_name, - build_version, - build_release, - ) - - elif ( - category == "buildsys" - and object == "repo" - and subobject is None - and event == "done" - ): - repo_tag = msg_inner_msg.get("tag") - msg_obj = KojiRepoChange(msg_id, repo_tag) - - elif category == "buildsys" and event == "tag": - tag = msg_inner_msg.get("tag") - name = msg_inner_msg.get("name") - version = msg_inner_msg.get("version") - release = msg_inner_msg.get("release") - nvr = None - if name and version and release: - nvr = "-".join((name, version, release)) - msg_obj = KojiTagChange(msg_id, tag, name, nvr) - - elif ( - category == "mbs" - and object == "module" - and subobject == "state" - and event == "change" - ): - msg_obj = MBSModule(msg_id, msg_inner_msg.get("id"), msg_inner_msg.get("state")) - - elif ( - category == "greenwave" - and object == "decision" - and subobject is None - and event == "update" - ): - msg_obj = GreenwaveDecisionUpdate( - msg_id=msg_id, - decision_context=msg_inner_msg.get("decision_context"), - policies_satisfied=msg_inner_msg.get("policies_satisfied"), - subject_identifier=msg_inner_msg.get("subject_identifier"), - ) - - # If the message matched the regex and is important to the app, - # it will be returned - if msg_obj: - return msg_obj - - return None - - -class KojiBuildChange(BaseMessage): - """ A class that inherits from BaseMessage to provide a message - object for a build's info (in fedmsg this replaces the msg dictionary) - :param msg_id: the id of the msg (e.g. 2016-SomeGUID) - :param build_id: the id of the build (e.g. 264382) - :param build_new_state: the new build state, this is currently a Koji - integer - :param build_name: the name of what is being built - (e.g. golang-googlecode-tools) - :param build_version: the version of the build (e.g. 6.06.06) - :param build_release: the release of the build (e.g. 4.fc25) - :param module_build_id: the optional id of the module_build in the database - :param state_reason: the optional reason as to why the state changed - """ - - def __init__( - self, - msg_id, - build_id, - task_id, - build_new_state, - build_name, - build_version, - build_release, - module_build_id=None, - state_reason=None, - ): - if task_id is None: - raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.") - super(KojiBuildChange, self).__init__(msg_id) - self.build_id = build_id - self.task_id = task_id - self.build_new_state = build_new_state - self.build_name = build_name - self.build_version = build_version - self.build_release = build_release - self.module_build_id = module_build_id - self.state_reason = state_reason - - -class KojiTagChange(BaseMessage): - """ - A class that inherits from BaseMessage to provide a message - object for a buildsys.tag info (in fedmsg this replaces the msg dictionary) - :param tag: the name of tag (e.g. module-123456789-build) - :param artifact: the name of tagged artifact (e.g. module-build-macros) - :param nvr: the nvr of the tagged artifact - """ - - def __init__(self, msg_id, tag, artifact, nvr): - super(KojiTagChange, self).__init__(msg_id) - self.tag = tag - self.artifact = artifact - self.nvr = nvr - - -class KojiRepoChange(BaseMessage): - """ A class that inherits from BaseMessage to provide a message - object for a repo's info (in fedmsg this replaces the msg dictionary) - :param msg_id: the id of the msg (e.g. 2016-SomeGUID) - :param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build) - """ - - def __init__(self, msg_id, repo_tag): - super(KojiRepoChange, self).__init__(msg_id) - self.repo_tag = repo_tag - - -class MBSModule(BaseMessage): - """ A class that inherits from BaseMessage to provide a message - object for a module event generated by module_build_service - :param msg_id: the id of the msg (e.g. 2016-SomeGUID) - :param module_build_id: the id of the module build - :param module_build_state: the state of the module build - """ - - def __init__(self, msg_id, module_build_id, module_build_state): - super(MBSModule, self).__init__(msg_id) - self.module_build_id = module_build_id - self.module_build_state = module_build_state - - -class GreenwaveDecisionUpdate(BaseMessage): - """A class representing message send to topic greenwave.decision.update""" - - def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier): - super(GreenwaveDecisionUpdate, self).__init__(msg_id) - self.decision_context = decision_context - self.policies_satisfied = policies_satisfied - self.subject_identifier = subject_identifier - - def publish(topic, msg, conf, service): """ Publish a single message to a given backend, and return @@ -331,7 +62,7 @@ def _in_memory_publish(topic, msg, conf, service): # Create fake fedmsg from the message so we can reuse # the BaseMessage.from_fedmsg code to get the particular BaseMessage # class instance. - wrapped_msg = FedmsgMessageParser().parse({ + wrapped_msg = FedmsgMessageParser(known_fedmsg_services).parse({ "msg_id": str(_in_memory_msg_id), "topic": service + "." + topic, "msg": msg @@ -352,16 +83,19 @@ def _in_memory_publish(topic, msg, conf, service): _initial_messages.append(wrapped_msg) +known_fedmsg_services = ["buildsys", "mbs", "greenwave"] + + _fedmsg_backend = { "publish": _fedmsg_publish, - "services": ["buildsys", "mbs", "greenwave"], - "parser": FedmsgMessageParser(), + "parser": FedmsgMessageParser(known_fedmsg_services), + "services": known_fedmsg_services, "topic_suffix": ".", } _in_memory_backend = { "publish": _in_memory_publish, + "parser": FedmsgMessageParser(known_fedmsg_services), # re-used. :) "services": [], - "parser": FedmsgMessageParser(), # re-used. :) "topic_suffix": ".", } diff --git a/module_build_service/models.py b/module_build_service/models.py index 82d57409..3414ca1e 100644 --- a/module_build_service/models.py +++ b/module_build_service/models.py @@ -19,6 +19,7 @@ from sqlalchemy.orm import validates, load_only import module_build_service.messaging from module_build_service import db, log, get_url_for, conf from module_build_service.errors import UnprocessableEntity +from module_build_service.scheduler import events DEFAULT_MODULE_CONTEXT = "00000000" @@ -484,7 +485,7 @@ class ModuleBuild(MBSBase): @classmethod def from_module_event(cls, db_session, event): - if type(event) == module_build_service.messaging.MBSModule: + if type(event) == events.MBSModule: return db_session.query(cls).filter(cls.id == event.module_build_id).first() else: raise ValueError("%r is not a module message." % type(event).__name__) @@ -1128,7 +1129,7 @@ class ComponentBuild(MBSBase): @classmethod def from_component_event(cls, db_session, event): - if isinstance(event, module_build_service.messaging.KojiBuildChange): + if isinstance(event, events.KojiBuildChange): if event.module_build_id: return ( db_session.query(cls) diff --git a/module_build_service/scheduler/__init__.py b/module_build_service/scheduler/__init__.py index 470116b4..01d69e8d 100644 --- a/module_build_service/scheduler/__init__.py +++ b/module_build_service/scheduler/__init__.py @@ -1,87 +1,3 @@ # -*- coding: utf-8 -*- # SPDX-License-Identifier: MIT """ This is a sub-module for backend/scheduler functionality. """ - -import fedmsg -import moksha.hub - -import module_build_service.models -import module_build_service.scheduler.consumer - -from module_build_service.db_session import db_session - -import logging - -log = logging.getLogger(__name__) - - -def main(initial_messages, stop_condition): - """ Run the consumer until some condition is met. - - Setting stop_condition to None will run the consumer forever. - """ - - config = fedmsg.config.load_config() - config["mbsconsumer"] = True - config["mbsconsumer.stop_condition"] = stop_condition - config["mbsconsumer.initial_messages"] = initial_messages - - # Moksha requires that we subscribe to *something*, so tell it /dev/null - # since we'll just be doing in-memory queue-based messaging for this single - # build. - config["zmq_enabled"] = True - config["zmq_subscribe_endpoints"] = "ipc:///dev/null" - - consumers = [module_build_service.scheduler.consumer.MBSConsumer] - - # Note that the hub we kick off here cannot send any message. You - # should use fedmsg.publish(...) still for that. - moksha.hub.main( - # Pass in our config dict - options=config, - # Only run the specified consumers if any are so specified. - consumers=consumers, - # Do not run default producers. - producers=[], - # Tell moksha to quiet its logging. - framework=False, - ) - - -def make_simple_stop_condition(): - """ Return a simple stop_condition callable. - - Intended to be used with the main() function here in manage.py and tests. - - The stop_condition returns true when the latest module build enters the any - of the finished states. - """ - - def stop_condition(message): - # XXX - We ignore the message here and instead just query the DB. - - # Grab the latest module build. - module = ( - db_session.query(module_build_service.models.ModuleBuild) - .order_by(module_build_service.models.ModuleBuild.id.desc()) - .first() - ) - done = ( - module_build_service.models.BUILD_STATES["failed"], - module_build_service.models.BUILD_STATES["ready"], - module_build_service.models.BUILD_STATES["done"], - ) - result = module.state in done - log.debug("stop_condition checking %r, got %r" % (module, result)) - - # moksha.hub.main starts the hub and runs it in a separate thread. When - # the result is True, remove the db_session from that thread local so - # that any pending queries in the transaction will not block other - # queries made from other threads. - # This is useful for testing particularly. - if result: - db_session.remove() - - return result - - return stop_condition diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index a50c51dd..71258d46 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -33,6 +33,7 @@ from module_build_service import models, log, conf from module_build_service.db_session import db_session from module_build_service.scheduler.handlers import greenwave from module_build_service.utils import module_build_state_from_msg +from module_build_service.scheduler import events class MBSConsumer(fedmsg.consumers.FedmsgConsumer): @@ -124,7 +125,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): def validate(self, message): if conf.messaging == "fedmsg": # If this is a faked internal message, don't bother. - if isinstance(message, module_build_service.messaging.BaseMessage): + if isinstance(message, events.BaseMessage): log.info("Skipping crypto validation for %r" % message) return # Otherwise, if it is a real message from the network, pass it @@ -139,7 +140,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): # messages, then just use them as-is. If they are not already # instances of our message abstraction base class, then first transform # them before proceeding. - if isinstance(message, module_build_service.messaging.BaseMessage): + if isinstance(message, events.BaseMessage): msg = message else: msg = self.get_abstracted_msg(message) @@ -169,7 +170,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): if parser: try: return parser.parse(message) - except module_build_service.messaging.IgnoreMessage: + except events.IgnoreMessage: pass else: raise ValueError("{0} backend does not define a message parser".format(conf.messaging)) @@ -198,32 +199,32 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): def _map_message(self, db_session, msg): """Map message to its corresponding event handler and module build""" - if isinstance(msg, module_build_service.messaging.KojiBuildChange): + if isinstance(msg, events.KojiBuildChange): handler = self.on_build_change[msg.build_new_state] build = models.ComponentBuild.from_component_event(db_session, msg) if build: build = build.module_build return handler, build - if isinstance(msg, module_build_service.messaging.KojiRepoChange): + if isinstance(msg, events.KojiRepoChange): return ( self.on_repo_change, models.ModuleBuild.from_repo_done_event(db_session, msg) ) - if isinstance(msg, module_build_service.messaging.KojiTagChange): + if isinstance(msg, events.KojiTagChange): return ( self.on_tag_change, models.ModuleBuild.from_tag_change_event(db_session, msg) ) - if isinstance(msg, module_build_service.messaging.MBSModule): + if isinstance(msg, events.MBSModule): return ( self.on_module_change[module_build_state_from_msg(msg)], models.ModuleBuild.from_module_event(db_session, msg) ) - if isinstance(msg, module_build_service.messaging.GreenwaveDecisionUpdate): + if isinstance(msg, events.GreenwaveDecisionUpdate): return ( self.on_decision_update, greenwave.get_corresponding_module_build(msg.subject_identifier) @@ -305,6 +306,6 @@ def work_queue_put(msg): def fake_repo_done_message(tag_name): - msg = module_build_service.messaging.KojiRepoChange( + msg = events.KojiRepoChange( msg_id="a faked internal message", repo_tag=tag_name + "-build") work_queue_put(msg) diff --git a/module_build_service/scheduler/events.py b/module_build_service/scheduler/events.py new file mode 100644 index 00000000..70501f67 --- /dev/null +++ b/module_build_service/scheduler/events.py @@ -0,0 +1,151 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT + +try: + from inspect import signature +except ImportError: + from funcsigs import signature + + +class IgnoreMessage(Exception): + pass + + +class BaseMessage(object): + def __init__(self, msg_id): + """ + A base class to abstract messages from different backends + :param msg_id: the id of the msg (e.g. 2016-SomeGUID) + """ + self.msg_id = msg_id + + # Moksha calls `consumer.validate` on messages that it receives, and + # even though we have validation turned off in the config there's still + # a step that tries to access `msg['body']`, `msg['topic']` and + # `msg.get('topic')`. + # These are here just so that the `validate` method won't raise an + # exception when we push our fake messages through. + # Note that, our fake message pushing has worked for a while... but the + # *latest* version of fedmsg has some code that exercises the bug. I + # didn't hit this until I went to test in jenkins. + self.body = {} + self.topic = None + + def __repr__(self): + init_sig = signature(self.__init__) + + args_strs = ( + "{}={!r}".format(name, getattr(self, name)) + if param.default != param.empty + else repr(getattr(self, name)) + for name, param in init_sig.parameters.items() + ) + + return "{}({})".format(type(self).__name__, ", ".join(args_strs)) + + def __getitem__(self, key): + """ Used to trick moksha into thinking we are a dict. """ + return getattr(self, key) + + def __setitem__(self, key, value): + """ Used to trick moksha into thinking we are a dict. """ + return setattr(self, key, value) + + def get(self, key, value=None): + """ Used to trick moksha into thinking we are a dict. """ + return getattr(self, key, value) + + def __json__(self): + return dict(msg_id=self.msg_id, topic=self.topic, body=self.body) + + +class KojiBuildChange(BaseMessage): + """ A class that inherits from BaseMessage to provide a message + object for a build's info (in fedmsg this replaces the msg dictionary) + :param msg_id: the id of the msg (e.g. 2016-SomeGUID) + :param build_id: the id of the build (e.g. 264382) + :param build_new_state: the new build state, this is currently a Koji + integer + :param build_name: the name of what is being built + (e.g. golang-googlecode-tools) + :param build_version: the version of the build (e.g. 6.06.06) + :param build_release: the release of the build (e.g. 4.fc25) + :param module_build_id: the optional id of the module_build in the database + :param state_reason: the optional reason as to why the state changed + """ + + def __init__( + self, + msg_id, + build_id, + task_id, + build_new_state, + build_name, + build_version, + build_release, + module_build_id=None, + state_reason=None, + ): + if task_id is None: + raise IgnoreMessage("KojiBuildChange with a null task_id is invalid.") + super(KojiBuildChange, self).__init__(msg_id) + self.build_id = build_id + self.task_id = task_id + self.build_new_state = build_new_state + self.build_name = build_name + self.build_version = build_version + self.build_release = build_release + self.module_build_id = module_build_id + self.state_reason = state_reason + + +class KojiTagChange(BaseMessage): + """ + A class that inherits from BaseMessage to provide a message + object for a buildsys.tag info (in fedmsg this replaces the msg dictionary) + :param tag: the name of tag (e.g. module-123456789-build) + :param artifact: the name of tagged artifact (e.g. module-build-macros) + :param nvr: the nvr of the tagged artifact + """ + + def __init__(self, msg_id, tag, artifact, nvr): + super(KojiTagChange, self).__init__(msg_id) + self.tag = tag + self.artifact = artifact + self.nvr = nvr + + +class KojiRepoChange(BaseMessage): + """ A class that inherits from BaseMessage to provide a message + object for a repo's info (in fedmsg this replaces the msg dictionary) + :param msg_id: the id of the msg (e.g. 2016-SomeGUID) + :param repo_tag: the repo's tag (e.g. SHADOWBUILD-f25-build) + """ + + def __init__(self, msg_id, repo_tag): + super(KojiRepoChange, self).__init__(msg_id) + self.repo_tag = repo_tag + + +class MBSModule(BaseMessage): + """ A class that inherits from BaseMessage to provide a message + object for a module event generated by module_build_service + :param msg_id: the id of the msg (e.g. 2016-SomeGUID) + :param module_build_id: the id of the module build + :param module_build_state: the state of the module build + """ + + def __init__(self, msg_id, module_build_id, module_build_state): + super(MBSModule, self).__init__(msg_id) + self.module_build_id = module_build_id + self.module_build_state = module_build_state + + +class GreenwaveDecisionUpdate(BaseMessage): + """A class representing message send to topic greenwave.decision.update""" + + def __init__(self, msg_id, decision_context, policies_satisfied, subject_identifier): + super(GreenwaveDecisionUpdate, self).__init__(msg_id) + self.decision_context = decision_context + self.policies_satisfied = policies_satisfied + self.subject_identifier = subject_identifier diff --git a/module_build_service/scheduler/handlers/components.py b/module_build_service/scheduler/handlers/components.py index 05adf4fc..94de4f63 100644 --- a/module_build_service/scheduler/handlers/components.py +++ b/module_build_service/scheduler/handlers/components.py @@ -6,9 +6,10 @@ import logging import koji import module_build_service.builder +from module_build_service import models, log from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder +from module_build_service.scheduler import events from module_build_service.utils.general import mmd_to_str -from module_build_service import models, log, messaging from module_build_service.db_session import db_session logging.basicConfig(level=logging.DEBUG) @@ -110,7 +111,7 @@ def _finalize(config, msg, state): # change message here. log.info("Batch done. No component to tag") further_work += [ - messaging.KojiRepoChange( + events.KojiRepoChange( "components::_finalize: fake msg", builder.module_build_tag["name"]) ] else: diff --git a/module_build_service/scheduler/handlers/modules.py b/module_build_service/scheduler/handlers/modules.py index 0bad2018..d59db174 100644 --- a/module_build_service/scheduler/handlers/modules.py +++ b/module_build_service/scheduler/handlers/modules.py @@ -6,7 +6,6 @@ from module_build_service import conf, models, log, build_logs import module_build_service.builder import module_build_service.resolver import module_build_service.utils -import module_build_service.messaging from module_build_service.utils import ( attempt_to_reuse_all_components, record_component_builds, @@ -17,6 +16,7 @@ from module_build_service.utils import ( ) from module_build_service.db_session import db_session from module_build_service.errors import UnprocessableEntity, Forbidden, ValidationError +from module_build_service.scheduler import events from module_build_service.utils.greenwave import greenwave from module_build_service.scheduler.default_modules import ( add_default_modules, handle_collisions_with_base_module_rpms) @@ -382,7 +382,7 @@ def wait(config, msg): # Return a KojiRepoChange message so that the build can be transitioned to done # in the repos handler return [ - module_build_service.messaging.KojiRepoChange( + events.KojiRepoChange( "handlers.modules.wait: fake msg", builder.module_build_tag["name"]) ] @@ -458,7 +458,7 @@ def wait(config, msg): db_session.commit() else: further_work.append( - module_build_service.messaging.KojiRepoChange( + events.KojiRepoChange( "fake msg", builder.module_build_tag["name"]) ) return further_work diff --git a/module_build_service/scheduler/handlers/tags.py b/module_build_service/scheduler/handlers/tags.py index f7bb92b9..8938df10 100644 --- a/module_build_service/scheduler/handlers/tags.py +++ b/module_build_service/scheduler/handlers/tags.py @@ -5,8 +5,9 @@ import module_build_service.builder import logging import koji -from module_build_service import models, log, messaging +from module_build_service import models, log from module_build_service.db_session import db_session +from module_build_service.scheduler import events logging.basicConfig(level=logging.DEBUG) @@ -70,7 +71,7 @@ def tagged(config, msg): log.info( "All components in module tagged and built, skipping the last repo regeneration") further_work += [ - messaging.KojiRepoChange( + events.KojiRepoChange( "components::_finalize: fake msg", builder.module_build_tag["name"]) ] db_session.commit() diff --git a/module_build_service/scheduler/local.py b/module_build_service/scheduler/local.py new file mode 100644 index 00000000..c190395f --- /dev/null +++ b/module_build_service/scheduler/local.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT + +import fedmsg +import logging +import module_build_service.models +import moksha.hub + +from module_build_service.db_session import db_session + +log = logging.getLogger(__name__) + + +""" +This module contains functions to control fedmsg-hub running locally for local +module build and running tests within test_build.py particularly. +""" + +__all__ = ["main", "make_simple_stop_condition"] + + +def main(initial_messages, stop_condition): + """ Run the consumer until some condition is met. + + Setting stop_condition to None will run the consumer forever. + """ + + config = fedmsg.config.load_config() + config["mbsconsumer"] = True + config["mbsconsumer.stop_condition"] = stop_condition + config["mbsconsumer.initial_messages"] = initial_messages + + # Moksha requires that we subscribe to *something*, so tell it /dev/null + # since we'll just be doing in-memory queue-based messaging for this single + # build. + config["zmq_enabled"] = True + config["zmq_subscribe_endpoints"] = "ipc:///dev/null" + + # Lazy import consumer to avoid potential import cycle. + # For example, in some cases, importing event message from events.py would + # cause importing the consumer module, which then starts to import relative + # code inside handlers module, and the original code is imported eventually. + import module_build_service.scheduler.consumer + + consumers = [module_build_service.scheduler.consumer.MBSConsumer] + + # Note that the hub we kick off here cannot send any message. You + # should use fedmsg.publish(...) still for that. + moksha.hub.main( + # Pass in our config dict + options=config, + # Only run the specified consumers if any are so specified. + consumers=consumers, + # Do not run default producers. + producers=[], + # Tell moksha to quiet its logging. + framework=False, + ) + + +def make_simple_stop_condition(): + """ Return a simple stop_condition callable. + + Intended to be used with the main() function here in manage.py and tests. + + The stop_condition returns true when the latest module build enters the any + of the finished states. + """ + + def stop_condition(message): + # XXX - We ignore the message here and instead just query the DB. + + # Grab the latest module build. + module = ( + db_session.query(module_build_service.models.ModuleBuild) + .order_by(module_build_service.models.ModuleBuild.id.desc()) + .first() + ) + done = ( + module_build_service.models.BUILD_STATES["failed"], + module_build_service.models.BUILD_STATES["ready"], + module_build_service.models.BUILD_STATES["done"], + ) + result = module.state in done + log.debug("stop_condition checking %r, got %r" % (module, result)) + + # moksha.hub.main starts the hub and runs it in a separate thread. When + # the result is True, remove the db_session from that thread local so + # that any pending queries in the transaction will not block other + # queries made from other threads. + # This is useful for testing particularly. + if result: + db_session.remove() + + return result + + return stop_condition diff --git a/module_build_service/scheduler/parser.py b/module_build_service/scheduler/parser.py new file mode 100644 index 00000000..3ed95d24 --- /dev/null +++ b/module_build_service/scheduler/parser.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT + +import re + +from module_build_service import log +from module_build_service.scheduler import events + + +class MessageParser(object): + """Base class for parsing messages received from a specific message bus + + :param topic_categories: list of known services, that MBS can handle the + messages sent from them. For example, a value could be + ``["buildsys", "mbs", "greenwave"]``. + :type topic_categories: list[str] + """ + + def __init__(self, topic_categories): + self.topic_categories = topic_categories + + def parse(self, msg): + raise NotImplementedError() + + +class FedmsgMessageParser(MessageParser): + + def parse(self, msg): + """ + Parse a received message and convert it to a consistent format + + :param dict msg: the message contents from the message bus. + :return: a mapping representing the corresponding event. + If the topic isn't recognized, None is returned. + :rtype: dict or None + """ + + if "body" in msg: + msg = msg["body"] + topic = msg["topic"] + categories_re = "|".join(map(re.escape, self.topic_categories)) + regex_pattern = re.compile( + r"(?P" + categories_re + r")" + r"(?:(?:\.)(?Pbuild|repo|module|decision))?" + r"(?:(?:\.)(?Pstate|build))?" + r"(?:\.)(?Pchange|done|end|tag|update)$" + ) + regex_results = re.search(regex_pattern, topic) + + if regex_results: + category = regex_results.group("category") + object = regex_results.group("object") + subobject = regex_results.group("subobject") + event = regex_results.group("event") + + msg_id = msg.get("msg_id") + msg_inner_msg = msg.get("msg") + + # If there isn't a msg dict in msg then this message can be skipped + if not msg_inner_msg: + log.debug( + "Skipping message without any content with the " 'topic "{0}"'.format(topic)) + return None + + # Ignore all messages from the secondary koji instances. + if category == "buildsys": + instance = msg_inner_msg.get("instance", "primary") + if instance != "primary": + log.debug("Ignoring message from %r koji hub." % instance) + return + + if object == "build" and subobject == "state" and event == "change": + build_id = msg_inner_msg.get("build_id") + task_id = msg_inner_msg.get("task_id") + build_new_state = msg_inner_msg.get("new") + build_name = msg_inner_msg.get("name") + build_version = msg_inner_msg.get("version") + build_release = msg_inner_msg.get("release") + + return events.KojiBuildChange( + msg_id, + build_id, + task_id, + build_new_state, + build_name, + build_version, + build_release, + ) + + if object == "repo" and subobject is None and event == "done": + repo_tag = msg_inner_msg.get("tag") + return events.KojiRepoChange(msg_id, repo_tag) + + if event == "tag": + tag = msg_inner_msg.get("tag") + name = msg_inner_msg.get("name") + version = msg_inner_msg.get("version") + release = msg_inner_msg.get("release") + nvr = None + if name and version and release: + nvr = "-".join((name, version, release)) + return events.KojiTagChange(msg_id, tag, name, nvr) + + if (category == "mbs" + and object == "module" and subobject == "state" and event == "change"): + return events.MBSModule( + msg_id, + msg_inner_msg.get("id"), + msg_inner_msg.get("state")) + + if (category == "greenwave" + and object == "decision" and subobject is None and event == "update"): + return events.GreenwaveDecisionUpdate( + msg_id=msg_id, + decision_context=msg_inner_msg.get("decision_context"), + policies_satisfied=msg_inner_msg.get("policies_satisfied"), + subject_identifier=msg_inner_msg.get("subject_identifier"), + ) diff --git a/module_build_service/scheduler/producer.py b/module_build_service/scheduler/producer.py index 1f0af2a8..0f8d190d 100644 --- a/module_build_service/scheduler/producer.py +++ b/module_build_service/scheduler/producer.py @@ -16,6 +16,7 @@ import module_build_service.scheduler.consumer from module_build_service import conf, models, log from module_build_service.builder import GenericBuilder from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder +from module_build_service.scheduler import events from module_build_service.utils.greenwave import greenwave from module_build_service.db_session import db_session @@ -109,7 +110,7 @@ class MBSProducer(PollingProducer): log.info(" task {0!r} is in state {1!r}".format(task_id, task_info["state"])) if task_info["state"] in state_mapping: # Fake a fedmsg message on our internal queue - msg = module_build_service.messaging.KojiBuildChange( + msg = events.KojiBuildChange( msg_id="producer::fail_lost_builds fake msg", build_id=component_build.task_id, task_id=component_build.task_id, @@ -214,7 +215,7 @@ class MBSProducer(PollingProducer): # Fake a message to kickstart the build anew in the consumer state = module_build_service.models.BUILD_STATES[state_name] - msg = module_build_service.messaging.MBSModule( + msg = events.MBSModule( "nudge_module_builds_fake_message", build.id, state) log.info(" Scheduling faked event %r" % msg) module_build_service.scheduler.consumer.work_queue_put(msg) @@ -436,7 +437,7 @@ class MBSProducer(PollingProducer): # If it is tagged in final tag, but MBS does not think so, # schedule fake message. if not c.tagged_in_final and module_build.koji_tag in tags: - msg = module_build_service.messaging.KojiTagChange( + msg = events.KojiTagChange( "sync_koji_build_tags_fake_message", module_build.koji_tag, c.package, c.nvr ) log.info(" Scheduling faked event %r" % msg) @@ -446,7 +447,7 @@ class MBSProducer(PollingProducer): # schedule fake message. build_tag = module_build.koji_tag + "-build" if not c.tagged and build_tag in tags: - msg = module_build_service.messaging.KojiTagChange( + msg = events.KojiTagChange( "sync_koji_build_tags_fake_message", build_tag, c.package, c.nvr) log.info(" Scheduling faked event %r" % msg) module_build_service.scheduler.consumer.work_queue_put(msg) diff --git a/module_build_service/utils/batches.py b/module_build_service/utils/batches.py index 71b4ffab..2e95e2f7 100644 --- a/module_build_service/utils/batches.py +++ b/module_build_service/utils/batches.py @@ -4,8 +4,8 @@ import threading import concurrent.futures from module_build_service import conf, log, models -import module_build_service.messaging from module_build_service.db_session import db_session +from module_build_service.scheduler.events import KojiRepoChange from .reuse import get_reusable_components, reuse_component @@ -275,8 +275,7 @@ def start_next_batch_build(config, module, builder, components=None): # message and return if components_reused and not unbuilt_components_after_reuse: further_work.append( - module_build_service.messaging.KojiRepoChange( - "start_build_batch: fake msg", builder.module_build_tag["name"]) + KojiRepoChange("start_build_batch: fake msg", builder.module_build_tag["name"]) ) return further_work diff --git a/module_build_service/utils/reuse.py b/module_build_service/utils/reuse.py index 675de2f4..92aed78d 100644 --- a/module_build_service/utils/reuse.py +++ b/module_build_service/utils/reuse.py @@ -2,11 +2,11 @@ # SPDX-License-Identifier: MIT import kobo.rpmlib -import module_build_service.messaging from module_build_service import log, models, conf from module_build_service.db_session import db_session -from module_build_service.utils.mse import get_base_module_mmds from module_build_service.resolver import GenericResolver +from module_build_service.scheduler.events import KojiBuildChange +from module_build_service.utils.mse import get_base_module_mmds def reuse_component(component, previous_component_build, change_state_now=False): @@ -45,7 +45,7 @@ def reuse_component(component, previous_component_build, change_state_now=False) # Add this message to further_work so that the reused # component will be tagged properly return [ - module_build_service.messaging.KojiBuildChange( + KojiBuildChange( msg_id="reuse_component: fake msg", build_id=None, task_id=component.task_id, diff --git a/tests/test_build/test_build.py b/tests/test_build/test_build.py index 74bc6bfc..5dd8b26b 100644 --- a/tests/test_build/test_build.py +++ b/tests/test_build/test_build.py @@ -17,7 +17,7 @@ import module_build_service.utils from module_build_service.errors import Forbidden from module_build_service import models, conf, build_logs from module_build_service.db_session import db_session -from module_build_service.scheduler import make_simple_stop_condition +from module_build_service.scheduler.local import make_simple_stop_condition from mock import patch, PropertyMock, Mock, MagicMock from werkzeug.datastructures import FileStorage @@ -29,7 +29,7 @@ import itertools from module_build_service.builder.base import GenericBuilder from module_build_service.builder.KojiModuleBuilder import KojiModuleBuilder -from module_build_service.messaging import MBSModule +from module_build_service.scheduler import events from tests import ( app, clean_database, read_staged_data, staged_data_filename ) @@ -232,7 +232,7 @@ class FakeModuleBuilder(GenericBuilder): return {"name": self.tag_name + "-build"} def _send_repo_done(self): - msg = module_build_service.messaging.KojiRepoChange( + msg = events.KojiRepoChange( msg_id="a faked internal message", repo_tag=self.tag_name + "-build") module_build_service.scheduler.consumer.work_queue_put(msg) @@ -241,14 +241,14 @@ class FakeModuleBuilder(GenericBuilder): tag = self.tag_name else: tag = self.tag_name + "-build" - msg = module_build_service.messaging.KojiTagChange( + msg = events.KojiTagChange( msg_id="a faked internal message", tag=tag, artifact=artifact, nvr=nvr) module_build_service.scheduler.consumer.work_queue_put(msg) def _send_build_change(self, state, name, build_id): # build_id=1 and task_id=1 are OK here, because we are building just # one RPM at the time. - msg = module_build_service.messaging.KojiBuildChange( + msg = events.KojiBuildChange( msg_id="a faked internal message", build_id=build_id, task_id=build_id, @@ -299,7 +299,7 @@ class FakeModuleBuilder(GenericBuilder): nvr_dict = kobo.rpmlib.parse_nvr(component_build.nvr) # Send a message stating the build is complete msgs.append( - module_build_service.messaging.KojiBuildChange( + events.KojiBuildChange( "recover_orphaned_artifact: fake message", randint(1, 9999999), component_build.task_id, @@ -312,7 +312,7 @@ class FakeModuleBuilder(GenericBuilder): ) # Send a message stating that the build was tagged in the build tag msgs.append( - module_build_service.messaging.KojiTagChange( + events.KojiTagChange( "recover_orphaned_artifact: fake message", component_build.module_build.koji_tag + "-build", component_build.package, @@ -339,7 +339,7 @@ def cleanup_moksha(): class BaseTestBuild: def run_scheduler(self, msgs=None, stop_condition=None): - module_build_service.scheduler.main( + module_build_service.scheduler.local.main( msgs or [], stop_condition or make_simple_stop_condition() ) @@ -891,7 +891,7 @@ class TestBuild(BaseTestBuild): Stop the scheduler when the module is built or when we try to build more components than the num_concurrent_builds. """ - main_stop = module_build_service.scheduler.make_simple_stop_condition() + main_stop = make_simple_stop_condition() num_building = ( db_session.query(models.ComponentBuild) .filter_by(state=koji.BUILD_STATES["BUILDING"]) @@ -1092,7 +1092,7 @@ class TestBuild(BaseTestBuild): from module_build_service.db_session import db_session # Create a dedicated database session for scheduler to avoid hang - self.run_scheduler(msgs=[MBSModule("local module build", 3, 1)]) + self.run_scheduler(msgs=[events.MBSModule("local module build", 3, 1)]) reused_component_ids = { "module-build-macros": None, @@ -1171,7 +1171,7 @@ class TestBuild(BaseTestBuild): FakeModuleBuilder.on_buildroot_add_artifacts_cb = on_buildroot_add_artifacts_cb - self.run_scheduler(msgs=[MBSModule("local module build", 3, 1)]) + self.run_scheduler(msgs=[events.MBSModule("local module build", 3, 1)]) # All components should be built and module itself should be in "done" # or "ready" state. @@ -1742,7 +1742,7 @@ class TestBuild(BaseTestBuild): cleanup_moksha() module = models.ModuleBuild.get_by_id(db_session, module_build_id) msgs = [ - module_build_service.messaging.KojiRepoChange( + events.KojiRepoChange( msg_id="a faked internal message", repo_tag=module.koji_tag + "-build" ) ] diff --git a/tests/test_builder/test_koji.py b/tests/test_builder/test_koji.py index 61ba4639..732c85de 100644 --- a/tests/test_builder/test_koji.py +++ b/tests/test_builder/test_koji.py @@ -17,6 +17,7 @@ import module_build_service.builder from module_build_service import Modulemd from module_build_service.db_session import db_session from module_build_service.utils.general import mmd_to_str +from module_build_service.scheduler import events import pytest from mock import patch, MagicMock @@ -150,7 +151,7 @@ class TestKojiBuilder: db_session.commit() assert len(actual) == 3 - assert type(actual[0]) == module_build_service.messaging.KojiBuildChange + assert type(actual[0]) == events.KojiBuildChange assert actual[0].build_id == 91 assert actual[0].task_id == 12345 assert actual[0].build_new_state == koji.BUILD_STATES["COMPLETE"] @@ -158,10 +159,10 @@ class TestKojiBuilder: assert actual[0].build_version == "1.0" assert actual[0].build_release == "1.module+e0095747" assert actual[0].module_build_id == 4 - assert type(actual[1]) == module_build_service.messaging.KojiTagChange + assert type(actual[1]) == events.KojiTagChange assert actual[1].tag == "module-foo-build" assert actual[1].artifact == "rubygem-rails" - assert type(actual[2]) == module_build_service.messaging.KojiTagChange + assert type(actual[2]) == events.KojiTagChange assert actual[2].tag == "module-foo" assert actual[2].artifact == "rubygem-rails" assert component_build.state == koji.BUILD_STATES["COMPLETE"] @@ -205,7 +206,7 @@ class TestKojiBuilder: db_session.commit() assert len(actual) == 1 - assert type(actual[0]) == module_build_service.messaging.KojiBuildChange + assert type(actual[0]) == events.KojiBuildChange assert actual[0].build_id == 91 assert actual[0].task_id == 12345 assert actual[0].build_new_state == koji.BUILD_STATES["COMPLETE"] @@ -259,7 +260,7 @@ class TestKojiBuilder: db_session.commit() assert len(actual) == 1 - assert type(actual[0]) == module_build_service.messaging.KojiBuildChange + assert type(actual[0]) == events.KojiBuildChange assert actual[0].build_id == 91 assert actual[0].task_id == 12345 assert actual[0].build_new_state == koji.BUILD_STATES["COMPLETE"] diff --git a/tests/test_manage.py b/tests/test_manage.py index a628c1ca..b16dc679 100644 --- a/tests/test_manage.py +++ b/tests/test_manage.py @@ -149,7 +149,7 @@ class TestCommandBuildModuleLocally: finally: app.config["SQLALCHEMY_DATABASE_URI"] = original_db_uri - @patch("module_build_service.scheduler.main") + @patch("module_build_service.scheduler.local.main") def test_set_stream(self, main): cli_cmd = [ "mbs-manager", "build_module_locally", @@ -159,9 +159,9 @@ class TestCommandBuildModuleLocally: self._run_manager_wrapper(cli_cmd) - # Since module_build_service.scheduler.main is mocked, MBS does not - # really build the testmodule for this test. Following lines assert the - # fact: + # Since module_build_service.scheduler.local.main is mocked, MBS does + # not really build the testmodule for this test. Following lines assert + # the fact: # Module testmodule-local-build is expanded and stored into database, # and this build has buildrequires platform:f28 and requires # platform:f28. @@ -211,7 +211,7 @@ class TestCommandBuildModuleLocally: # We don't run consumer actually, but it could be patched to mark some # module build failed for test purpose. - with patch("module_build_service.scheduler.main", + with patch("module_build_service.scheduler.local.main", side_effect=main_side_effect): with pytest.raises(RuntimeError, match="Module build failed"): self._run_manager_wrapper(cli_cmd) diff --git a/tests/test_messaging.py b/tests/test_messaging.py index 95de2369..3a6f9244 100644 --- a/tests/test_messaging.py +++ b/tests/test_messaging.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- # SPDX-License-Identifier: MIT + from module_build_service import messaging -from module_build_service.messaging import KojiRepoChange # noqa +from module_build_service.scheduler.parser import FedmsgMessageParser class TestFedmsgMessaging: @@ -25,7 +26,8 @@ class TestFedmsgMessaging: "topic": "org.fedoraproject.prod.buildsys.build.state.change", } - msg = messaging.FedmsgMessageParser().parse(buildsys_state_change_msg) + parser = FedmsgMessageParser(messaging.known_fedmsg_services) + msg = parser.parse(buildsys_state_change_msg) assert msg.build_id == 614503 assert msg.build_new_state == 1 @@ -49,7 +51,8 @@ class TestFedmsgMessaging: "topic": "org.fedoraproject.prod.buildsys.tag", } - msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg) + parser = FedmsgMessageParser(messaging.known_fedmsg_services) + msg = parser.parse(buildsys_tag_msg) assert msg.tag == "module-debugging-tools-master-20170405115403-build" assert msg.artifact == "module-build-macros" @@ -68,6 +71,7 @@ class TestFedmsgMessaging: "topic": "org.fedoraproject.prod.buildsys.repo.done", } - msg = messaging.FedmsgMessageParser().parse(buildsys_tag_msg) + parser = FedmsgMessageParser(messaging.known_fedmsg_services) + msg = parser.parse(buildsys_tag_msg) assert msg.repo_tag == "module-f0f7e44f3c6cccab-build" diff --git a/tests/test_scheduler/test_consumer.py b/tests/test_scheduler/test_consumer.py index fde901bb..3e608a7f 100644 --- a/tests/test_scheduler/test_consumer.py +++ b/tests/test_scheduler/test_consumer.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: MIT from mock import patch, MagicMock from module_build_service.scheduler.consumer import MBSConsumer -from module_build_service.messaging import KojiTagChange, KojiRepoChange +from module_build_service.scheduler.events import KojiTagChange, KojiRepoChange class TestConsumer: diff --git a/tests/test_scheduler/test_module_init.py b/tests/test_scheduler/test_module_init.py index 259bbfe8..1f5925cc 100644 --- a/tests/test_scheduler/test_module_init.py +++ b/tests/test_scheduler/test_module_init.py @@ -12,6 +12,7 @@ from module_build_service import build_logs from module_build_service.db_session import db_session from module_build_service.models import ModuleBuild from module_build_service.utils.general import mmd_to_str, load_mmd +from module_build_service.scheduler.events import MBSModule class TestModuleInit: @@ -70,10 +71,7 @@ class TestModuleInit: platform_build.modulemd = mmd_to_str(mmd) db_session.commit() - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=2, module_build_state="init" - ) - + msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="init") self.fn(config=conf, msg=msg) build = ModuleBuild.get_by_id(db_session, 2) @@ -114,8 +112,7 @@ class TestModuleInit: get_latest_error=RuntimeError("Failed in mocked_scm_get_latest") ) - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=2, module_build_state="init") + msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="init") self.fn(config=conf, msg=msg) build = ModuleBuild.get_by_id(db_session, 2) @@ -141,8 +138,7 @@ class TestModuleInit: scmurl = "git://pkgs.domain.local/modules/includedmodule?#da95886" ModuleBuild.create( db_session, conf, "includemodule", "1", 3, mmd_to_str(mmd), scmurl, "mprahl") - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=3, module_build_state="init") + msg = MBSModule(msg_id=None, module_build_id=3, module_build_state="init") self.fn(config=conf, msg=msg) build = ModuleBuild.get_by_id(db_session, 3) assert build.state == 1 @@ -177,8 +173,7 @@ class TestModuleInit: "7035bd33614972ac66559ac1fdd019ff6027ad22", get_latest_raise=True, ) - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=2, module_build_state="init") + msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="init") build = ModuleBuild.get_by_id(db_session, 2) mocked_from_module_event.return_value = build diff --git a/tests/test_scheduler/test_module_wait.py b/tests/test_scheduler/test_module_wait.py index f49769e9..99f2906d 100644 --- a/tests/test_scheduler/test_module_wait.py +++ b/tests/test_scheduler/test_module_wait.py @@ -12,6 +12,7 @@ import module_build_service.resolver from module_build_service import build_logs, Modulemd from module_build_service.db_session import db_session from module_build_service.models import ComponentBuild, ModuleBuild +from module_build_service.scheduler.events import MBSModule base_dir = os.path.dirname(os.path.dirname(__file__)) @@ -40,7 +41,7 @@ class TestModuleWait: create_builder.return_value = builder module_build_id = db_session.query(ModuleBuild).first().id - msg = module_build_service.messaging.MBSModule( + msg = MBSModule( msg_id=None, module_build_id=module_build_id, module_build_state="some state") @@ -80,8 +81,7 @@ class TestModuleWait: resolver.get_module_tag.return_value = "module-testmodule-master-20170109091357" generic_resolver.create.return_value = resolver - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=2, module_build_state="some state") + msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state") module_build_service.scheduler.handlers.modules.wait( config=conf, msg=msg) @@ -127,8 +127,7 @@ class TestModuleWait: resolver.get_module_tag.return_value = "module-testmodule-master-20170109091357" generic_resolver.create.return_value = resolver - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=2, module_build_state="some state") + msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state") module_build_service.scheduler.handlers.modules.wait( config=conf, msg=msg) @@ -173,8 +172,7 @@ class TestModuleWait: } generic_resolver.create.return_value = resolver - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=2, module_build_state="some state") + msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state") module_build_service.scheduler.handlers.modules.wait( config=conf, msg=msg) @@ -244,9 +242,7 @@ class TestModuleWait: new=koji_cg_tag_build, ): generic_resolver.create.return_value = resolver - msg = module_build_service.messaging.MBSModule( - msg_id=None, module_build_id=2, module_build_state="some state" - ) + msg = MBSModule(msg_id=None, module_build_id=2, module_build_state="some state") module_build_service.scheduler.handlers.modules.wait( config=conf, msg=msg ) diff --git a/tests/test_scheduler/test_poller.py b/tests/test_scheduler/test_poller.py index 0caa8a1a..9a83390f 100644 --- a/tests/test_scheduler/test_poller.py +++ b/tests/test_scheduler/test_poller.py @@ -7,9 +7,9 @@ from module_build_service import models, conf from tests import clean_database, make_module_in_db import mock import koji -from module_build_service.scheduler.producer import MBSProducer -from module_build_service.messaging import KojiTagChange from module_build_service.db_session import db_session +from module_build_service.scheduler.producer import MBSProducer +from module_build_service.scheduler.events import KojiTagChange import six.moves.queue as queue from datetime import datetime, timedelta diff --git a/tests/test_scheduler/test_repo_done.py b/tests/test_scheduler/test_repo_done.py index 4032eaf4..355873e9 100644 --- a/tests/test_scheduler/test_repo_done.py +++ b/tests/test_scheduler/test_repo_done.py @@ -7,6 +7,7 @@ import module_build_service.scheduler.handlers.repos import module_build_service.models from module_build_service.db_session import db_session from module_build_service.models import ComponentBuild +from module_build_service.scheduler.events import KojiRepoChange from tests import conf, scheduler_init_data @@ -19,7 +20,7 @@ class TestRepoDone: """ scheduler_init_data() from_repo_done_event.return_value = None - msg = module_build_service.messaging.KojiRepoChange( + msg = KojiRepoChange( "no matches for this...", "2016-some-nonexistent-build") module_build_service.scheduler.handlers.repos.done(config=conf, msg=msg) @@ -56,7 +57,7 @@ class TestRepoDone: get_session.return_value = mock.Mock(), "development" build_fn.return_value = 1234, 1, "", None - msg = module_build_service.messaging.KojiRepoChange( + msg = KojiRepoChange( "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build") module_build_service.scheduler.handlers.repos.done(config=conf, msg=msg) build_fn.assert_called_once_with( @@ -116,7 +117,7 @@ class TestRepoDone: finalizer.side_effect = mocked_finalizer - msg = module_build_service.messaging.KojiRepoChange( + msg = KojiRepoChange( "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build") module_build_service.scheduler.handlers.repos.done(config=conf, msg=msg) @@ -156,7 +157,7 @@ class TestRepoDone: config.return_value = mock.Mock(), "development" build_fn.return_value = None, 4, "Failed to submit artifact tangerine to Koji", None - msg = module_build_service.messaging.KojiRepoChange( + msg = KojiRepoChange( "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build") module_build_service.scheduler.handlers.repos.done(config=conf, msg=msg) build_fn.assert_called_once_with( @@ -182,7 +183,7 @@ class TestRepoDone: component_build.tagged = False db_session.commit() - msg = module_build_service.messaging.KojiRepoChange( + msg = KojiRepoChange( "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build") module_build_service.scheduler.handlers.repos.done(config=conf, msg=msg) @@ -222,7 +223,7 @@ class TestRepoDone: config.return_value = mock.Mock(), "development" build_fn.return_value = None, 4, "Failed to submit artifact x to Koji", None - msg = module_build_service.messaging.KojiRepoChange( + msg = KojiRepoChange( "some_msg_id", "module-testmodule-master-20170109091357-7c29193d-build") module_build_service.scheduler.handlers.repos.done(config=conf, msg=msg) diff --git a/tests/test_scheduler/test_tag_tagged.py b/tests/test_scheduler/test_tag_tagged.py index 00517820..1f9505d7 100644 --- a/tests/test_scheduler/test_tag_tagged.py +++ b/tests/test_scheduler/test_tag_tagged.py @@ -9,6 +9,8 @@ import module_build_service.messaging import module_build_service.scheduler.handlers.repos import module_build_service.scheduler.handlers.tags import module_build_service.models + +from module_build_service.scheduler.events import KojiTagChange from tests import conf from module_build_service.db_session import db_session @@ -24,7 +26,7 @@ class TestTagTagged: that we do nothing gracefully. """ from_tag_change_event.return_value = None - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "no matches for this...", "2016-some-nonexistent-build", "artifact", "artifact-1.2-1") module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) @@ -33,7 +35,7 @@ class TestTagTagged: """ Test that when a tag msg hits us and we have no match, that we do nothing gracefully. """ - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "artifact", @@ -86,7 +88,7 @@ class TestTagTagged: db_session.commit() # Tag the first component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-Tangerine", @@ -96,7 +98,7 @@ class TestTagTagged: config=conf, msg=msg ) # Tag the first component to the final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-Tangerine", @@ -111,7 +113,7 @@ class TestTagTagged: assert not koji_session.newRepo.called # Tag the second component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-List-Compare", @@ -126,7 +128,7 @@ class TestTagTagged: assert not koji_session.newRepo.called # Tag the first component to the final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-List-Compare", @@ -182,7 +184,7 @@ class TestTagTagged: db_session.commit() # Tag the perl-List-Compare component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-Tangerine", @@ -191,7 +193,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the perl-List-Compare component to final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-Tangerine", @@ -252,7 +254,7 @@ class TestTagTagged: db_session.commit() # Tag the perl-List-Compare component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-List-Compare", @@ -262,7 +264,7 @@ class TestTagTagged: config=conf, msg=msg ) # Tag the perl-List-Compare component to final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-List-Compare", @@ -328,7 +330,7 @@ class TestTagTagged: db_session.commit() # Tag the first component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-Tangerine", @@ -337,7 +339,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the first component to the final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-Tangerine", @@ -351,7 +353,7 @@ class TestTagTagged: assert not koji_session.newRepo.called # Tag the second component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-List-Compare", @@ -360,7 +362,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the second component to final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-List-Compare", @@ -374,7 +376,7 @@ class TestTagTagged: assert not koji_session.newRepo.called # Tag the component from first batch to final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "module-build-macros", @@ -383,7 +385,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the component from first batch to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "module-build-macros", @@ -455,7 +457,7 @@ class TestTagTagged: db_session.commit() # Tag the perl-Tangerine component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-Tangerine", @@ -465,7 +467,7 @@ class TestTagTagged: config=conf, msg=msg) assert not koji_session.newRepo.called # Tag the perl-List-Compare component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-List-Compare", @@ -474,7 +476,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the perl-List-Compare component to final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-List-Compare", @@ -554,7 +556,7 @@ class TestTagTagged: db_session.commit() # Tag the first component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-Tangerine", @@ -563,7 +565,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the first component to the final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-Tangerine", @@ -572,7 +574,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the second component to the buildroot. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c-build", "perl-List-Compare", @@ -581,7 +583,7 @@ class TestTagTagged: module_build_service.scheduler.handlers.tags.tagged( config=conf, msg=msg) # Tag the second component to the final tag. - msg = module_build_service.messaging.KojiTagChange( + msg = KojiTagChange( "id", "module-testmodule-master-20170219191323-c40c156c", "perl-List-Compare", diff --git a/tests/test_utils/test_utils.py b/tests/test_utils/test_utils.py index 984fef61..6fe65102 100644 --- a/tests/test_utils/test_utils.py +++ b/tests/test_utils/test_utils.py @@ -17,6 +17,7 @@ from module_build_service.errors import ProgrammingError, ValidationError, Unpro from module_build_service.utils.reuse import get_reusable_module, get_reusable_component from module_build_service.utils.general import load_mmd from module_build_service.utils.submit import format_mmd +from module_build_service.scheduler.events import KojiBuildChange, KojiRepoChange from tests import ( clean_database, init_data, @@ -1228,7 +1229,7 @@ class TestBatches: # to BUILDING, so KojiBuildChange message handler handles the change # properly. for msg in further_work: - if type(msg) == module_build_service.messaging.KojiBuildChange: + if type(msg) == KojiBuildChange: assert msg.build_new_state == koji.BUILD_STATES["COMPLETE"] component_build = models.ComponentBuild.from_component_event(db_session, msg) assert component_build.state == koji.BUILD_STATES["BUILDING"] @@ -1236,12 +1237,12 @@ class TestBatches: # When we handle these KojiBuildChange messages, MBS should tag all # the components just once. for msg in further_work: - if type(msg) == module_build_service.messaging.KojiBuildChange: + if type(msg) == KojiBuildChange: module_build_service.scheduler.handlers.components.complete(conf, msg) # Since we have reused all the components in the batch, there should # be fake KojiRepoChange message. - assert type(further_work[-1]) == module_build_service.messaging.KojiRepoChange + assert type(further_work[-1]) == KojiRepoChange # Check that packages have been tagged just once. assert len(DummyModuleBuilder.TAGGED_COMPONENTS) == 2