mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-02-03 05:03:43 +08:00
Use StopIteration in QueueBasedThreadPool to stop the threads.
This commit is contained in:
@@ -54,12 +54,17 @@ class Worker(Thread):
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
func, args, kargs = self.tasks.get()
|
||||
task_data = self.tasks.get()
|
||||
if task_data == StopIteration:
|
||||
self.tasks.task_done()
|
||||
break
|
||||
|
||||
func, args, kargs = task_data
|
||||
try:
|
||||
func(*args, **kargs)
|
||||
except Exception as e:
|
||||
# An exception happened in this thread
|
||||
log.error(str(e))
|
||||
log.exception("An exception happened in a worker thread.")
|
||||
finally:
|
||||
# Mark this task as done, whether an exception happened or not
|
||||
self.tasks.task_done()
|
||||
@@ -68,6 +73,7 @@ class Worker(Thread):
|
||||
class QueueBasedThreadPool:
|
||||
""" Pool of threads consuming tasks from a queue. """
|
||||
def __init__(self, num_threads):
|
||||
self.num_threads = num_threads
|
||||
self.tasks = Queue(num_threads)
|
||||
for _ in range(num_threads):
|
||||
Worker(self.tasks)
|
||||
@@ -83,6 +89,8 @@ class QueueBasedThreadPool:
|
||||
|
||||
def wait_completion(self):
|
||||
""" Wait for completion of all the tasks in the queue """
|
||||
for _ in range(self.num_threads):
|
||||
self.tasks.put(StopIteration)
|
||||
self.tasks.join()
|
||||
|
||||
def retry(timeout=conf.net_timeout, interval=conf.net_retry_interval, wait_on=Exception):
|
||||
|
||||
Reference in New Issue
Block a user