From aa6db9fccedb8da53d602aa89a1aa6023f865e24 Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Mon, 20 Feb 2017 10:42:42 +0100 Subject: [PATCH] Use StopIteration in QueueBasedThreadPool to stop the threads. --- module_build_service/utils.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/module_build_service/utils.py b/module_build_service/utils.py index 97ea7600..8b7c5a04 100644 --- a/module_build_service/utils.py +++ b/module_build_service/utils.py @@ -54,12 +54,17 @@ class Worker(Thread): def run(self): while True: - func, args, kargs = self.tasks.get() + task_data = self.tasks.get() + if task_data == StopIteration: + self.tasks.task_done() + break + + func, args, kargs = task_data try: func(*args, **kargs) except Exception as e: # An exception happened in this thread - log.error(str(e)) + log.exception("An exception happened in a worker thread.") finally: # Mark this task as done, whether an exception happened or not self.tasks.task_done() @@ -68,6 +73,7 @@ class Worker(Thread): class QueueBasedThreadPool: """ Pool of threads consuming tasks from a queue. """ def __init__(self, num_threads): + self.num_threads = num_threads self.tasks = Queue(num_threads) for _ in range(num_threads): Worker(self.tasks) @@ -83,6 +89,8 @@ class QueueBasedThreadPool: def wait_completion(self): """ Wait for completion of all the tasks in the queue """ + for _ in range(self.num_threads): + self.tasks.put(StopIteration) self.tasks.join() def retry(timeout=conf.net_timeout, interval=conf.net_retry_interval, wait_on=Exception):