Files
fm-orchestrator/module_build_service/route.py
Qixiang Wan ab0b513562 Add celery task router
Add route_task function to route celery tasks to different queues.
If we can figure out what the module build is a task ran for by
checking the task arguments, then we route this task to a queue
named:

    "mbs-{}".format(module_build_id % num_workers)

"num_workers" has default value of 1, and can be changed in
backend_config.py. If module build id can't be figured out, task will
be routed to the default queue which is named "mbs-default".

While setting up the workers, the number of workers should match with
"num_workers" in config, and each worker will listen on two queues:

    1. mbs-default
    2. mbs-{number} # for example, the first worker listens on "mbs-0"

By this design, all tasks for a particular module build will be routed
to the same queue and run on the same worker serially.
2020-03-03 14:48:47 -05:00

70 lines
2.9 KiB
Python

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MIT
""" Define the router used to route Celery tasks to queues."""
import inspect
from module_build_service import conf, log, models
from module_build_service.db_session import db_session
from module_build_service.scheduler.handlers.greenwave import get_corresponding_module_build
def route_task(name, args, kwargs, options, task=None, **kw):
"""
Figure out module build id from task args and route task to queue
per the module build id.
Each celery worker will listens on two queues:
1. mbs-default
2. mbs-{number} # where number is "module_build_id % conf.num_workers"
If a task is associated with a module build, route it to the queue
named "mbs-{number}", otherwise, route it to "mbs-default", this is to ensure
tasks for a module build can run on the same worker serially.
"""
queue_name = "mbs-default"
module_build_id = None
num_workers = conf.num_workers
module, handler_name = name.rsplit(".", 1)
handler = getattr(__import__(module, fromlist=[handler_name]), handler_name)
# handlers can be decorated, inspect the original function
while getattr(handler, "__wrapped__", None):
handler = handler.__wrapped__
handler_args = inspect.getargspec(handler).args
def _get_handler_arg(name):
index = handler_args.index(name)
arg_value = kwargs.get(name, None)
if arg_value is None and len(args) > index:
arg_value = args[index]
return arg_value
if "module_build_id" in handler_args:
module_build_id = _get_handler_arg("module_build_id")
# if module_build_id is not found, we may be able to figure it out
# by checking other arguments
if module_build_id is None:
if "task_id" in handler_args:
task_id = _get_handler_arg("task_id")
component_build = models.ComponentBuild.from_component_event(db_session, task_id)
if component_build:
module_build_id = component_build.module_build.id
elif "tag_name" in handler_args:
tag_name = _get_handler_arg("tag_name")
module_build = models.ModuleBuild.get_by_tag(db_session, tag_name)
if module_build:
module_build_id = module_build.id
elif "subject_identifier" in handler_args:
module_build_nvr = _get_handler_arg("subject_identifier")
module_build = get_corresponding_module_build(module_build_nvr)
if module_build is not None:
module_build_id = module_build.id
if module_build_id is not None:
queue_name = "mbs-{}".format(module_build_id % num_workers)
taskinfo = {"name": name, "args": args, "kwargs": kwargs, "options": options, "kw": kw}
log.debug("Routing task '{}' to queue '{}'. Task info:\n{}".format(name, queue_name, taskinfo))
return {"queue": queue_name}