diff --git a/p4runtime_sh/p4runtime.py b/p4runtime_sh/p4runtime.py index f858f92..0f54465 100644 --- a/p4runtime_sh/p4runtime.py +++ b/p4runtime_sh/p4runtime.py @@ -21,6 +21,7 @@ import queue import sys import threading +from iterators import TimeoutIterator from typing import NamedTuple from p4.v1 import p4runtime_pb2 @@ -191,6 +192,7 @@ def __init__(self, device_id, grpc_addr, election_id, role_name=None, ssl_option self.set_up_stream() def set_up_stream(self): + self.signal_q = queue.Queue() self.stream_out_q = queue.Queue() # queues for different messages self.stream_in_q = { @@ -211,7 +213,12 @@ def stream_req_iterator(): def stream_recv_wrapper(stream): @parse_p4runtime_error def stream_recv(): - for p in stream: + iterator = TimeoutIterator(stream, timeout=1) + for p in iterator: + if not self.signal_q.empty(): + break + if p == iterator.get_sentinel(): + continue if p.HasField("arbitration"): self.stream_in_q["arbitration"].put(p) elif p.HasField("packet"): @@ -304,6 +311,7 @@ def tear_down(self): for k in self.stream_in_q: self.stream_in_q[k].put(None) if self.stream_recv_thread: + self.signal_q.put(None) self.stream_recv_thread.join() self.channel.close() del self.channel # avoid a race condition if channel deleted when process terminates diff --git a/setup.cfg b/setup.cfg index 5985412..c55e99c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,6 +20,7 @@ python_requires = >=3.6 setup_requires = setuptools_scm install_requires = + iterators == 0.2.0 ipaddr == 2.2.0 jedi == 0.17.2 ipython >= 7.31.1, == 7.31.*; python_version>='3.7'