12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- from queue import Queue
- from threading import Thread
- from joblib import Parallel, delayed
- import multiprocessing as mb
- import traceback
def createParallelPool(func, inputs):
    """Run ``func`` over every ``(task, workload)`` pair in parallel via joblib.

    Args:
        func: Callable invoked as ``func(task, workload)``.
        inputs: Iterable of ``(task, workload)`` pairs.

    Returns:
        The list of values returned by ``func``, one per input pair, as
        produced by ``joblib.Parallel``.
    """
    # Use half of the available cores, but never fewer than one: "/" would
    # give a float on Python 3 and a single-core machine would yield 0 jobs.
    n_jobs = max(1, mb.cpu_count() // 2)
    # Parallel must be called once with an iterable of delayed calls; calling
    # it per item with a single delayed tuple does not dispatch the task.
    jobs = (delayed(func)(task, workload) for task, workload in inputs)
    return Parallel(n_jobs=n_jobs)(jobs)
class Worker(Thread):
    """Daemon thread that executes tasks taken from a shared queue.

    Every queue item is a ``(func, args, kargs)`` tuple that is executed as
    ``func(*args, **kargs)``. The thread starts itself on construction.
    """

    def __init__(self, tasks):
        """Remember the task queue, mark the thread as daemon and start it.

        Args:
            tasks: Queue-like object providing ``get()`` and ``task_done()``.
        """
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Consume and execute tasks forever.

        Exceptions raised by a task are printed and swallowed so that one
        failing task does not stop the worker.
        """
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as ex:
                print(type(ex), ex)
            finally:
                # Always acknowledge the item, even if the task raised
                # something that is not an Exception subclass; otherwise
                # Queue.join() would wait on this item forever.
                self.tasks.task_done()
class ThreadPool:
    """Fixed-size group of worker threads fed from one bounded queue."""

    def __init__(self, num_threads):
        """Create the task queue and spawn ``num_threads`` workers on it.

        The queue capacity equals ``num_threads``, so ``add_task`` blocks
        while that many tasks are already waiting to be picked up.
        """
        task_queue = Queue(num_threads)
        self.tasks = task_queue
        for _worker_index in range(num_threads):
            # Workers start themselves; no reference to them is kept.
            Worker(task_queue)

    def add_task(self, func, *args, **kargs):
        """Enqueue ``func`` together with its positional and keyword arguments."""
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Block until every enqueued task has been marked as done."""
        self.tasks.join()
|