From f90b238b1d3d48014b4fa0bb01da34e776b921f6 Mon Sep 17 00:00:00 2001 From: Emil Khshiboun Date: Wed, 19 Apr 2023 21:34:32 +0300 Subject: [PATCH] Fix a bug that caused the controller to hang indefinitely on exit. During the teardown process (i.e when issuing quit() command), the controller tries to close all resources including the StreamChannel (StreamChannel is started on controller initaliazation). StreamChannel events are handled by separated thread, which listens for gRPC messages/replies from the P4Agent, when there is no requests, the StreamChannel blocks, which in turn blocks the thread. As part of the teardown, the controller join() the StreamChannel thread, which does not exit as there are no requests from the P4Agent and thus the controller hangs forever. The fix uses TimeoutIterator to query StreamChannel events with given timeout, if no events are received the loops continues and thus can check for program termination This commit adds dependecy on 'iterators' module that implments the TimeoutIterator Signed-off-by: Emil Khshiboun --- p4runtime_sh/p4runtime.py | 10 +++++++++- setup.cfg | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/p4runtime_sh/p4runtime.py b/p4runtime_sh/p4runtime.py index f858f92..0f54465 100644 --- a/p4runtime_sh/p4runtime.py +++ b/p4runtime_sh/p4runtime.py @@ -21,6 +21,7 @@ import queue import sys import threading +from iterators import TimeoutIterator from typing import NamedTuple from p4.v1 import p4runtime_pb2 @@ -191,6 +192,7 @@ def __init__(self, device_id, grpc_addr, election_id, role_name=None, ssl_option self.set_up_stream() def set_up_stream(self): + self.signal_q = queue.Queue() self.stream_out_q = queue.Queue() # queues for different messages self.stream_in_q = { @@ -211,7 +213,12 @@ def stream_req_iterator(): def stream_recv_wrapper(stream): @parse_p4runtime_error def stream_recv(): - for p in stream: + iterator = TimeoutIterator(stream, timeout=1) + for p in iterator: + if not self.signal_q.empty(): + break + if p == iterator.get_sentinel(): + continue if p.HasField("arbitration"): self.stream_in_q["arbitration"].put(p) elif p.HasField("packet"): @@ -304,6 +311,7 @@ def tear_down(self): for k in self.stream_in_q: self.stream_in_q[k].put(None) if self.stream_recv_thread: + self.signal_q.put(None) self.stream_recv_thread.join() self.channel.close() del self.channel # avoid a race condition if channel deleted when process terminates diff --git a/setup.cfg b/setup.cfg index 5985412..c55e99c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,6 +20,7 @@ python_requires = >=3.6 setup_requires = setuptools_scm install_requires = + iterators == 0.2.0 ipaddr == 2.2.0 jedi == 0.17.2 ipython >= 7.31.1, == 7.31.*; python_version>='3.7'