Skip to content

Commit

Permalink
Implement simple support for delegates with `colour.utilities.Delegat…
Browse files Browse the repository at this point in the history
…e` class.
  • Loading branch information
KelSolaar committed Dec 29, 2024
1 parent 8aec135 commit 29ab3d2
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 1 deletion.
4 changes: 4 additions & 0 deletions colour/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,12 @@
index_along_last_axis,
format_array_as_row,
)
from .delegate import Delegate
from .metrics import metric_mse, metric_psnr
from .network import (
TreeNode,
Port,
notify_process_state,
PortNode,
PortGraph,
ExecutionPort,
Expand Down Expand Up @@ -278,13 +280,15 @@
"index_along_last_axis",
"format_array_as_row",
]
__all__ += ["Delegate"]
__all__ += [
"metric_mse",
"metric_psnr",
]
__all__ += [
"TreeNode",
"Port",
"notify_process_state",
"PortNode",
"PortGraph",
"ExecutionPort",
Expand Down
73 changes: 73 additions & 0 deletions colour/utilities/delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Delegate - Event Notifications
==============================
Define a delegate class for event notifications:
- :class:`colour.utilities.Delegate`
"""

from __future__ import annotations

import typing

if typing.TYPE_CHECKING:
from colour.hints import Any, Callable, List

__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
__license__ = "BSD-3-Clause - https://opensource.org/licenses/BSD-3-Clause"
__maintainer__ = "Colour Developers"
__email__ = "[email protected]"
__status__ = "Production"

__all__ = ["Delegate"]


class Delegate:
"""
Define a delegate allowing listeners to register and be notified of events.
Methods
-------
- :meth:`~colour.utilities.Delegate.add_listener`
- :meth:`~colour.utilities.Delegate.remove_listener`
- :meth:`~colour.utilities.Delegate.notify`
"""

def __init__(self) -> None:
self._listeners: List = []

def add_listener(self, listener: Callable) -> None:
"""
Add the given listener to the delegate.
Parameters
----------
listener
Callable listening to the delegate notifications.
"""

if listener not in self._listeners:
self._listeners.append(listener)

def remove_listener(self, listener: Callable) -> None:
"""
Remove the given listener from the delegate.
Parameters
----------
listener
Callable listening to the delegate notifications.
"""

if listener in self._listeners:
self._listeners.remove(listener)

def notify(self, *args: Any, **kwargs: Any) -> None:
"""
Notify the delegate listeners.
"""

for listener in self._listeners:
listener(*args, **kwargs)
46 changes: 45 additions & 1 deletion colour/utilities/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import atexit
import concurrent.futures
import functools
import multiprocessing
import os
import threading
Expand All @@ -38,6 +39,7 @@
if typing.TYPE_CHECKING:
from colour.hints import (
Any,
Callable,
Dict,
Generator,
List,
Expand All @@ -47,7 +49,7 @@
Type,
)

from colour.utilities import MixinLogging, attest, optional, required
from colour.utilities import Delegate, MixinLogging, attest, optional, required

__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
Expand All @@ -59,6 +61,7 @@
__all__ = [
"TreeNode",
"Port",
"notify_process_state",
"PortNode",
"ControlFlowNode",
"PortGraph",
Expand Down Expand Up @@ -926,6 +929,33 @@ def to_graphviz(self) -> str:
return f"<{self._name}> {self.name}"


def notify_process_state(function: Callable) -> Any:
"""
Define a decorator to notify about the process state.
Parameters
----------
function
Function to decorate.
"""

@functools.wraps(function)
def wrapper(*args: Any, **kwargs: Any) -> Any:
self = next(iter(args)) if args else None

if self is not None and hasattr(self, "on_process_start"):
self.on_process_start.notify(self)

result = function(*args, **kwargs)

if self is not None and hasattr(self, "on_process_end"):
self.on_process_end.notify(self)

return result

return wrapper


class PortNode(TreeNode, MixinLogging):
"""
Define a node with support for input and output ports.
Expand All @@ -942,6 +972,10 @@ class PortNode(TreeNode, MixinLogging):
- :attr:`~colour.utilities.PortNode.dirty`
- :attr:`~colour.utilities.PortNode.edges`
- :attr:`~colour.utilities.PortNode.description`
- :attr:`~colour.utilities.PortNode.on_connected`
- :attr:`~colour.utilities.PortNode.on_disconnected`
- :attr:`~colour.utilities.PortNode.on_process_start`
- :attr:`~colour.utilities.PortNode.on_process_end`
Methods
-------
Expand Down Expand Up @@ -997,6 +1031,11 @@ def __init__(self, name: str | None = None, description: str = "") -> None:
self._output_ports = {}
self._dirty = True

self.on_connected: Delegate = Delegate()
self.on_disconnected: Delegate = Delegate()
self.on_process_start: Delegate = Delegate()
self.on_process_end: Delegate = Delegate()

@property
def input_ports(self) -> Dict[str, Port]:
"""
Expand Down Expand Up @@ -1435,6 +1474,8 @@ def connect(

port_source.connect(port_target)

self.on_connected.notify((self, source_port, target_node, target_port))

def disconnect(
self,
source_port: str,
Expand Down Expand Up @@ -1480,6 +1521,9 @@ def disconnect(

port_source.disconnect(port_target)

self.on_disconnected.notify((self, source_port, target_node, target_port))

@notify_process_state
def process(self) -> None:
"""
Process the node, must be reimplemented by sub-classes.
Expand Down
57 changes: 57 additions & 0 deletions colour/utilities/tests/test_delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Define the unit tests for the :mod:`colour.utilities.delegate` module."""

from __future__ import annotations

from colour.utilities import (
Delegate,
)

__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
__license__ = "BSD-3-Clause - https://opensource.org/licenses/BSD-3-Clause"
__maintainer__ = "Colour Developers"
__email__ = "[email protected]"
__status__ = "Production"

__all__ = [
"TestDelegate",
]


class TestDelegate:
"""
Define :class:`colour.utilities.structures.Delegate` class unit tests
methods.
"""

def test_required_methods(self) -> None:
"""Test the presence of required methods."""

required_methods = ("add_listener", "remove_listener", "notify")

for method in required_methods:
assert method in dir(Delegate)

def test_Delegate(self) -> None:
"""Test the :class:`colour.utilities.structures.Delegate` class."""

delegate = Delegate()

data = []

def _listener(a: int) -> None:
"""Define a unit tests listener."""

data.append(a)

delegate.add_listener(_listener)

delegate.notify("Foo")

assert data == ["Foo"]

delegate.remove_listener(_listener)

delegate.notify("Bar")

assert data == ["Foo"]
48 changes: 48 additions & 0 deletions colour/utilities/tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from colour.utilities.network import (
ProcessPoolExecutorManager,
ThreadPoolExecutorManager,
notify_process_state,
)

__author__ = "Colour Developers"
Expand Down Expand Up @@ -452,6 +453,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_input_port("b")
self.add_output_port("output")

@notify_process_state
def process(self) -> None:
a = self.get_input("a")
b = self.get_input("b")
Expand All @@ -474,6 +476,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_input_port("b")
self.add_output_port("output")

@notify_process_state
def process(self) -> None:
a = self.get_input("a")
b = self.get_input("b")
Expand All @@ -486,6 +489,42 @@ def process(self) -> None:
self.dirty = False


class TestNotifyProcessState:
"""
Define :func:`colour.utilities.network.notify_process_state` definition
unit tests methods.
"""

def test_notify_process_state(self) -> None:
"""
Test :func:`colour.utilities.network.notify_process_state` definition.
"""

data = []

def _listener_on_process_start_(self: _NodeAdd) -> None: # noqa: ARG001
"""Define a unit tests listener."""

data.append("Foo")

def _listener_on_process_end(self: _NodeAdd) -> None: # noqa: ARG001
"""Define a unit tests listener."""

data.append("Bar")

add = _NodeAdd()
add.on_process_start.add_listener(_listener_on_process_start_)
add.on_process_end.add_listener(_listener_on_process_end)

add.set_input("a", 1)
add.set_input("b", 1)

add.process()

assert add.get_output("output") == 2
assert data == ["Foo", "Bar"]


class TestPortNode:
"""
Define :class:`colour.utilities.network.PortNode` class unit tests methods.
Expand All @@ -509,6 +548,10 @@ def test_required_attributes(self) -> None:
"dirty",
"edges",
"description",
"on_connected",
"on_disconnected",
"on_process_start",
"on_process_end",
)

for attribute in required_attributes:
Expand Down Expand Up @@ -903,6 +946,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_input_port("value")
self.add_input_port("mapping", {})

@notify_process_state
def process(self) -> None:
"""
Process the node.
Expand All @@ -928,6 +972,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_input_port("mapping", {})
self.add_output_port("summation")

@notify_process_state
def process(self) -> None:
mapping = self.get_input("mapping")
if len(mapping) == 0:
Expand Down Expand Up @@ -990,6 +1035,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.connect("input", self.nodes["Add Item"], "key")
self.nodes["Add Item"].connect("mapping", self, "output")

@notify_process_state
def process(self, **kwargs: Any) -> None:
self.nodes["Add 1"].set_input("a", 1)
self.nodes["Multiply 1"].set_input("b", 2)
Expand Down Expand Up @@ -1030,6 +1076,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_input_port("array", [])
self.add_output_port("summation")

@notify_process_state
def process(self) -> None:
array = self.get_input("array")
if len(array) == 0:
Expand Down Expand Up @@ -1082,6 +1129,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.connect("input", self.nodes["Add 1"], "b")
self.nodes["Add 2"].connect("output", self, "output")

@notify_process_state
def process(self, **kwargs: Any) -> None:
self.nodes["Add 1"].set_input("a", 1)
self.nodes["Multiply 1"].set_input("b", 2)
Expand Down
Loading

0 comments on commit 29ab3d2

Please sign in to comment.