From 0c2d3a0009a2e379dff5ea73b9ccabc7c88287e4 Mon Sep 17 00:00:00 2001 From: Thomas Mansencal Date: Tue, 24 Dec 2024 01:54:44 +0000 Subject: [PATCH] Add `colour.utilities.network.ThreadPoolExecutorManager` and `colour.utilities.network.ProcessPooThreadPoolExecutorManager` singleton classes. --- colour/utilities/network.py | 177 +++++++++++++++++++++---- colour/utilities/tests/test_network.py | 34 +++++ 2 files changed, 186 insertions(+), 25 deletions(-) diff --git a/colour/utilities/network.py b/colour/utilities/network.py index 733e00f61..b1ee47e24 100644 --- a/colour/utilities/network.py +++ b/colour/utilities/network.py @@ -28,6 +28,7 @@ from __future__ import annotations +import atexit import concurrent.futures import multiprocessing import os @@ -65,7 +66,9 @@ "ExecutionNode", "ControlFlowNode", "For", + "ThreadPoolExecutorManager", "ParallelForThread", + "ProcessPoolExecutorManager", "ParallelForMultiprocess", ] @@ -2140,6 +2143,65 @@ def _task_thread(args: Sequence) -> tuple[int, Any]: return i, sub_graph.get_output("output") +class ThreadPoolExecutorManager: + """ + Define a singleton class managing our + :class:`concurrent.futures.ThreadPoolExecutor` class instance. + + Attributes + ---------- + - :attr:`~colour.utilities.ThreadPoolExecutorManager.ThreadPoolExecutor` + + Methods + ------- + - :meth:`~colour.utilities.ThreadPoolExecutorManager.get_executor` + - :meth:`~colour.utilities.ThreadPoolExecutorManager.shutdown_executor` + """ + + ThreadPoolExecutor: concurrent.futures.ThreadPoolExecutor | None = None + + @staticmethod + def get_executor( + max_workers: int | None = None, + ) -> concurrent.futures.ThreadPoolExecutor | None: + """ + Return the :class:`concurrent.futures.ThreadPoolExecutor` class instance or + create it if not existing. + + Parameters + ---------- + max_workers + Maximum worker count. + + Returns + ------- + :class:`concurrent.futures.ThreadPoolExecutor` + + Notes + ----- + The :class:`concurrent.futures.ThreadPoolExecutor` class instance is + automatically shutdown on process exit. + """ + + if ThreadPoolExecutorManager.ThreadPoolExecutor is None: + ThreadPoolExecutorManager.ThreadPoolExecutor = ( + concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + ) + + return ThreadPoolExecutorManager.ThreadPoolExecutor + + @atexit.register + @staticmethod + def shutdown_executor() -> None: + """ + Shutdown the :class:`concurrent.futures.ThreadPoolExecutor` class instance. + """ + + if ThreadPoolExecutorManager.ThreadPoolExecutor is not None: + ThreadPoolExecutorManager.ThreadPoolExecutor.shutdown(wait=True) + ThreadPoolExecutorManager.ThreadPoolExecutor = None + + class ParallelForThread(ControlFlowNode): """ Define an advanced ``for`` loop node distributing the work across multiple @@ -2198,18 +2260,20 @@ def process(self) -> None: self.log(f'Processing "{node}" node...') results = {} - with concurrent.futures.ThreadPoolExecutor( - max_workers=self.get_input("workers") - ) as executor: - futures = [ - executor.submit(self.get_input("task"), (i, element, node, self)) - for i, element in enumerate(self.get_input("array")) - ] - - for future in concurrent.futures.as_completed(futures): - index, element = future.result() - self.log(f'Processed "{element}" element with index "{index}".') - results[index] = element + thread_pool_executor = ThreadPoolExecutorManager.get_executor( + max_workers=self.get_input("processes") + ) + futures = [ + thread_pool_executor.submit( + self.get_input("task"), (i, element, node, self) + ) + for i, element in enumerate(self.get_input("array")) + ] + + for future in concurrent.futures.as_completed(futures): + index, element = future.result() + self.log(f'Processed "{element}" element with index "{index}".') + results[index] = element results = dict(sorted(results.items())) self.set_output("results", list(results.values())) @@ -2253,6 +2317,68 @@ def _task_multiprocess(args: Sequence) -> tuple[int, Any]: return i, sub_graph.get_output("output") +class ProcessPoolExecutorManager: + """ + Define a singleton class managing our + :class:`concurrent.futures.ProcessPoolExecutor` class instance. + + Attributes + ---------- + - :attr:`~colour.utilities.ProcessPoolExecutorManager.ProcessPoolExecutor` + + Methods + ------- + - :meth:`~colour.utilities.ProcessPoolExecutorManager.get_executor` + - :meth:`~colour.utilities.ProcessPoolExecutorManager.shutdown_executor` + """ + + ProcessPoolExecutor: concurrent.futures.ProcessPoolExecutor | None = None + + @staticmethod + def get_executor( + max_workers: int | None = None, + ) -> concurrent.futures.ProcessPoolExecutor | None: + """ + Return the :class:`concurrent.futures.ProcessPoolExecutor` class instance or + create it if not existing. + + Parameters + ---------- + max_workers + Maximum worker count. + + Returns + ------- + :class:`concurrent.futures.ProcessPoolExecutor` + + Notes + ----- + The :class:`concurrent.futures.ProcessPoolExecutor` class instance is + automatically shutdown on process exit. + """ + + if ProcessPoolExecutorManager.ProcessPoolExecutor is None: + context = multiprocessing.get_context("spawn") + ProcessPoolExecutorManager.ProcessPoolExecutor = ( + concurrent.futures.ProcessPoolExecutor( + mp_context=context, max_workers=max_workers + ) + ) + + return ProcessPoolExecutorManager.ProcessPoolExecutor + + @atexit.register + @staticmethod + def shutdown_executor() -> None: + """ + Shutdown the :class:`concurrent.futures.ProcessPoolExecutor` class instance. + """ + + if ProcessPoolExecutorManager.ProcessPoolExecutor is not None: + ProcessPoolExecutorManager.ProcessPoolExecutor.shutdown(wait=True) + ProcessPoolExecutorManager.ProcessPoolExecutor = None + + class ParallelForMultiprocess(ControlFlowNode): """ Define an advanced ``for`` loop node distributing the work across multiple @@ -2305,19 +2431,20 @@ def process(self) -> None: self.log(f'Processing "{node}" node...') results = {} - context = multiprocessing.get_context("spawn") - with concurrent.futures.ProcessPoolExecutor( - mp_context=context, max_workers=self.get_input("processes") - ) as executor: - futures = [ - executor.submit(self.get_input("task"), (i, element, node, self)) - for i, element in enumerate(self.get_input("array")) - ] - - for future in concurrent.futures.as_completed(futures): - index, element = future.result() - self.log(f'Processed "{element}" element with index "{index}".') - results[index] = element + process_pool_executor = ProcessPoolExecutorManager.get_executor( + max_workers=self.get_input("processes") + ) + futures = [ + process_pool_executor.submit( + self.get_input("task"), (i, element, node, self) + ) + for i, element in enumerate(self.get_input("array")) + ] + + for future in concurrent.futures.as_completed(futures): + index, element = future.result() + self.log(f'Processed "{element}" element with index "{index}".') + results[index] = element results = dict(sorted(results.items())) self.set_output("results", list(results.values())) diff --git a/colour/utilities/tests/test_network.py b/colour/utilities/tests/test_network.py index a6e5116d2..be6ac0b4c 100644 --- a/colour/utilities/tests/test_network.py +++ b/colour/utilities/tests/test_network.py @@ -21,6 +21,10 @@ TreeNode, is_pydot_installed, ) +from colour.utilities.network import ( + ProcessPoolExecutorManager, + ThreadPoolExecutorManager, +) __author__ = "Colour Developers" __copyright__ = "Copyright 2013 Colour Developers" @@ -35,7 +39,9 @@ "TestPortNode", "TestPortGraph", "TestFor", + "TestThreadPoolExecutorManager", "TestParallelForThread", + "TestProcessPoolExecutorManager", "TestParallelForMultiProcess", ] @@ -1084,6 +1090,20 @@ def process(self, **kwargs: Any) -> None: super().process(**kwargs) +class TestThreadPoolExecutorManager: + """ + Define :class:`colour.utilities.network.ThreadPoolExecutorManager` class unit tests + methods. + """ + + def test_ThreadPoolExecutorManager(self) -> None: + """Test :class:`colour.utilities.network.ThreadPoolExecutorManager` class.""" + + executor = ThreadPoolExecutorManager.get_executor() + + assert executor is not None + + class TestParallelForThread: """ Define :class:`colour.utilities.network.ParallelForThread` class unit tests @@ -1108,6 +1128,20 @@ def test_ParallelForThread(self) -> None: assert sum_array.get_output("summation") == 140 +class TestProcessPoolExecutorManager: + """ + Define :class:`colour.utilities.network.ProcessPoolExecutorManager` class unit tests + methods. + """ + + def test_ProcessPoolExecutorManager(self) -> None: + """Test :class:`colour.utilities.network.ProcessPoolExecutorManager` class.""" + + executor = ProcessPoolExecutorManager.get_executor() + + assert executor is not None + + class TestParallelForMultiProcess: """ Define :class:`colour.utilities.network.ParallelForMultiProcess` class unit