From 372671eeef79aed277b4ef21bf67177bf176dedf Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Sun, 2 Oct 2022 22:40:38 +0300 Subject: [PATCH] add read_events and write_events to Array --- pyopencl/array.py | 825 +++++++++++++++++++++++++++++++--------------- 1 file changed, 565 insertions(+), 260 deletions(-) diff --git a/pyopencl/array.py b/pyopencl/array.py index fd0fedc09..83ffc0e94 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -29,7 +29,7 @@ """ from dataclasses import dataclass, field -from typing import Any, List, Optional +from typing import Any, List, Optional, Tuple, Union from functools import reduce from warnings import warn @@ -55,6 +55,7 @@ _SVMPointer_or_nothing = () +_MAX_EVENT_CLEAR_COUNT = 4 _COMMON_DTYPE_CACHE = {} @@ -254,7 +255,6 @@ def _splay(device, n, kernel_specific_max_wg_size=None): def elwise_kernel_runner(kernel_getter): """Take a kernel getter of the same signature as the kernel and return a function that invokes that kernel. - Assumes that the zeroth entry in *args* is an :class:`Array`. """ @@ -498,21 +498,44 @@ class Array: .. versionadded:: 2014.1.1 - .. attribute:: events + .. attribute:: write_events + + A list of :class:`pyopencl.Event` instances that the current array + depends on for writes. User code should not modify this list directly, + but should use :meth:`add_write_event` to append and :meth:`finish` to + wait on the events. + + .. attribute:: read_events - A list of :class:`pyopencl.Event` instances that the current content of - this array depends on. User code may read, but should never modify this - list directly. To update this list, instead use the following methods. + A list of :class:`pyopencl.Event` instances that the current array + depends on for reads. User code should not modify this list directly, but + should use :meth:`add_read_event` to append and :meth:`finish` to wait + on the events. - .. automethod:: add_event + .. automethod:: add_write_event + .. automethod:: add_read_event .. 
automethod:: finish """ __array_priority__ = 100 - def __init__(self, cq, shape, dtype, order="C", allocator=None, - data=None, offset=0, strides=None, events=None, _flags=None, - _fast=False, _size=None, _context=None, _queue=None): + def __init__(self, + cq: Optional[Union["cl.Context", "cl.CommandQueue"]], + shape: Tuple[int, ...], + dtype: "np.dtype", + order: str = "C", + allocator: Optional[Any] = None, + data: Any = None, + offset: int = 0, + strides: Optional[int] = None, + events: Optional[List["cl.Event"]] = None, + write_events: Optional[List["cl.Event"]] = None, + read_events: Optional[List["cl.Event"]] = None, + _flags: Any = None, + _fast: bool = False, + _size: Optional[int] = None, + _context: Optional["cl.Context"] = None, + _queue: Optional["cl.CommandQueue"] = None) -> None: if _fast: # Assumptions, should be disabled if not testing if 0: @@ -629,11 +652,26 @@ def __init__(self, cq, shape, dtype, order="C", allocator=None, raise ValueError("cannot allocate CL buffer with " "negative size") + if events is not None: + warn("Passing 'events' is deprecated and will be removed in 2024. " + "Pass either 'write_events' or 'read_events' explicitly.", + DeprecationWarning, stacklevel=2) + + if write_events is None: + write_events = events + + if write_events is None: + write_events = [] + + if read_events is None: + read_events = [] + self.queue = queue self.shape = shape self.dtype = dtype self.strides = strides - self.events = [] if events is None else events + self.write_events = write_events + self.read_events = read_events self.nbytes = alloc_nbytes self.size = size self.allocator = allocator @@ -668,6 +706,14 @@ def __init__(self, cq, shape, dtype, order="C", allocator=None, "than expected, potentially leading to crashes.", InconsistentOpenCLQueueWarning, stacklevel=2) + @property + def events(self): + warn("Using 'events' is deprecated and will be removed in 2024. 
Prefer " + "either 'write_events' or 'read_events' depending on the situation.", + DeprecationWarning, stacklevel=2) + + return self.write_events + @property def ndim(self): return len(self.shape) @@ -713,13 +759,15 @@ def _new_with_changes(self, data, offset, shape=None, dtype=None, # share the same events list. if data is None: - events = None + write_events = read_events = None else: - events = self.events + write_events = self.write_events + read_events = self.read_events return self.__class__(None, shape, dtype, allocator=allocator, strides=strides, data=data, offset=offset, - events=events, + write_events=write_events, + read_events=read_events, _fast=fast, _context=self.context, _queue=queue, _size=size) def with_queue(self, queue): @@ -797,11 +845,11 @@ def set(self, ary, queue=None, async_=None, **kwargs): stacklevel=2) if self.size: - event1 = cl.enqueue_copy(queue or self.queue, self.base_data, ary, + evt = cl.enqueue_copy(queue or self.queue, self.base_data, ary, dst_offset=self.offset, is_blocking=not async_) - self.add_event(event1) + self.add_write_event(evt) def _get(self, queue=None, ary=None, async_=None, **kwargs): # {{{ handle 'async' deprecation @@ -834,7 +882,7 @@ def _get(self, queue=None, ary=None, async_=None, **kwargs): if self.shape != ary.shape: warn("get() between arrays of different shape is deprecated " - "and will be removed in PyCUDA 2017.x", + "and will be removed in PyOpenCL 2017.x", DeprecationWarning, stacklevel=2) assert self.flags.forc, "Array in get() must be contiguous" @@ -847,15 +895,15 @@ def _get(self, queue=None, ary=None, async_=None, **kwargs): "to associate one.") if self.size: - event1 = cl.enqueue_copy(queue, ary, self.base_data, + evt = cl.enqueue_copy(queue, ary, self.base_data, src_offset=self.offset, - wait_for=self.events, is_blocking=not async_) + wait_for=self.write_events, is_blocking=not async_) - self.add_event(event1) + self.add_read_event(evt) else: - event1 = None + evt = None - return ary, event1 + 
return ary, evt def get(self, queue=None, ary=None, async_=None, **kwargs): """Transfer the contents of *self* into *ary* or a newly allocated @@ -865,10 +913,9 @@ def get(self, queue=None, ary=None, async_=None, **kwargs): .. versionchanged:: 2019.1.2 Calling with `async_=True` was deprecated and replaced by - :meth:`get_async`. - The event returned by :meth:`pyopencl.enqueue_copy` is now stored into - :attr:`events` to ensure data is not modified before the copy is - complete. + :meth:`get_async`. The event returned by :meth:`pyopencl.enqueue_copy` + is now stored into :attr:`read_events` to ensure data is not modified + before the copy is complete. .. versionchanged:: 2015.2 @@ -888,16 +935,15 @@ def get(self, queue=None, ary=None, async_=None, **kwargs): "device-to-host transfers", DeprecationWarning, 2) - ary, event1 = self._get(queue=queue, ary=ary, async_=async_, **kwargs) + ary, _ = self._get(queue=queue, ary=ary, async_=async_, **kwargs) return ary def get_async(self, queue=None, ary=None, **kwargs): """ Asynchronous version of :meth:`get` which returns a tuple ``(ary, event)`` - containing the host array `ary` - and the :class:`pyopencl.NannyEvent` `event` returned by - :meth:`pyopencl.enqueue_copy`. + containing the host array `ary` and the :class:`pyopencl.NannyEvent` + event returned by :meth:`pyopencl.enqueue_copy`. .. 
versionadded:: 2019.1.2 """ @@ -928,11 +974,13 @@ def copy(self, queue=_copy_queue): raise RuntimeError("cannot copy non-contiguous array") if self.nbytes: - event1 = cl.enqueue_copy(queue or self.queue, + evt = cl.enqueue_copy(queue or self.queue, result.base_data, self.base_data, src_offset=self.offset, byte_count=self.nbytes, - wait_for=self.events) - result.add_event(event1) + wait_for=self.write_events) + + self.add_read_event(evt) + result.add_write_event(evt) return result @@ -946,12 +994,11 @@ def __str__(self): def __repr__(self): if self.queue is None: return (f"") + f"at {id(self):x} without queue, call with_queue()>") result = repr(self.get()) if result[:5] == "array": - result = f"cl.{type(self).__name__}" + result[5:] + result = f"cl.{type(self).__name__}{result[5:]}" else: warn(f"{type(result).__name__}.__repr__ was expected to return a " f"string starting with 'array', got '{result[:10]!r}'") @@ -1044,7 +1091,7 @@ def _abs(result, arg): elif arg.dtype.kind in ["u", "i"]: fname = "abs" else: - raise TypeError("unsupported dtype in _abs()") + raise TypeError(f"unsupported dtype in 'abs': {arg.dtype!r}") return elementwise.get_unary_func_kernel( arg.context, fname, arg.dtype, out_dtype=result.dtype) @@ -1166,24 +1213,30 @@ def mul_add(self, selffac, other, otherfac, queue=None): """Return `selffac * self + otherfac*other`. 
""" queue = queue or self.queue + assert np.isscalar(selffac) and np.isscalar(otherfac) if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, queue) - result.add_event( - self._axpbyz( - result, selffac, self, otherfac, other, - queue=queue)) + evt = self._axpbyz(result, selffac, self, otherfac, other, queue=queue) + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result elif np.isscalar(other): common_dtype = _get_common_dtype(self, other, queue) result = self._new_like_me(common_dtype, queue=queue) - result.add_event( - self._axpbz(result, selffac, - self, common_dtype.type(otherfac * other), - queue=queue)) + evt = self._axpbz( + result, selffac, self, common_dtype.type(otherfac * other), + queue=queue) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: - raise NotImplementedError + raise NotImplementedError(f"'mul_add' with '{type(other).__name__}'") def __add__(self, other): """Add an array with an array or an array with a scalar.""" @@ -1191,11 +1244,12 @@ def __add__(self, other): if isinstance(other, Array): # add another vector result = _get_broadcasted_binary_op_result(self, other, self.queue) + evt = self._axpbyz( + result, self.dtype.type(1), self, other.dtype.type(1), other) - result.add_event( - self._axpbyz(result, - self.dtype.type(1), self, - other.dtype.type(1), other)) + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) return result elif np.isscalar(other): @@ -1205,9 +1259,12 @@ def __add__(self, other): else: common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, self.dtype.type(1), - self, common_dtype.type(other))) + evt = self._axpbz( + result, self.dtype.type(1), self, common_dtype.type(other)) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return NotImplemented @@ -1219,10 +1276,12 
@@ def __sub__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event( - self._axpbyz(result, - self.dtype.type(1), self, - result.dtype.type(-1), other)) + evt = self._axpbyz( + result, self.dtype.type(1), self, result.dtype.type(-1), other) + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) return result elif np.isscalar(other): @@ -1231,9 +1290,12 @@ def __sub__(self, other): return self.copy() else: result = self._new_like_me( - _get_common_dtype(self, other, self.queue)) - result.add_event( - self._axpbz(result, self.dtype.type(1), self, -other)) + _get_common_dtype(self, other, self.queue)) + evt = self._axpbz(result, self.dtype.type(1), self, -other) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return NotImplemented @@ -1244,41 +1306,52 @@ def __rsub__(self, other): x = n - self """ common_dtype = _get_common_dtype(self, other, self.queue) + # other must be a scalar result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, result.dtype.type(-1), self, - common_dtype.type(other))) + evt = self._axpbz( + result, result.dtype.type(-1), self, common_dtype.type(other)) + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __iadd__(self, other): if isinstance(other, Array): - if (other.shape != self.shape - and other.shape != ()): + if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._axpbyz(self, - self.dtype.type(1), self, - other.dtype.type(1), other)) + evt = self._axpbyz( + self, self.dtype.type(1), self, other.dtype.type(1), other) + + self.add_write_event(evt) + other.add_read_event(evt) + return self else: - self.add_event( - self._axpbz(self, self.dtype.type(1), self, other)) + evt = self._axpbz(self, self.dtype.type(1), self, other) + 
self.add_write_event(evt) + return self def __isub__(self, other): if isinstance(other, Array): - if (other.shape != self.shape - and other.shape != ()): + if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._axpbyz(self, self.dtype.type(1), self, - other.dtype.type(-1), other)) + evt = self._axpbyz( + self, self.dtype.type(1), self, other.dtype.type(-1), other) + + self.add_write_event(evt) + other.add_read_event(evt) + return self elif np.isscalar(other): - self._axpbz(self, self.dtype.type(1), self, -other) + evt = self._axpbz(self, self.dtype.type(1), self, -other) + + self.add_write_event(evt) + return self else: return NotImplemented @@ -1288,21 +1361,32 @@ def __pos__(self): def __neg__(self): result = self._new_like_me() - result.add_event(self._axpbz(result, -1, self, 0)) + evt = self._axpbz(result, -1, self, 0) + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __mul__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event( - self._elwise_multiply(result, self, other)) + evt = self._elwise_multiply(result, self, other) + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result elif np.isscalar(other): common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, - common_dtype.type(other), self, self.dtype.type(0))) + evt = self._axpbz( + result, common_dtype.type(other), self, self.dtype.type(0)) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return NotImplemented @@ -1310,48 +1394,58 @@ def __mul__(self, other): def __rmul__(self, scalar): common_dtype = _get_common_dtype(self, scalar, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - 
self._axpbz(result, - common_dtype.type(scalar), self, self.dtype.type(0))) + evt = self._axpbz( + result, common_dtype.type(scalar), self, self.dtype.type(0)) + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __imul__(self, other): if isinstance(other, Array): - if (other.shape != self.shape - and other.shape != ()): + if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._elwise_multiply(self, self, other)) + evt = self._elwise_multiply(self, self, other) + + other.add_read_event(evt) + self.add_write_event(evt) + return self elif np.isscalar(other): - # scalar - self.add_event( - self._axpbz(self, other, self, self.dtype.type(0))) + evt = self._axpbz(self, other, self, self.dtype.type(0)) + self.add_write_event(evt) + return self else: return NotImplemented def __div__(self, other): - """Divides an array by an array or a scalar, i.e. ``self / other``. - """ + """Divides an array by an array or a scalar, i.e. 
``self / other``.""" if isinstance(other, Array): result = _get_broadcasted_binary_op_result( - self, other, self.queue, - dtype_getter=_get_truedivide_dtype) - result.add_event(self._div(result, self, other)) + self, other, self.queue, dtype_getter=_get_truedivide_dtype) + evt = self._div(result, self, other) + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result elif np.isscalar(other): if other == 1: return self.copy() else: common_dtype = _get_truedivide_dtype(self, other, self.queue) - # create a new array for the result result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, - np.true_divide(common_dtype.type(1), other), - self, self.dtype.type(0))) + evt = self._axpbz( + result, np.true_divide(common_dtype.type(1), other), + self, self.dtype.type(0)) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return NotImplemented @@ -1359,19 +1453,26 @@ def __div__(self, other): __truediv__ = __div__ def __rdiv__(self, other): - """Divides an array by a scalar or an array, i.e. ``other / self``. - """ + """Divides an array by a scalar or an array, i.e. 
``other / self``.""" common_dtype = _get_truedivide_dtype(self, other, self.queue) if isinstance(other, Array): result = self._new_like_me(common_dtype) - result.add_event(other._div(result, self)) + evt = other._div(result, self) + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result elif np.isscalar(other): # create a new array for the result result = self._new_like_me(common_dtype) - result.add_event( - self._rdiv_scalar(result, self, common_dtype.type(other))) + evt = self._rdiv_scalar(result, self, common_dtype.type(other)) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return NotImplemented @@ -1390,16 +1491,20 @@ def __itruediv__(self, other): and other.shape != ()): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._div(self, self, other)) + evt = self._div(self, self, other) + self.add_write_event(evt) + other.add_read_event(evt) + return self elif np.isscalar(other): if other == 1: return self else: - self.add_event( - self._axpbz(self, common_dtype.type(np.true_divide(1, other)), - self, self.dtype.type(0))) + evt = self._axpbz( + self, common_dtype.type(np.true_divide(1, other)), + self, self.dtype.type(0)) + self.add_write_event(evt) + return self else: return NotImplemented @@ -1413,12 +1518,17 @@ def __and__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event(self._array_binop(result, self, other, op="&")) + evt = self._array_binop(result, self, other, op="&") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) else: - # create a new array for the result result = self._new_like_me(common_dtype) - result.add_event( - self._scalar_binop(result, self, other, op="&")) + evt = self._scalar_binop(result, self, other, op="&") + + self.add_read_event(evt) + result.add_write_event(evt) return 
result @@ -1433,12 +1543,18 @@ def __or__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event(self._array_binop(result, self, other, op="|")) + evt = self._array_binop(result, self, other, op="|") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) else: # create a new array for the result result = self._new_like_me(common_dtype) - result.add_event( - self._scalar_binop(result, self, other, op="|")) + evt = self._scalar_binop(result, self, other, op="|") + + self.add_read_event(evt) + result.add_write_event(evt) return result @@ -1453,12 +1569,18 @@ def __xor__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event(self._array_binop(result, self, other, op="^")) + evt = self._array_binop(result, self, other, op="^") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) else: # create a new array for the result result = self._new_like_me(common_dtype) - result.add_event( - self._scalar_binop(result, self, other, op="^")) + evt = self._scalar_binop(result, self, other, op="^") + + self.add_read_event(evt) + result.add_write_event(evt) return result @@ -1475,10 +1597,13 @@ def __iand__(self, other): and other.shape != ()): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event(self._array_binop(self, self, other, op="&")) + evt = self._array_binop(self, self, other, op="&") + + self.add_write_event(evt) + other.add_read_event(evt) else: - self.add_event( - self._scalar_binop(self, self, other, op="&")) + evt = self._scalar_binop(self, self, other, op="&") + self.add_write_event(evt) return self @@ -1493,10 +1618,13 @@ def __ior__(self, other): and other.shape != ()): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - 
self.add_event(self._array_binop(self, self, other, op="|")) + evt = self._array_binop(self, self, other, op="|") + + self.add_write_event(evt) + other.add_read_event(evt) else: - self.add_event( - self._scalar_binop(self, self, other, op="|")) + evt = self._scalar_binop(self, self, other, op="|") + self.add_write_event(evt) return self @@ -1511,15 +1639,19 @@ def __ixor__(self, other): and other.shape != ()): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event(self._array_binop(self, self, other, op="^")) + evt = self._array_binop(self, self, other, op="^") + + self.add_write_event(evt) + other.add_read_event(evt) else: - self.add_event( - self._scalar_binop(self, self, other, op="^")) + evt = self._scalar_binop(self, self, other, op="^") + self.add_write_event(evt) return self def _zero_fill(self, queue=None, wait_for=None): queue = queue or self.queue + wait_for = wait_for or [] if not self.size: return @@ -1533,9 +1665,10 @@ def _zero_fill(self, queue=None, wait_for=None): # circumvent bug with large buffers on NVIDIA # https://github.com/inducer/pyopencl/issues/395 if cl_version_gtr_1_2 and not (on_nvidia and self.nbytes >= 2**31): - self.add_event( - cl.enqueue_fill(queue, self.base_data, np.int8(0), - self.nbytes, offset=self.offset, wait_for=wait_for)) + evt = cl.enqueue_fill( + queue, self.base_data, np.int8(0), + self.nbytes, offset=self.offset, wait_for=wait_for) + self.add_write_event(evt) else: zero = np.zeros((), self.dtype) self.fill(zero, queue=queue) @@ -1546,8 +1679,8 @@ def fill(self, value, queue=None, wait_for=None): :returns: *self*. 
""" - self.add_event( - self._fill(self, value, queue=queue, wait_for=wait_for)) + evt = self._fill(self, value, queue=queue, wait_for=wait_for) + self.add_write_event(evt) return self @@ -1564,7 +1697,10 @@ def __abs__(self): """ result = self._new_like_me(self.dtype.type(0).real.dtype) - result.add_event(self._abs(result, self)) + evt = self._abs(result, self) + self.add_read_event(evt) + result.add_write_event(evt) + return result def __pow__(self, other): @@ -1577,12 +1713,17 @@ def __pow__(self, other): result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - result.add_event( - self._pow_array(result, self, other)) + evt = self._pow_array(result, self, other) + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) else: - result = self._new_like_me( - _get_common_dtype(self, other, self.queue)) - result.add_event(self._pow_scalar(result, self, other)) + result = self._new_like_me(_get_common_dtype(self, other, self.queue)) + evt = self._pow_scalar(result, self, other) + + self.add_read_event(evt) + result.add_write_event(evt) return result @@ -1590,8 +1731,11 @@ def __rpow__(self, other): # other must be a scalar common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._rpow_scalar(result, common_dtype.type(other), self)) + evt = self._rpow_scalar(result, common_dtype.type(other), self) + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __invert__(self): @@ -1599,7 +1743,10 @@ def __invert__(self): raise TypeError("Integral types only") result = self._new_like_me() - result.add_event(self._unop(result, self, op="~")) + evt = self._unop(result, self, op="~") + + self.add_read_event(evt) + result.add_write_event(evt) return result @@ -1611,8 +1758,11 @@ def reverse(self, queue=None): """ result = self._new_like_me() - result.add_event( - self._reverse(result, self)) + evt = self._reverse(result, self) + + 
self.add_read_event(evt) + result.add_write_event(evt) + return result def astype(self, dtype, queue=None): @@ -1621,7 +1771,11 @@ def astype(self, dtype, queue=None): return self.copy() result = self._new_like_me(dtype=dtype) - result.add_event(self._copy(result, self, queue=queue)) + evt = self._copy(result, self, queue=queue) + + self.add_read_event(evt) + result.add_write_event(evt) + return result # {{{ rich comparisons, any, all @@ -1636,21 +1790,31 @@ def __bool__(self): def any(self, queue=None, wait_for=None): from pyopencl.reduction import get_any_kernel krnl = get_any_kernel(self.context, self.dtype) - if wait_for is None: - wait_for = [] - result, event1 = krnl(self, queue=queue, - wait_for=wait_for + self.events, return_event=True) - result.add_event(event1) + + queue = queue or self.queue + wait_for = wait_for or [] + result, evt = krnl( + self, queue=queue, wait_for=wait_for + self.write_events, + return_event=True) + + self.add_read_event(evt) + result.add_write_event(evt) + return result def all(self, queue=None, wait_for=None): from pyopencl.reduction import get_all_kernel krnl = get_all_kernel(self.context, self.dtype) - if wait_for is None: - wait_for = [] - result, event1 = krnl(self, queue=queue, - wait_for=wait_for + self.events, return_event=True) - result.add_event(event1) + + queue = queue or self.queue + wait_for = wait_for or [] + result, evt = krnl( + self, queue=queue, wait_for=wait_for + self.write_events, + return_event=True) + + self.add_read_event(evt) + result.add_write_event(evt) + return result @staticmethod @@ -1670,72 +1834,115 @@ def _array_comparison(out, a, b, queue=None, op=None): def __eq__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="==")) + evt = self._array_comparison(result, self, other, op="==") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result else: result = 
self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op="==")) + evt = self._scalar_comparison(result, self, other, op="==") + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __ne__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="!=")) + evt = self._array_comparison(result, self, other, op="!=") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result else: result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op="!=")) + evt = self._scalar_comparison(result, self, other, op="!=") + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __le__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="<=")) + evt = self._array_comparison(result, self, other, op="<=") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result else: result = self._new_like_me(_BOOL_DTYPE) - self._scalar_comparison(result, self, other, op="<=") + evt = self._scalar_comparison(result, self, other, op="<=") + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __ge__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op=">=")) + evt = self._array_comparison(result, self, other, op=">=") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result else: result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op=">=")) + evt = self._scalar_comparison(result, self, other, op=">=") + + self.add_read_event(evt) + result.add_write_event(evt) + return 
result def __lt__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="<")) + evt = self._array_comparison(result, self, other, op="<") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result else: result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op="<")) + evt = self._scalar_comparison(result, self, other, op="<") + + self.add_read_event(evt) + result.add_write_event(evt) + return result def __gt__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op=">")) + evt = self._array_comparison(result, self, other, op=">") + + self.add_read_event(evt) + other.add_read_event(evt) + result.add_write_event(evt) + return result else: result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op=">")) + evt = self._scalar_comparison(result, self, other, op=">") + + self.add_read_event(evt) + result.add_write_event(evt) + return result # }}} @@ -1745,8 +1952,11 @@ def __gt__(self, other): def real(self): if self.dtype.kind == "c": result = self._new_like_me(self.dtype.type(0).real.dtype) - result.add_event( - self._real(result, self)) + evt = self._real(result, self) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return self @@ -1755,8 +1965,11 @@ def real(self): def imag(self): if self.dtype.kind == "c": result = self._new_like_me(self.dtype.type(0).real.dtype) - result.add_event( - self._imag(result, self)) + evt = self._imag(result, self) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return zeros_like(self) @@ -1766,7 +1979,11 @@ def conj(self): """.. 
versionadded:: 2012.1""" if self.dtype.kind == "c": result = self._new_like_me() - result.add_event(self._conj(result, self)) + evt = self._conj(result, self) + + self.add_read_event(evt) + result.add_write_event(evt) + return result else: return self @@ -1778,25 +1995,46 @@ def conj(self): # {{{ event management def add_event(self, evt): - """Add *evt* to :attr:`events`. If :attr:`events` is too long, this method - may implicitly wait for a subset of :attr:`events` and clear them from the - list. + return self.add_write_event(evt) + + def add_write_event(self, evt: "cl.Event") -> None: + """Add *evt* to :attr:`write_events`. If :attr:`write_events` is too + long, this method may implicitly wait for a subset of :attr:`write_events` + and clear them from the list. """ - n_wait = 4 + n = _MAX_EVENT_CLEAR_COUNT - self.events.append(evt) + self.write_events.append(evt) + if len(self.write_events) > 3*n: + wait_events = self.write_events[:n] + cl.wait_for_events(wait_events) + del self.write_events[:n] - if len(self.events) > 3*n_wait: - wait_events = self.events[:n_wait] + def add_read_event(self, evt: "cl.Event") -> None: + """Add *evt* to :attr:`read_events`. If :attr:`read_events` is too + long, this method may implicitly wait for a subset of :attr:`read_events` + and clear them from the list. + """ + n = _MAX_EVENT_CLEAR_COUNT + + self.read_events.append(evt) + if len(self.read_events) > 3*n: + wait_events = self.read_events[:n] cl.wait_for_events(wait_events) - del self.events[:n_wait] + del self.read_events[:n] def finish(self): - """Wait for the entire contents of :attr:`events`, clear it.""" + """Wait for the entire contents of :attr:`write_events` and + :attr:`read_events` and clear the lists. 
+ """ - if self.events: - cl.wait_for_events(self.events) - del self.events[:] + if self.write_events: + cl.wait_for_events(self.write_events) + del self.write_events[:] + + if self.read_events: + cl.wait_for_events(self.read_events) + del self.read_events[:] # }}} @@ -2052,7 +2290,8 @@ def map_to_host(self, queue=None, flags=None, is_blocking=True, wait_for=None): ary, evt = cl.enqueue_map_buffer( queue or self.queue, self.base_data, flags, self.offset, self.shape, self.dtype, strides=self.strides, - wait_for=wait_for + self.events, is_blocking=is_blocking) + wait_for=wait_for + self.write_events, is_blocking=is_blocking) + self.add_read_event(evt) if is_blocking: return ary @@ -2173,7 +2412,7 @@ def setitem(self, subscript, value, queue=None, wait_for=None): queue = queue or self.queue or value.queue if wait_for is None: wait_for = [] - wait_for = wait_for + self.events + wait_for = wait_for + self.write_events if isinstance(subscript, Array): if subscript.dtype.kind not in ("i", "u"): @@ -2194,9 +2433,10 @@ def setitem(self, subscript, value, queue=None, wait_for=None): if isinstance(value, np.ndarray): if subarray.shape == value.shape and subarray.strides == value.strides: - self.add_event( - cl.enqueue_copy(queue, subarray.base_data, - value, dst_offset=subarray.offset, wait_for=wait_for)) + evt = cl.enqueue_copy( + queue, subarray.base_data, value, dst_offset=subarray.offset, + wait_for=wait_for) + subarray.add_write_event(evt) return else: value = to_device(queue, value, self.allocator) @@ -2212,11 +2452,11 @@ def setitem(self, subscript, value, queue=None, wait_for=None): raise NotImplementedError("cannot assign between arrays of " "differing strides") - self.add_event( - self._copy(subarray, value, queue=queue, wait_for=wait_for)) + evt = self._copy(subarray, value, queue=queue, wait_for=wait_for) + subarray.add_write_event(evt) else: - # Let's assume it's a scalar + assert np.isscalar(value) subarray.fill(value, queue=queue, wait_for=wait_for) def 
__setitem__(self, subscript, value): @@ -2312,8 +2552,9 @@ def to_device(queue, ary, allocator=None, async_=None, first_arg = queue.context result = Array(first_arg, ary.shape, ary.dtype, - allocator=allocator, strides=ary.strides) + allocator=allocator, strides=ary.strides) result.set(ary, async_=async_, queue=queue) + return result @@ -2332,6 +2573,7 @@ def zeros(queue, shape, dtype, order="C", allocator=None): order=order, allocator=allocator, _context=queue.context, _queue=queue) result._zero_fill() + return result @@ -2340,8 +2582,8 @@ def empty_like(ary, queue=_copy_queue, allocator=None): as *other_ary*. """ - return ary._new_with_changes(data=None, offset=0, queue=queue, - allocator=allocator) + return ary._new_with_changes( + data=None, offset=0, queue=queue, allocator=allocator) def zeros_like(ary): @@ -2366,8 +2608,7 @@ class _ArangeInfo: @elwise_kernel_runner def _arange_knl(result, start, step): - return elementwise.get_arange_kernel( - result.context, result.dtype) + return elementwise.get_arange_kernel(result.context, result.dtype) def arange(queue, *args, **kwargs): @@ -2453,8 +2694,8 @@ def arange(queue, *args, **kwargs): size = int(ceil((stop-start)/step)) result = Array(queue, (size,), dtype, allocator=inf.allocator) - result.add_event( - _arange_knl(result, start, step, queue=queue, wait_for=wait_for)) + evt = _arange_knl(result, start, step, queue=queue, wait_for=wait_for) + result.add_write_event(evt) # }}} @@ -2467,8 +2708,7 @@ def arange(queue, *args, **kwargs): @elwise_kernel_runner def _take(result, ary, indices): - return elementwise.get_take_kernel( - result.context, result.dtype, indices.dtype) + return elementwise.get_take_kernel(result.context, result.dtype, indices.dtype) def take(a, indices, out=None, queue=None, wait_for=None): @@ -2481,8 +2721,12 @@ def take(a, indices, out=None, queue=None, wait_for=None): out = type(a)(queue, indices.shape, a.dtype, allocator=a.allocator) assert len(indices.shape) == 1 - out.add_event( - 
_take(out, a, indices, queue=queue, wait_for=wait_for)) + evt = _take(out, a, indices, queue=queue, wait_for=wait_for) + + indices.add_read_event(evt) + a.add_read_event(evt) + out.add_write_event(evt) + return out @@ -2532,16 +2776,20 @@ def make_func_for_chunk_size(chunk_size): cl.kernel_work_group_info.WORK_GROUP_SIZE, queue.device)) - wait_for_this = (indices.events - + _builtin_sum((i.events for i in arrays[chunk_slice]), []) - + _builtin_sum((o.events for o in out[chunk_slice]), [])) + wait_for_this = (indices.write_events + + _builtin_sum((i.write_events for i in arrays[chunk_slice]), []) + + _builtin_sum((o.write_events for o in out[chunk_slice]), [])) evt = knl(queue, gs, ls, indices.data, *([o.data for o in out[chunk_slice]] + [i.data for i in arrays[chunk_slice]] + [indices.size]), wait_for=wait_for_this) - for o in out[chunk_slice]: - o.add_event(evt) + + indices.add_read_event(evt) + for o, a in zip(out[chunk_slice], arrays[chunk_slice]): + o.add_write_event(evt) + if a is not o: + a.add_read_event(evt) return out @@ -2610,17 +2858,23 @@ def make_func_for_chunk_size(chunk_size): cl.kernel_work_group_info.WORK_GROUP_SIZE, queue.device)) - wait_for_this = (dest_indices.events + src_indices.events - + _builtin_sum((i.events for i in arrays[chunk_slice]), []) - + _builtin_sum((o.events for o in out[chunk_slice]), [])) + wait_for_this = ( + dest_indices.write_events + src_indices.write_events + + _builtin_sum((i.write_events for i in arrays[chunk_slice]), []) + + _builtin_sum((o.write_events for o in out[chunk_slice]), [])) evt = knl(queue, gs, ls, *([o for o in out[chunk_slice]] + [dest_indices, src_indices] + [i for i in arrays[chunk_slice]] + src_offsets_list[chunk_slice] + [src_indices.size]), wait_for=wait_for_this) - for o in out[chunk_slice]: - o.add_event(evt) + + src_indices.add_read_event(evt) + dest_indices.add_read_event(evt) + for o, a in zip(out[chunk_slice], arrays[chunk_slice]): + o.add_write_event(evt) + if a is not o: + 
a.add_read_event(evt) return out @@ -2637,7 +2891,7 @@ def multi_put(arrays, dest_indices, dest_shape=None, out=None, queue=None, queue = queue or dest_indices.queue if wait_for is None: wait_for = [] - wait_for = wait_for + dest_indices.events + wait_for = wait_for + dest_indices.write_events vec_count = len(arrays) @@ -2688,8 +2942,8 @@ def make_func_for_chunk_size(chunk_size): queue.device)) wait_for_this = (wait_for - + _builtin_sum((i.events for i in arrays[chunk_slice]), []) - + _builtin_sum((o.events for o in out[chunk_slice]), [])) + + _builtin_sum((i.write_events for i in arrays[chunk_slice]), []) + + _builtin_sum((o.write_events for o in out[chunk_slice]), [])) evt = knl(queue, gs, ls, *( [o for o in out[chunk_slice]] @@ -2698,8 +2952,11 @@ def make_func_for_chunk_size(chunk_size): + [use_fill_cla, array_lengths_cla, dest_indices.size]), wait_for=wait_for_this) - for o in out[chunk_slice]: - o.add_event(evt) + dest_indices.add_read_event(evt) + for o, a in zip(out[chunk_slice], arrays[chunk_slice]): + o.add_write_event(evt) + if a is not o: + a.add_read_event(evt) return out @@ -2774,7 +3031,7 @@ def concatenate(arrays, axis=0, queue=None, allocator=None): @elwise_kernel_runner def _diff(result, array): - return elementwise.get_diff_kernel(array.context, array.dtype) + return elementwise.get_diff_kernel(result.context, result.dtype) def diff(array, queue=None, allocator=None): @@ -2791,8 +3048,11 @@ def diff(array, queue=None, allocator=None): allocator = allocator or array.allocator result = array.__class__(queue, (n-1,), array.dtype, allocator=allocator) - event1 = _diff(result, array, queue=queue) - result.add_event(event1) + evt = _diff(result, array, queue=queue) + + array.add_read_event(evt) + result.add_write_event(evt) + return result @@ -2999,8 +3259,14 @@ def if_positive(criterion, then_, else_, out=None, queue=None): allocator=criterion.allocator, strides=out_strides) - event1 = _if_positive(out, criterion, then_, else_, queue=queue) - 
out.add_event(event1) + evt = _if_positive(out, criterion, then_, else_, queue=queue) + out.add_write_event(evt) + + criterion.add_read_event(evt) + if not is_then_scalar: + then_.add_read_event(evt) + if not is_else_scalar: + else_.add_read_event(evt) return out @@ -3042,7 +3308,12 @@ def maximum(a, b, out=None, queue=None): elif not b_is_scalar: out = b._new_like_me(out_dtype, queue) - out.add_event(_minimum_maximum_backend(out, a, b, queue=queue, minmax="max")) + evt = _minimum_maximum_backend(out, a, b, queue=queue, minmax="max") + out.add_write_event(evt) + if not a_is_scalar: + a.add_read_event(evt) + if not b_is_scalar: + b.add_read_event(evt) return out @@ -3068,7 +3339,12 @@ def minimum(a, b, out=None, queue=None): elif not b_is_scalar: out = b._new_like_me(out_dtype, queue) - out.add_event(_minimum_maximum_backend(out, a, b, queue=queue, minmax="min")) + evt = _minimum_maximum_backend(out, a, b, queue=queue, minmax="min") + out.add_write_event(evt) + if not a_is_scalar: + a.add_read_event(evt) + if not b_is_scalar: + b.add_read_event(evt) return out @@ -3096,9 +3372,12 @@ def sum(a, dtype=None, queue=None, slice=None, initial=np._NoValue): from pyopencl.reduction import get_sum_kernel krnl = get_sum_kernel(a.context, dtype, a.dtype) - result, event1 = krnl(a, queue=queue, slice=slice, wait_for=a.events, - return_event=True) - result.add_event(event1) + result, evt = krnl( + a, queue=queue, slice=slice, wait_for=a.write_events, + return_event=True) + + a.add_read_event(evt) + result.add_write_event(evt) # NOTE: neutral element in `get_sum_kernel` is 0 by default if initial is not np._NoValue: @@ -3131,9 +3410,13 @@ def dot(a, b, dtype=None, queue=None, slice=None): from pyopencl.reduction import get_dot_kernel krnl = get_dot_kernel(a.context, dtype, a.dtype, b.dtype) - result, event1 = krnl(a, b, queue=queue, slice=slice, - wait_for=a.events + b.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + a, b, queue=queue, slice=slice, + 
wait_for=a.write_events + b.write_events, return_event=True) + + a.add_read_event(evt) + b.add_read_event(evt) + result.add_write_event(evt) return result @@ -3150,9 +3433,13 @@ def vdot(a, b, dtype=None, queue=None, slice=None): krnl = get_dot_kernel(a.context, dtype, a.dtype, b.dtype, conjugate_first=True) - result, event1 = krnl(a, b, queue=queue, slice=slice, - wait_for=a.events + b.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + a, b, queue=queue, slice=slice, + wait_for=a.write_events + b.write_events, return_event=True) + + a.add_read_event(evt) + b.add_read_event(evt) + result.add_write_event(evt) return result @@ -3168,9 +3455,15 @@ def subset_dot(subset, a, b, dtype=None, queue=None, slice=None): krnl = get_subset_dot_kernel( a.context, dtype, subset.dtype, a.dtype, b.dtype) - result, event1 = krnl(subset, a, b, queue=queue, slice=slice, - wait_for=subset.events + a.events + b.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + subset, a, b, queue=queue, slice=slice, + wait_for=subset.write_events + a.write_events + b.write_events, + return_event=True) + + subset.add_read_event(evt) + a.add_read_event(evt) + b.add_read_event(evt) + result.add_write_event(evt) return result @@ -3193,9 +3486,12 @@ def f(a, queue=None, initial=np._NoValue): from pyopencl.reduction import get_minmax_kernel krnl = get_minmax_kernel(a.context, what, a.dtype) - result, event1 = krnl(a, queue=queue, wait_for=a.events, - return_event=True) - result.add_event(event1) + result, evt = krnl( + a, queue=queue, wait_for=a.write_events, + return_event=True) + + a.add_read_event(evt) + result.add_write_event(evt) if initial is not np._NoValue: initial = a.dtype.type(initial) @@ -3228,10 +3524,17 @@ def _make_subset_minmax_kernel(what): def f(subset, a, queue=None, slice=None): from pyopencl.reduction import get_subset_minmax_kernel krnl = get_subset_minmax_kernel(a.context, what, a.dtype, subset.dtype) - result, event1 = krnl(subset, 
a, queue=queue, slice=slice, - wait_for=a.events + subset.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + subset, a, queue=queue, slice=slice, + wait_for=a.write_events + subset.write_events, + return_event=True) + + a.add_read_event(evt) + subset.add_read_event(evt) + result.add_write_event(evt) + return result + return f @@ -3265,8 +3568,10 @@ def cumsum(a, output_dtype=None, queue=None, from pyopencl.scan import get_cumsum_kernel krnl = get_cumsum_kernel(a.context, a.dtype, output_dtype) - evt = krnl(a, result, queue=queue, wait_for=wait_for + a.events) - result.add_event(evt) + evt = krnl(a, result, queue=queue, wait_for=wait_for + a.write_events) + + a.add_read_event(evt) + result.add_write_event(evt) if return_event: return evt, result