Add celery task router

Add a route_task function to route Celery tasks to different queues.
If we can figure out which module build a task is running for by
checking the task arguments, then we route the task to a queue
named:

    "mbs-{}".format(module_build_id % num_workers)

"num_workers" has a default value of 1 and can be changed in
backend_config.py. If the module build id can't be determined, the
task is routed to the default queue, which is named "mbs-default".

When setting up the workers, the number of workers should match
"num_workers" in the config, and each worker will listen on two queues:

    1. mbs-default
    2. mbs-{number} # for example, the first worker listens on "mbs-0"

With this design, all tasks for a particular module build are routed
to the same queue and run serially on the same worker.
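
For illustration (not part of this commit), with "num_workers" set to 3 the
queue subscriptions and the routing work out as follows:

# Illustration only (not part of this commit): the "mbs-{module_build_id % num_workers}"
# scheme, with num_workers assumed to be 3.
num_workers = 3

# Each worker subscribes to the shared fallback queue plus its own queue.
for n in range(num_workers):
    print("worker {} listens on: mbs-default, mbs-{}".format(n, n))

# All tasks of a given module build land on the same queue.
for module_build_id in (7, 8, 9, 10):
    print("module build {} -> mbs-{}".format(module_build_id, module_build_id % num_workers))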
Author: Qixiang Wan
Date: 2019-12-04 18:51:11 +08:00
Committed by: mprahl
Parent: e904626fc5
Commit: ab0b513562
9 changed files with 217 additions and 21 deletions

View File

@@ -670,6 +670,19 @@ class Config(object):
"default": 30,
"desc": "The timeout configuration for dnf operations, in seconds."
},
"num_workers": {"type": int, "default": 1, "desc": "Number of Celery workers"},
"celery_task_always_eager": {
"type": bool,
"default": False,
"desc": "All Celery tasks will be executed locally by blocking until the task returns "
"when this is True",
},
"celery_task_routes": {
"type": list,
"default": ["module_build_service.route.route_task"],
"desc": "A list of Celery routers. When deciding the final destination queue of a "
"Celery task the routers are consulted in order",
},
"celery_worker_prefetch_multiplier": {
"type": int,
"default": 1,

View File

@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
"""Define the router used to route Celery tasks to queues."""

import inspect

from module_build_service import conf, log, models
from module_build_service.db_session import db_session
from module_build_service.scheduler.handlers.greenwave import get_corresponding_module_build


def route_task(name, args, kwargs, options, task=None, **kw):
    """
    Figure out the module build id from the task args and route the task to a
    queue based on that id.

    Each Celery worker listens on two queues:
    1. mbs-default
    2. mbs-{number} # where number is "module_build_id % conf.num_workers"

    If a task is associated with a module build, route it to the queue named
    "mbs-{number}"; otherwise, route it to "mbs-default". This ensures that
    tasks for a module build run serially on the same worker.
    """
    queue_name = "mbs-default"
    module_build_id = None
    num_workers = conf.num_workers

    module, handler_name = name.rsplit(".", 1)
    handler = getattr(__import__(module, fromlist=[handler_name]), handler_name)
    # handlers can be decorated, inspect the original function
    while getattr(handler, "__wrapped__", None):
        handler = handler.__wrapped__
    handler_args = inspect.getargspec(handler).args

    def _get_handler_arg(name):
        index = handler_args.index(name)
        arg_value = kwargs.get(name, None)
        if arg_value is None and len(args) > index:
            arg_value = args[index]
        return arg_value

    if "module_build_id" in handler_args:
        module_build_id = _get_handler_arg("module_build_id")

    # if module_build_id is not found, we may be able to figure it out
    # by checking other arguments
    if module_build_id is None:
        if "task_id" in handler_args:
            task_id = _get_handler_arg("task_id")
            component_build = models.ComponentBuild.from_component_event(db_session, task_id)
            if component_build:
                module_build_id = component_build.module_build.id
        elif "tag_name" in handler_args:
            tag_name = _get_handler_arg("tag_name")
            module_build = models.ModuleBuild.get_by_tag(db_session, tag_name)
            if module_build:
                module_build_id = module_build.id
        elif "subject_identifier" in handler_args:
            module_build_nvr = _get_handler_arg("subject_identifier")
            module_build = get_corresponding_module_build(module_build_nvr)
            if module_build is not None:
                module_build_id = module_build.id

    if module_build_id is not None:
        queue_name = "mbs-{}".format(module_build_id % num_workers)

    taskinfo = {"name": name, "args": args, "kwargs": kwargs, "options": options, "kw": kw}
    log.debug("Routing task '{}' to queue '{}'. Task info:\n{}".format(name, queue_name, taskinfo))
    return {"queue": queue_name}

View File

@@ -66,22 +66,20 @@ class Scheduler(sched.scheduler):
scheduler = Scheduler(time.time, delayfunc=lambda x: x)
def mbs_event_handler():
def mbs_event_handler(func):
    """
    A decorator for MBS event handlers. It implements common tasks that would
    otherwise have to be repeated in every MBS event handler, for example:
    - at the end of the handler, call events.scheduler.run().
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                scheduler.run()
        return wrapper
    return decorator
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        finally:
            scheduler.run()
    # save the original function, since functools.wraps from Python 2 doesn't preserve the signature
    if not hasattr(wrapper, "__wrapped__"):
        wrapper.__wrapped__ = func
    return wrapper
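
To make the __wrapped__ bookkeeping concrete, here is a minimal, self-contained
illustration (not MBS code; the decorator and handler names are made up) of how
the router reaches through a decorated handler to read its argument names:

# Illustration only: why route_task can unwrap decorated handlers.
import functools
import inspect


def sample_event_handler(func):
    # Same idea as mbs_event_handler above: wrap the handler, remember the original.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    if not hasattr(wrapper, "__wrapped__"):  # functools.wraps on Python 2 doesn't set this
        wrapper.__wrapped__ = func
    return wrapper


@sample_event_handler
def init(msg_id, module_build_id, module_build_state):
    pass


# What route_task does: follow __wrapped__ to the original function, then inspect it
# (route_task itself uses inspect.getargspec for Python 2 compatibility).
handler = init
while getattr(handler, "__wrapped__", None):
    handler = handler.__wrapped__
print(inspect.getfullargspec(handler).args)
# ['msg_id', 'module_build_id', 'module_build_state']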

View File

@@ -17,7 +17,7 @@ logging.basicConfig(level=logging.DEBUG)
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def build_task_finalize(
msg_id, build_id, task_id, build_new_state,
build_name, build_version, build_release,

View File

@@ -33,7 +33,7 @@ def get_corresponding_module_build(nvr):
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def decision_update(msg_id, decision_context, subject_identifier, policies_satisfied):
"""Move module build to ready or failed according to Greenwave result

View File

@@ -41,7 +41,7 @@ def get_artifact_from_srpm(srpm_path):
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def failed(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'failed' state.
@@ -102,7 +102,7 @@ def failed(msg_id, module_build_id, module_build_state):
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def done(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'done' state.
@@ -141,7 +141,7 @@ def done(msg_id, module_build_id, module_build_state):
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def init(msg_id, module_build_id, module_build_state):
"""Called whenever a module enters the 'init' state.
@@ -317,7 +317,7 @@ def get_content_generator_build_koji_tag(module_deps):
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def wait(msg_id, module_build_id, module_build_state):
""" Called whenever a module enters the 'wait' state.

View File

@@ -14,7 +14,7 @@ logging.basicConfig(level=logging.DEBUG)
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def done(msg_id, tag_name):
"""Called whenever koji rebuilds a repo, any repo.

View File

@@ -13,7 +13,7 @@ logging.basicConfig(level=logging.DEBUG)
@celery_app.task
@events.mbs_event_handler()
@events.mbs_event_handler
def tagged(msg_id, tag_name, build_name, build_nvr):
"""Called whenever koji tags a build to tag.

View File

@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
import mock
from module_build_service import celery_app, conf
from module_build_service.scheduler.handlers import components, greenwave, modules, repos, tags
from module_build_service.scheduler.producer import fail_lost_builds
from tests import scheduler_init_data

@mock.patch.object(conf, "num_workers", create=True, new=3)
@mock.patch("celery.app.amqp.AMQP.send_task_message")
class TestCeleryRouteTask:

    def setup_method(self, test_method):
        self.old_task_always_eager = celery_app.conf.get("task_always_eager")
        celery_app.conf.update(task_always_eager=False)

    def teardown_method(self, test_method):
        celery_app.conf.update(task_always_eager=self.old_task_always_eager)

    def test_route_modules_init_task(self, send_task_message):
        modules.init.delay("fakemsg", 2, 0)
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-2"

    def test_route_modules_init_task_call_with_kwargs(self, send_task_message):
        kwargs = {
            "msg_id": "fakemsg",
            "module_build_id": 2,
            "module_build_state": 0,
        }
        modules.init.delay(**kwargs)
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-2"

    def test_route_modules_wait_task(self, send_task_message):
        modules.wait.delay("fakemsg", 3, 1)
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-0"

    def test_route_modules_done_task(self, send_task_message):
        modules.done.delay("fakemsg", 22, 3)
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-1"

    def test_route_modules_failed_task(self, send_task_message):
        modules.failed.delay("fakemsg", 23, 4)
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-2"

    def test_route_components_build_task_finalize_task(self, send_task_message):
        scheduler_init_data()
        components.build_task_finalize.delay(
            "fakemsg", 123, 90276228, 1, "perl-Tangerine", "0.23", "1.module+f28+2+814cfa39")
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-2"

    def test_route_components_build_task_finalize_task_without_a_module(self, send_task_message):
        scheduler_init_data()
        components.build_task_finalize.delay(
            "fakemsg", 123, 123456, 1, "hostname", "0.1", "1.module+f28+2+814cfa39")
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-default"

    def test_route_repos_done_task(self, send_task_message):
        scheduler_init_data()
        repos.done.delay("fakemsg", "module-testmodule-master-20170109091357-7c29193d-build")
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-2"

    def test_route_repos_done_task_without_a_module(self, send_task_message):
        scheduler_init_data()
        repos.done.delay("fakemsg", "no-module-build-exist")
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-default"

    def test_route_tags_tagged_task(self, send_task_message):
        scheduler_init_data()
        tags.tagged.delay(
            "fakemsg", "module-testmodule-master-20170109091357-7c29193d-build",
            "perl-Tangerine", "perl-Tangerine-0.23-1.module+f28+2+814cfa39")
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-2"

    @mock.patch("koji.ClientSession")
    def test_route_greenwave_decision_update_task(self, kojisession, send_task_message):
        kojisession.return_value.getBuild.return_value = {
            "extra": {"typeinfo": {"module": {"module_build_service_id": 1}}}
        }
        scheduler_init_data()
        greenwave.decision_update.delay(
            "fakemsg",
            decision_context="test_dec_context",
            subject_identifier="module-testmodule-master-20170109091357-7c29193d-build",
            policies_satisfied=False
        )
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-1"

    def test_route_fail_lost_builds_task(self, send_task_message):
        fail_lost_builds.delay()
        queue = send_task_message.call_args[1].get("queue")
        qname = queue.__dict__.get("name")
        assert qname == "mbs-default"