Skip to content

Commit

Permalink
Redid the branch to cleanup the commits
Browse files Browse the repository at this point in the history
  • Loading branch information
nvidianz committed Oct 7, 2024
1 parent 69df48d commit 0149f6d
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 166 deletions.
32 changes: 17 additions & 15 deletions nvflare/fuel/f3/streaming/byte_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,22 +309,22 @@ def close(self):


class ByteReceiver:

received_stream_counter_pool = StatsPoolManager.add_counter_pool(
name="Received_Stream_Counters",
description="Counters of received streams",
counter_names=[COUNTER_NAME_RECEIVED],
)

received_stream_size_pool = StatsPoolManager.add_msg_size_pool(
"Received_Stream_Sizes", "Sizes of streams received (MBs)"
)

def __init__(self, cell: CoreCell):
self.cell = cell
self.cell.register_request_cb(channel=STREAM_CHANNEL, topic=STREAM_DATA_TOPIC, cb=self._data_handler)
self.registry = Registry()

self.received_stream_counter_pool = StatsPoolManager.add_counter_pool(
name="Received_Stream_Counters",
description="Counters of received streams",
counter_names=[COUNTER_NAME_RECEIVED],
scope=self.cell.my_info.fqcn,
)

self.received_stream_size_pool = StatsPoolManager.add_msg_size_pool(
"Received_Stream_Sizes", "Sizes of streams received (MBs)", scope=self.cell.my_info.fqcn
)

def register_callback(self, channel: str, topic: str, stream_cb: Callable, *args, **kwargs):
if not callable(stream_cb):
raise StreamError(f"specified stream_cb {type(stream_cb)} is not callable")
Expand All @@ -345,13 +345,15 @@ def _data_handler(self, message: Message):
task.stop(StreamError(f"{task} No callback is registered for {task.channel}/{task.topic}"))
return

self.received_stream_counter_pool.increment(
category=stream_stats_category(task.channel, task.topic, "stream"),
fqcn = self.cell.my_info.fqcn
ByteReceiver.received_stream_counter_pool.increment(
category=stream_stats_category(fqcn, task.channel, task.topic, "stream"),
counter_name=COUNTER_NAME_RECEIVED,
)

self.received_stream_size_pool.record_value(
category=stream_stats_category(task.channel, task.topic, "stream"), value=task.size / ONE_MB
ByteReceiver.received_stream_size_pool.record_value(
category=stream_stats_category(fqcn, task.channel, task.topic, "stream"),
value=task.size / ONE_MB,
)

stream_thread_pool.submit(self._callback_wrapper, task, callback)
Expand Down
Loading

0 comments on commit 0149f6d

Please sign in to comment.