threadPool.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from queue import Queue
  2. from threading import Thread
  3. from joblib import Parallel, delayed
  4. import multiprocessing as mb
  5. import traceback
  6. def createParallelPool (func, inputs):
  7. result = Parallel(mb.cpu_count()/2)
  8. for i in inputs:
  9. (task, workload) = i
  10. result(delayed(func)(task, workload))
  11. return result
  12. class Worker (Thread):
  13. """Thread executing tasks from a given tasks queue"""
  14. def __init__ (self, tasks):
  15. Thread.__init__(self)
  16. self.tasks = tasks
  17. self.daemon = True
  18. self.start()
  19. def run (self):
  20. while True:
  21. func, args, kargs = self.tasks.get()
  22. try: func(*args, **kargs)
  23. except Exception as ex:
  24. #traceback.print_exception(type(ex), ex, ex.__traceback__)
  25. print(type(ex), ex)
  26. self.tasks.task_done()
  27. class ThreadPool:
  28. """Pool of threads consuming tasks from a queue"""
  29. def __init__ (self, num_threads):
  30. self.tasks = Queue(num_threads)
  31. for _ in range(num_threads): Worker(self.tasks)
  32. def add_task (self, func, *args, **kargs):
  33. """Add a task to the queue"""
  34. self.tasks.put((func, args, kargs))
  35. def wait_completion (self):
  36. """Wait for completion of all the tasks in the queue"""
  37. self.tasks.join()