Skip to content

Commit 0dca135

Browse files
committed
Merge branch 'main' into simple_eye_events
2 parents 91f9fbd + ba9917d commit 0dca135

File tree

6 files changed

+179
-34
lines changed

6 files changed

+179
-34
lines changed

CHANGES.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
1.5.0
2+
###########
3+
- Auto-start only necessary streams in Simple API
4+
- Adds error information
5+
6+
1.4.0
7+
###########
8+
- Adds eye events (blinks, fixations)
9+
10+
1.3.6
11+
###########
12+
- Adds eyelid data
13+
114
1.3.5
215
###########
316
- Fixes streaming bug when audio is enabled

examples/async/start_stop_recordings.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
import asyncio
22

33
from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
4-
from pupil_labs.realtime_api.models import Recording
4+
from pupil_labs.realtime_api.models import Recording, Sensor
55

66

7-
async def print_recording(component):
7+
async def on_status_update(component):
88
if isinstance(component, Recording):
9-
print(f"Update: {component.message}")
9+
if component.action == "ERROR":
10+
print(f"Error : {component.message}")
11+
12+
elif isinstance(component, Sensor):
13+
if component.stream_error:
14+
print(f"Stream error in sensor {component.sensor}")
1015

1116

1217
async def main():
@@ -18,7 +23,7 @@ async def main():
1823

1924
async with Device.from_discovered_device(dev_info) as device:
2025
# get update when recording is fully started
21-
notifier = StatusUpdateNotifier(device, callbacks=[print_recording])
26+
notifier = StatusUpdateNotifier(device, callbacks=[on_status_update])
2227
await notifier.receive_updates_start()
2328
recording_id = await device.recording_start()
2429
print(f"Initiated recording with id {recording_id}")

examples/async/stream_video_with_overlayed_fixations.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) ->
7575

7676

7777
async def match_and_draw(queue_video, queue_eye_events):
78-
fixation_history = deque(maxlen=5)
78+
fixation_history = deque(maxlen=10)
7979
fixation_counter = 0
8080

8181
blink = None
@@ -88,6 +88,9 @@ async def match_and_draw(queue_video, queue_eye_events):
8888
while not queue_eye_events.empty():
8989
_, eye_event = await queue_eye_events.get()
9090
if isinstance(eye_event, FixationEventData):
91+
if eye_event.event_type == 0:
92+
continue
93+
9194
fixation_history.append(
9295
{
9396
"id": fixation_counter,

examples/simple/start_stop_recordings.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,13 @@
1313
recording_id = device.recording_start()
1414
print(f"Started recording with id {recording_id}")
1515

16-
time.sleep(5)
16+
# Check for errors while recording runs
17+
start_time = time.time()
18+
while time.time() - start_time < 5:
19+
for e in device.get_errors():
20+
print("Error:", e)
21+
22+
time.sleep(1)
1723

1824
device.recording_stop_and_save()
1925

src/pupil_labs/realtime_api/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class Sensor(T.NamedTuple):
128128
params: T.Optional[str] = None
129129
port: T.Optional[int] = None
130130
protocol: str = "rtsp"
131+
stream_error: bool = True
131132

132133
@property
133134
def url(self) -> T.Optional[str]:

src/pupil_labs/realtime_api/simple/device.py

Lines changed: 145 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@
1010
from ..base import DeviceBase
1111
from ..device import Device as _DeviceAsync
1212
from ..device import StatusUpdateNotifier
13-
from ..models import Component, Event, Sensor, Status, Template, TemplateDataFormat
13+
from ..models import (
14+
Component,
15+
Event,
16+
Recording,
17+
Sensor,
18+
Status,
19+
Template,
20+
TemplateDataFormat,
21+
)
1422
from ..streaming import (
1523
BlinkEventData,
1624
FixationEventData,
@@ -60,6 +68,8 @@ def __init__(
6068
self._status = self._get_status()
6169
self._start_background_worker(start_streaming_by_default)
6270

71+
self._errors: T.List[str] = []
72+
6373
@property
6474
def phone_name(self) -> str:
6575
return self._status.phone.device_name
@@ -107,6 +117,12 @@ def serial_number_scene_cam(self) -> T.Optional[str]:
107117
"""Returns ``None`` if no scene camera is connected"""
108118
return self._status.hardware.world_camera_serial
109119

120+
def get_errors(self) -> T.List[str]:
121+
errors = self._errors.copy()
122+
self._errors.clear()
123+
124+
return errors
125+
110126
def world_sensor(self) -> T.Optional[Sensor]:
111127
return self._status.direct_world_sensor()
112128

@@ -282,9 +298,16 @@ def receive_matched_scene_and_eyes_video_frames_and_gaze(
282298
def _receive_item(
283299
self, sensor: str, timeout_seconds: T.Optional[float] = None
284300
) -> T.Optional[T.Union[VideoFrame, GazeDataType]]:
285-
if not self.is_currently_streaming:
286-
logger.debug("receive_* called without being streaming")
287-
self.streaming_start()
301+
if sensor == MATCHED_ITEM_LABEL:
302+
self.start_stream_if_needed(Sensor.Name.GAZE.value)
303+
self.start_stream_if_needed(Sensor.Name.WORLD.value)
304+
elif sensor == MATCHED_GAZE_EYES_LABEL:
305+
self.start_stream_if_needed(Sensor.Name.GAZE.value)
306+
self.start_stream_if_needed(Sensor.Name.EYES.value)
307+
self.start_stream_if_needed(Sensor.Name.WORLD.value)
308+
else:
309+
self.start_stream_if_needed(sensor)
310+
288311
try:
289312
return self._most_recent_item[sensor].popleft()
290313
except IndexError:
@@ -295,23 +318,67 @@ def _receive_item(
295318
return self._most_recent_item[sensor].popleft()
296319
return None
297320

298-
def streaming_start(self):
299-
self._streaming_trigger_action(self._EVENT.SHOULD_STREAMS_START)
321+
def start_stream_if_needed(self, sensor: str):
322+
if not self._is_streaming_flags[sensor].is_set():
323+
logger.debug("receive_* called without being streaming")
324+
self.streaming_start(sensor)
325+
326+
def streaming_start(self, stream_name: str):
327+
if stream_name is None:
328+
for event in (
329+
self._EVENT.SHOULD_START_GAZE,
330+
self._EVENT.SHOULD_START_WORLD,
331+
self._EVENT.SHOULD_START_EYES,
332+
self._EVENT.SHOULD_START_IMU,
333+
):
334+
self._streaming_trigger_action(event)
335+
return
336+
337+
event = None
338+
if stream_name == Sensor.Name.GAZE.value:
339+
event = self._EVENT.SHOULD_START_GAZE
340+
elif stream_name == Sensor.Name.WORLD.value:
341+
event = self._EVENT.SHOULD_START_WORLD
342+
elif stream_name == Sensor.Name.EYES.value:
343+
event = self._EVENT.SHOULD_START_EYES
344+
elif stream_name == Sensor.Name.IMU.value:
345+
event = self._EVENT.SHOULD_START_IMU
346+
347+
self._streaming_trigger_action(event)
348+
349+
def streaming_stop(self, stream_name: str = None):
350+
if stream_name is None:
351+
for event in (
352+
self._EVENT.SHOULD_STOP_GAZE,
353+
self._EVENT.SHOULD_STOP_WORLD,
354+
self._EVENT.SHOULD_STOP_EYES,
355+
self._EVENT.SHOULD_STOP_IMU,
356+
):
357+
self._streaming_trigger_action(event)
358+
return
359+
360+
event = None
361+
if stream_name == Sensor.Name.GAZE.value:
362+
event = self._EVENT.SHOULD_STOP_GAZE
363+
elif stream_name == Sensor.Name.WORLD.value:
364+
event = self._EVENT.SHOULD_STOP_WORLD
365+
elif stream_name == Sensor.Name.EYES.value:
366+
event = self._EVENT.SHOULD_STOP_EYES
367+
elif stream_name == Sensor.Name.IMU.value:
368+
event = self._EVENT.SHOULD_STOP_IMU
300369

301-
def streaming_stop(self):
302-
self._streaming_trigger_action(self._EVENT.SHOULD_STREAMS_STOP)
370+
self._streaming_trigger_action(event)
303371

304372
def _streaming_trigger_action(self, action):
305373
if self._event_manager and self._background_loop:
306374
logger.debug(f"Sending {action.name} trigger")
307375
self._event_manager.trigger_threadsafe(action)
308376
else:
309-
logger.debug(f"Could send {action.name} trigger")
377+
logger.debug(f"Could not send {action.name} trigger")
310378

311379
@property
312380
def is_currently_streaming(self) -> bool:
313-
is_streaming = self._is_streaming_flag.is_set()
314-
return is_streaming
381+
return any(flag.is_set() for flag in self._is_streaming_flags.values())
315382

316383
def estimate_time_offset(
317384
self,
@@ -349,8 +416,14 @@ def __del__(self):
349416

350417
class _EVENT(enum.Enum):
351418
SHOULD_WORKER_CLOSE = "should worker close"
352-
SHOULD_STREAMS_START = "should stream start"
353-
SHOULD_STREAMS_STOP = "should streams stop"
419+
SHOULD_START_GAZE = "should start gaze"
420+
SHOULD_START_WORLD = "should start world"
421+
SHOULD_START_EYES = "should start eyes"
422+
SHOULD_START_IMU = "should start imu"
423+
SHOULD_STOP_GAZE = "should stop gaze"
424+
SHOULD_STOP_WORLD = "should stop world"
425+
SHOULD_STOP_EYES = "should stop eyes"
426+
SHOULD_STOP_IMU = "should stop imu"
354427

355428
def _start_background_worker(self, start_streaming_by_default):
356429
self._event_manager = None
@@ -377,13 +450,18 @@ def _start_background_worker(self, start_streaming_by_default):
377450
self._cached_eyes_for_matching: EyesCacheType = collections.deque(maxlen=200)
378451

379452
event_auto_update_started = threading.Event()
380-
self._is_streaming_flag = threading.Event()
453+
self._is_streaming_flags = {
454+
Sensor.Name.GAZE.value: threading.Event(),
455+
Sensor.Name.WORLD.value: threading.Event(),
456+
Sensor.Name.EYES.value: threading.Event(),
457+
Sensor.Name.IMU.value: threading.Event(),
458+
}
381459
self._auto_update_thread = threading.Thread(
382460
target=self._auto_update,
383461
kwargs=dict(
384462
device_weakref=weakref.ref(self), # weak ref to avoid cycling ref
385463
auto_update_started_flag=event_auto_update_started,
386-
is_streaming_flag=self._is_streaming_flag,
464+
is_streaming_flags=self._is_streaming_flags,
387465
start_streaming_by_default=start_streaming_by_default,
388466
),
389467
name=f"{self} auto-update thread",
@@ -409,7 +487,7 @@ async def _get_status():
409487
def _auto_update(
410488
device_weakref: weakref.ReferenceType,
411489
auto_update_started_flag: threading.Event,
412-
is_streaming_flag: threading.Event,
490+
is_streaming_flags: T.Dict[str, threading.Event],
413491
start_streaming_by_default: bool = False,
414492
):
415493
stream_managers = {
@@ -450,6 +528,14 @@ async def _process_status_changes(changed: Component):
450528
else:
451529
logger.debug(f"Unhandled DIRECT sensor {changed.sensor}")
452530

531+
elif isinstance(changed, Recording) and changed.action == "ERROR":
532+
device_weakref()._errors.append(changed.message)
533+
534+
elif isinstance(changed, Sensor) and changed.stream_error:
535+
error = f"Stream error in sensor {changed.sensor}"
536+
if error not in device_weakref()._errors:
537+
device_weakref()._errors.append(error)
538+
453539
async def _auto_update_until_closed():
454540
async with _DeviceAsync.convert_from(device_weakref()) as device:
455541
event_manager = _AsyncEventManager(Device._EVENT)
@@ -467,28 +553,59 @@ async def _auto_update_until_closed():
467553
auto_update_started_flag.set()
468554
if start_streaming_by_default:
469555
logger.debug("Streaming started by default")
470-
is_streaming_flag.set()
556+
start_stream(Sensor.Name.GAZE.value)
557+
start_stream(Sensor.Name.WORLD.value)
558+
start_stream(Sensor.Name.EYES.value)
559+
start_stream(Sensor.Name.IMU.value)
471560

472561
while True:
473562
logger.debug("Background worker waiting for event...")
474563
event = await event_manager.wait_for_first_event()
475564
logger.debug(f"Background worker received {event}")
565+
476566
if event is Device._EVENT.SHOULD_WORKER_CLOSE:
477567
break
478-
elif event is Device._EVENT.SHOULD_STREAMS_START:
479-
for manager in stream_managers.values():
480-
manager.should_be_streaming = True
481-
is_streaming_flag.set()
482-
logger.debug("Streaming started")
483-
elif event is Device._EVENT.SHOULD_STREAMS_STOP:
484-
for manager in stream_managers.values():
485-
manager.should_be_streaming = False
486-
is_streaming_flag.clear()
487-
logger.debug("Streaming stopped")
488-
else:
568+
569+
try:
570+
func = event_func_map[event]
571+
stream = event_stream_map[event]
572+
func(stream.value)
573+
except KeyError:
489574
raise RuntimeError(f"Unhandled {event!r}")
490575

491576
await notifier.receive_updates_stop()
492577
device_weakref()._event_manager = None
493578

579+
def start_stream(stream_name):
580+
is_streaming_flags[stream_name].set()
581+
stream_managers[stream_name].should_be_streaming = True
582+
logger.debug(f"Streaming started {stream_name}")
583+
584+
def stop_stream(stream_name):
585+
stream_managers[stream_name].should_be_streaming = False
586+
is_streaming_flags[stream_name].clear()
587+
logger.debug(f"Streaming stopped {stream_name}")
588+
589+
event_func_map = {
590+
Device._EVENT.SHOULD_START_GAZE: start_stream,
591+
Device._EVENT.SHOULD_START_WORLD: start_stream,
592+
Device._EVENT.SHOULD_START_EYES: start_stream,
593+
Device._EVENT.SHOULD_START_IMU: start_stream,
594+
Device._EVENT.SHOULD_STOP_GAZE: stop_stream,
595+
Device._EVENT.SHOULD_STOP_WORLD: stop_stream,
596+
Device._EVENT.SHOULD_STOP_EYES: stop_stream,
597+
Device._EVENT.SHOULD_STOP_IMU: stop_stream,
598+
}
599+
600+
event_stream_map = {
601+
Device._EVENT.SHOULD_START_GAZE: Sensor.Name.GAZE,
602+
Device._EVENT.SHOULD_START_WORLD: Sensor.Name.WORLD,
603+
Device._EVENT.SHOULD_START_EYES: Sensor.Name.EYES,
604+
Device._EVENT.SHOULD_START_IMU: Sensor.Name.IMU,
605+
Device._EVENT.SHOULD_STOP_GAZE: Sensor.Name.GAZE,
606+
Device._EVENT.SHOULD_STOP_WORLD: Sensor.Name.WORLD,
607+
Device._EVENT.SHOULD_STOP_EYES: Sensor.Name.EYES,
608+
Device._EVENT.SHOULD_STOP_IMU: Sensor.Name.IMU,
609+
}
610+
494611
return asyncio.run(_auto_update_until_closed())

0 commit comments

Comments
 (0)