Skip to content

Commit ee1552e

Browse files
committed
Improved usability
1 parent 79616dc commit ee1552e

File tree

3 files changed

+65
-15
lines changed

3 files changed

+65
-15
lines changed

src/napari_stream/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
except ImportError:
44
__version__ = "unknown"
55

6-
from .sender import send
6+
from .sender import StreamSender, send
77

88
__all__ = []

src/napari_stream/_receiver_widget.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ def __init__(self, napari_viewer: Viewer):
4040
self.autocontrast = QCheckBox("Auto-contrast on new images")
4141
self.autocontrast.setChecked(True)
4242

43-
self.btn_start = QPushButton("Start")
44-
self.btn_stop = QPushButton("Stop")
45-
self.btn_stop.setEnabled(False)
43+
self.btn_run = QPushButton("Start")
4644
self.btn_copy = QPushButton("Copy Endpoint")
4745

4846
top = QVBoxLayout(self)
@@ -55,16 +53,22 @@ def __init__(self, napari_viewer: Viewer):
5553
top.addWidget(self.autocontrast)
5654
top.addWidget(self.status_label)
5755
row2 = QHBoxLayout()
58-
row2.addWidget(self.btn_start)
59-
row2.addWidget(self.btn_stop)
56+
row2.addWidget(self.btn_run)
6057
top.addLayout(row2)
6158

62-
self.btn_start.clicked.connect(self._on_start)
63-
self.btn_stop.clicked.connect(self._on_stop)
59+
self.btn_run.clicked.connect(self._on_toggle_clicked)
6460
self.btn_copy.clicked.connect(self._copy_endpoint)
6561
self.public_access.stateChanged.connect(self._on_public_toggled)
6662

63+
def _on_toggle_clicked(self):
64+
if self._is_running():
65+
self._on_stop()
66+
else:
67+
self._on_start()
68+
6769
def _on_start(self):
70+
if self._is_running():
71+
return
6872
endpoint = self.endpoint_edit.text().strip()
6973
bind_endpoint = self._resolve_endpoint_for_worker(endpoint)
7074
self._thread = QThread()
@@ -76,8 +80,8 @@ def _on_start(self):
7680
self._worker.status.connect(self.status_label.setText)
7781
self._worker.error.connect(self._on_error)
7882

79-
self.btn_start.setEnabled(False)
80-
self.btn_stop.setEnabled(True)
83+
self.btn_run.setText("Stop")
84+
self.btn_run.setEnabled(True)
8185
self._thread.start()
8286

8387
def _on_stop(self):
@@ -86,8 +90,10 @@ def _on_stop(self):
8690
if self._thread is not None:
8791
self._thread.quit()
8892
self._thread.wait()
89-
self.btn_start.setEnabled(True)
90-
self.btn_stop.setEnabled(False)
93+
self._thread = None
94+
self._worker = None
95+
self.btn_run.setText("Start")
96+
self.btn_run.setEnabled(True)
9197

9298
def _on_public_toggled(self, checked: int):
9399
is_public = bool(checked)
@@ -96,6 +102,8 @@ def _on_public_toggled(self, checked: int):
96102
if current == self._last_auto_endpoint:
97103
self.endpoint_edit.setText(suggested)
98104
self._last_auto_endpoint = suggested
105+
if self._is_running():
106+
self._restart_listener()
99107

100108
def _copy_endpoint(self):
101109
endpoint = self.endpoint_edit.text().strip()
@@ -112,6 +120,13 @@ def _resolve_endpoint_for_worker(self, endpoint: str) -> str:
112120
self._last_auto_endpoint = fallback
113121
return bind_endpoint_for_public(fallback)
114122

123+
def _is_running(self) -> bool:
124+
return self._thread is not None and self._thread.isRunning()
125+
126+
def _restart_listener(self):
127+
self._on_stop()
128+
self._on_start()
129+
115130
def _on_received(self, arr: np.ndarray, meta: dict):
116131
name = meta.get("name", "array")
117132
is_labels = bool(meta.get("is_labels", False))

src/napari_stream/sender.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22
import json
3+
import os
4+
import time
35
from typing import Optional, Sequence, Any, Iterable
46
from collections.abc import Mapping
57

@@ -28,12 +30,16 @@ def __init__(
2830
linger_ms: int = 2000,
2931
ensure_delivery: bool = True,
3032
):
31-
self.endpoint = endpoint or default_endpoint()
33+
if endpoint is None:
34+
endpoint = os.environ.get("NAPARI_STREAM_ENDPOINT") or default_endpoint()
35+
self.endpoint = endpoint
3236
self.ensure_delivery = ensure_delivery
3337
self._ctx = zmq.Context.instance()
3438
self._sock = self._ctx.socket(zmq.PUSH)
3539
self._sock.setsockopt(zmq.SNDHWM, high_water_mark)
3640
self._sock.setsockopt(zmq.LINGER, linger_ms)
41+
# Fail fast if there is no receiver instead of silently queueing.
42+
self._sock.setsockopt(zmq.IMMEDIATE, 1)
3743
self._sock.connect(self.endpoint)
3844

3945
# ------------------------------ public API ------------------------------
@@ -152,7 +158,24 @@ def _send_numpy(
152158

153159
header = json.dumps(meta).encode("utf-8")
154160
buf = memoryview(arr) # zero-copy
155-
tracker = self._sock.send_multipart([header, buf], copy=False, track=self.ensure_delivery)
161+
tracker = None
162+
last_err = None
163+
for _ in range(3):
164+
try:
165+
tracker = self._sock.send_multipart(
166+
[header, buf],
167+
flags=zmq.NOBLOCK,
168+
copy=False,
169+
track=self.ensure_delivery,
170+
)
171+
last_err = None
172+
break
173+
except zmq.Again as e:
174+
last_err = e
175+
# Allow a brief window for sockets to finish connecting
176+
time.sleep(0.05)
177+
if last_err is not None:
178+
raise RuntimeError(f"No receiver available at endpoint {self.endpoint}") from last_err
156179
if self.ensure_delivery and tracker is not None:
157180
tracker.wait()
158181

@@ -298,5 +321,17 @@ def is_arraylike(self, x):
298321

299322

300323
def send(*args, **kwargs) -> None:
301-
sender = StreamSender()
324+
"""Send data to a napari receiver.
325+
326+
Accepts:
327+
- numpy.ndarray
328+
- torch.Tensor (detach().cpu().numpy())
329+
- blosc2.NDArray
330+
- zarr.Array
331+
- Python lists/tuples and dicts (recursively searched for arraylikes;
332+
nested Python lists of numbers are also converted to NumPy)
333+
"""
334+
sender_keys = ("endpoint", "high_water_mark", "linger_ms", "ensure_delivery")
335+
sender_kwargs = {k: kwargs.pop(k) for k in sender_keys if k in kwargs}
336+
sender = StreamSender(**sender_kwargs)
302337
sender.send(*args, **kwargs)

0 commit comments

Comments
 (0)