Mirror of https://pagure.io/fm-orchestrator.git
Use concurrent.futures instead of our own ThreadPool implementation
@@ -13,6 +13,7 @@ RUN dnf install -y \
     python-mock \
     python-pip \
     python-qpid \
+    python-futures \
     python2-cffi \
     python2-cryptography \
     python2-pdc-client \
Vagrantfile (vendored)
@@ -25,6 +25,7 @@ $script = <<SCRIPT
     python-mock \
     python-qpid \
     python-virtualenv \
+    python-futures \
     redhat-rpm-config \
     redhat-rpm-config \
     rpm-build \
@@ -41,57 +41,8 @@ from module_build_service import conf, db
 from module_build_service.errors import (Unauthorized, Conflict)
 import module_build_service.messaging
-from multiprocessing.dummy import Pool as ThreadPool
-from six.moves.queue import Queue
-from threading import Thread
-
-
-class Worker(Thread):
-    """ Thread executing tasks from a given tasks queue """
-    def __init__(self, tasks):
-        Thread.__init__(self)
-        self.tasks = tasks
-        self.daemon = True
-        self.start()
-
-    def run(self):
-        while True:
-            task_data = self.tasks.get()
-            if task_data == StopIteration:
-                self.tasks.task_done()
-                break
-
-            func, args, kargs = task_data
-            try:
-                func(*args, **kargs)
-            except Exception as e:
-                # An exception happened in this thread
-                log.exception("An exception happened in a worker thread.")
-            finally:
-                # Mark this task as done, whether an exception happened or not
-                self.tasks.task_done()
-
-
-class QueueBasedThreadPool:
-    """ Pool of threads consuming tasks from a queue. """
-    def __init__(self, num_threads):
-        self.num_threads = num_threads
-        self.tasks = Queue(num_threads)
-        for _ in range(num_threads):
-            Worker(self.tasks)
-
-    def add_task(self, func, *args, **kargs):
-        """ Add a task to the queue """
-        self.tasks.put((func, args, kargs))
-
-    def map(self, func, args_list):
-        """ Add a list of tasks to the queue """
-        for args in args_list:
-            self.add_task(func, args)
-
-    def wait_completion(self):
-        """ Wait for completion of all the tasks in the queue """
-        for _ in range(self.num_threads):
-            self.tasks.put(StopIteration)
-        self.tasks.join()
+import concurrent.futures


 def retry(timeout=conf.net_timeout, interval=conf.net_retry_interval, wait_on=Exception):
     """ A decorator that allows to retry a section of code...
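For reference, the removed Worker/QueueBasedThreadPool gave the module a fixed number of worker threads, a task queue, a wait_completion() barrier, and per-task exception logging. The sketch below is not part of the commit (the helper name run_tasks is purely illustrative); it only shows how that same behaviour maps onto the concurrent.futures API that replaces it. One behavioural difference worth noting: the old pool used a bounded Queue(num_threads), so queuing tasks applied back-pressure, while ThreadPoolExecutor accepts submissions without a bound.

import concurrent.futures
import logging

log = logging.getLogger(__name__)


def run_tasks(func, args_list, num_threads):
    """Run func over args_list on a fixed-size thread pool and wait for all tasks."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # submit() plays the role of QueueBasedThreadPool.add_task(); leaving the
        # `with` block waits for every task, like wait_completion() did.
        futures = [executor.submit(func, args) for args in args_list]
    for future in futures:
        exc = future.exception()  # None if the task succeeded
        if exc is not None:
            # Worker.run() logged and swallowed task exceptions; mirror that here.
            log.error("An exception happened in a worker thread: %r", exc)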
@@ -209,9 +160,9 @@ def start_build_batch(config, module, session, builder, components=None):
         return

     # Start build of components in this batch.
-    pool = QueueBasedThreadPool(config.num_consecutive_builds)
-    pool.map(start_build_component, unbuilt_components)
-    pool.wait_completion()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=config.num_consecutive_builds) as executor:
+        futures = {executor.submit(start_build_component, c): c for c in unbuilt_components}
+        concurrent.futures.wait(futures)

     further_work = []

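A usage note rather than a change in the commit: concurrent.futures.wait(futures) only blocks until the submitted builds finish; unlike the removed Worker.run(), it neither logs nor re-raises task exceptions, which stay stored on the futures. If that logging is still wanted, the future-to-component dict built in the hunk above can be inspected after the wait, along these lines (a fragment meant to follow the wait call, assuming the module-level log used elsewhere in this file):

done, _not_done = concurrent.futures.wait(futures)
for future in done:
    component = futures[future]  # the dict maps each future back to its component
    exc = future.exception()     # None if start_build_component() succeeded
    if exc is not None:
        log.error("Building component %r raised: %r", component, exc)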
@@ -19,3 +19,4 @@ python-fedora
 qpid-python
 six
 sqlalchemy
+futures # Python 2 only
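The futures entry here, like the python-futures packages added above, is the PyPI backport that provides the standard concurrent.futures module on Python 2, so the import added in this commit works unchanged on both interpreters. A quick standalone check, as a sketch:

import concurrent.futures  # stdlib on Python 3; provided by the `futures` backport on Python 2

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(pow, 2, 10)
    print(future.result())  # 1024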