Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BaseFedJob layer #3098

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/programming_guide/fed_job_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ The FedAvgJob automatically adds the FedAvg controller, PTFileModelPersistor and

For more examples of job patterns, see:

* :class:`BaseFedJob<nvflare.app_opt.pt.job_config.base_fed_job.BaseFedJob>`
* :class:`CommonJob<nvflare.job_config.common_job.CommonJob>`
* :class:`FedAvgJob<nvflare.app_opt.pt.job_config.fed_avg.FedAvgJob>` (pytorch)
* :class:`FedAvgJob<nvflare.app_opt.tf.job_config.fed_avg.FedAvgJob>` (tensorflow)
* :class:`CCWFJob<nvflare.app_common.ccwf.ccwf_job.CCWFJob>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@
"#### 2. Define a FedJob\n",
"The `FedJob` is used to define how controllers and executors are placed within a federated job using the `to(object, target)` routine.\n",
"\n",
"Here we use a PyTorch `BaseFedJob`, where we can define the job name and the initial global model.\n",
"The `BaseFedJob` automatically configures components for model persistence, model selection, and TensorBoard streaming for convenience."
"Here we use a PyTorch `PTJob`, where we can define the job name and the initial global model.\n",
"The `PTJob` automatically configures components for model persistence, model selection, and TensorBoard streaming for convenience."
]
},
{
Expand All @@ -335,10 +335,10 @@
"from src.lit_net import LitNet\n",
"\n",
"from nvflare.app_common.workflows.fedavg import FedAvg\n",
"from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob\n",
"from nvflare.app_opt.pt.job_config.pt_job import PTJob\n",
"from nvflare.job_config.script_runner import ScriptRunner\n",
"\n",
"job = BaseFedJob(\n",
"job = PTJob(\n",
" name=\"cifar10_lightning_fedavg\",\n",
" initial_model=LitNet(),\n",
")"
Expand Down
8 changes: 4 additions & 4 deletions examples/getting_started/pt/nvflare_pt_getting_started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@
"#### 2. Define a FedJob\n",
"The `FedJob` is used to define how controllers and executors are placed within a federated job using the `to(object, target)` routine.\n",
"\n",
"Here we use a PyTorch `BaseFedJob`, where we can define the job name and the initial global model.\n",
"The `BaseFedJob` automatically configures components for model persistence, model selection, and TensorBoard streaming for convenience."
"Here we use a PyTorch `PTJob`, where we can define the job name and the initial global model.\n",
"The `PTJob` automatically configures components for model persistence, model selection, and TensorBoard streaming for convenience."
]
},
{
Expand All @@ -277,10 +277,10 @@
"from src.net import Net\n",
"\n",
"from nvflare.app_common.workflows.fedavg import FedAvg\n",
"from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob\n",
"from nvflare.app_opt.pt.job_config.pt_job import PTJob\n",
"from nvflare.job_config.script_runner import ScriptRunner\n",
"\n",
"job = BaseFedJob(\n",
"job = PTJob(\n",
" name=\"cifar10_pt_fedavg\",\n",
" initial_model=Net(),\n",
")"
Expand Down
8 changes: 4 additions & 4 deletions examples/getting_started/tf/nvflare_tf_getting_started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@
"#### 2. Define a FedJob\n",
"The `FedJob` is used to define how controllers and executors are placed within a federated job using the `to(object, target)` routine.\n",
"\n",
"Here we use a TensorFlow `BaseFedJob`, where we can define the job name and the initial global model.\n",
"The `BaseFedJob` automatically configures components for model persistence, model selection, and TensorBoard streaming for convenience."
"Here we use a TensorFlow `TFJob`, where we can define the job name and the initial global model.\n",
"The `TFJob` automatically configures components for model persistence, model selection, and TensorBoard streaming for convenience."
]
},
{
Expand All @@ -267,10 +267,10 @@
"from src.tf_net import TFNet\n",
"\n",
"from nvflare.app_common.workflows.fedavg import FedAvg\n",
"from nvflare.app_opt.tf.job_config.base_fed_job import BaseFedJob\n",
"from nvflare.app_opt.tf.job_config.tf_job import TFJob\n",
"from nvflare.job_config.script_runner import FrameworkType, ScriptRunner\n",
"\n",
"job = BaseFedJob(\n",
"job = TFJob(\n",
" name=\"cifar10_tf_fedavg\",\n",
" initial_model=TFNet(),\n",
")"
Expand Down
4 changes: 2 additions & 2 deletions nvflare/app_opt/pt/job_config/fed_avg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import torch.nn as nn

from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob
from nvflare.app_opt.pt.job_config.pt_job import PTJob


class FedAvgJob(BaseFedJob):
class FedAvgJob(PTJob):
def __init__(
self,
initial_model: nn.Module,
Expand Down
4 changes: 2 additions & 2 deletions nvflare/app_opt/pt/job_config/fed_sag_mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from nvflare.app_common.aggregators import InTimeAccumulateWeightedAggregator
from nvflare.app_common.shareablegenerators import FullModelShareableGenerator
from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather
from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob
from nvflare.app_opt.pt.job_config.pt_job import PTJob
from nvflare.app_opt.tracking.mlflow.mlflow_receiver import MLflowReceiver
from nvflare.app_opt.tracking.mlflow.mlflow_writer import MLflowWriter


class SAGMLFlowJob(BaseFedJob):
class SAGMLFlowJob(PTJob):
def __init__(
self,
initial_model: nn.Module,
Expand Down
97 changes: 97 additions & 0 deletions nvflare/app_opt/pt/job_config/pt_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional

from torch import nn as nn

from nvflare.app_common.abstract.model_locator import ModelLocator
from nvflare.app_common.abstract.model_persistor import ModelPersistor
from nvflare.app_common.widgets.convert_to_fed_event import ConvertToFedEvent
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.widgets.streaming import AnalyticsReceiver
from nvflare.app_common.widgets.validation_json_generator import ValidationJsonGenerator
from nvflare.app_opt.pt.job_config.model import PTModel
from nvflare.app_opt.tracking.tb.tb_receiver import TBAnalyticsReceiver
from nvflare.job_config.api import validate_object_for_job
from nvflare.job_config.common_job import CommonJob


class PTJob(CommonJob):
def __init__(
self,
initial_model: nn.Module = None,
name: str = "fed_job",
min_clients: int = 1,
mandatory_clients: Optional[List[str]] = None,
key_metric: str = "accuracy",
validation_json_generator: Optional[ValidationJsonGenerator] = None,
intime_model_selector: Optional[IntimeModelSelector] = None,
convert_to_fed_event: Optional[ConvertToFedEvent] = None,
analytics_receiver: Optional[AnalyticsReceiver] = None,
model_persistor: Optional[ModelPersistor] = None,
model_locator: Optional[ModelLocator] = None,
):
"""PyTorch CommonJob.

Configures ValidationJsonGenerator, IntimeModelSelector, AnalyticsReceiver, ConvertToFedEvent.

User must add controllers and executors.

Args:
initial_model (nn.Module): initial PyTorch Model. Defaults to None.
name (name, optional): name of the job. Defaults to "fed_job".
min_clients (int, optional): the minimum number of clients for the job. Defaults to 1.
mandatory_clients (List[str], optional): mandatory clients to run the job. Default None.
key_metric (str, optional): Metric used to determine if the model is globally best.
if metrics are a `dict`, `key_metric` can select the metric used for global model selection.
Defaults to "accuracy".
validation_json_generator (ValidationJsonGenerator, optional): A component for generating validation results.
if not provided, a ValidationJsonGenerator will be configured.
intime_model_selector: (IntimeModelSelector, optional): A component for select the model.
if not provided, an IntimeModelSelector will be configured.
convert_to_fed_event: (ConvertToFedEvent, optional): A component to covert certain events to fed events.
if not provided, a ConvertToFedEvent object will be created.
analytics_receiver (AnlyticsReceiver, optional): Receive analytics.
If not provided, a TBAnalyticsReceiver will be configured.
model_persistor (optional, ModelPersistor): how to persistor the model.
model_locator (optional, ModelLocator): how to locate the model.
"""
super().__init__(
name=name,
min_clients=min_clients,
mandatory_clients=mandatory_clients,
key_metric=key_metric,
validation_json_generator=validation_json_generator,
intime_model_selector=intime_model_selector,
convert_to_fed_event=convert_to_fed_event,
)

self.initial_model = initial_model
self.comp_ids = {}

if analytics_receiver:
validate_object_for_job("analytics_receiver", analytics_receiver, AnalyticsReceiver)
else:
analytics_receiver = TBAnalyticsReceiver()

self.to_server(
id="receiver",
obj=analytics_receiver,
)

if initial_model:
self.comp_ids.update(
self.to_server(PTModel(model=initial_model, persistor=model_persistor, locator=model_locator))
)
4 changes: 2 additions & 2 deletions nvflare/app_opt/tf/job_config/fed_avg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import tensorflow as tf

from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.tf.job_config.base_fed_job import BaseFedJob
from nvflare.app_opt.tf.job_config.tf_job import TFJob


class FedAvgJob(BaseFedJob):
class FedAvgJob(TFJob):
def __init__(
self,
initial_model: tf.keras.Model,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
import tensorflow as tf

from nvflare.app_common.abstract.model_persistor import ModelPersistor
from nvflare.app_common.tracking.tracker_types import ANALYTIC_EVENT_TYPE
from nvflare.app_common.widgets.convert_to_fed_event import ConvertToFedEvent
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.widgets.streaming import AnalyticsReceiver
from nvflare.app_common.widgets.validation_json_generator import ValidationJsonGenerator
from nvflare.app_opt.tf.job_config.model import TFModel
from nvflare.app_opt.tracking.tb.tb_receiver import TBAnalyticsReceiver
from nvflare.job_config.api import FedJob, validate_object_for_job
from nvflare.job_config.api import validate_object_for_job
from nvflare.job_config.common_job import CommonJob


class BaseFedJob(FedJob):
class TFJob(CommonJob):
def __init__(
self,
initial_model: tf.keras.Model = None,
Expand All @@ -41,7 +41,7 @@ def __init__(
analytics_receiver: Optional[AnalyticsReceiver] = None,
model_persistor: Optional[ModelPersistor] = None,
):
"""TensorFlow BaseFedJob.
"""TensorFlow CommonJob.

Configures ValidationJsonGenerator, IntimeModelSelector, TBAnalyticsReceiver, ConvertToFedEvent.

Expand Down Expand Up @@ -69,29 +69,15 @@ def __init__(
name=name,
min_clients=min_clients,
mandatory_clients=mandatory_clients,
key_metric=key_metric,
validation_json_generator=validation_json_generator,
intime_model_selector=intime_model_selector,
convert_to_fed_event=convert_to_fed_event,
)

self.initial_model = initial_model
self.comp_ids = {}

if validation_json_generator:
validate_object_for_job("validation_json_generator", validation_json_generator, ValidationJsonGenerator)
else:
validation_json_generator = ValidationJsonGenerator()
self.to_server(id="json_generator", obj=validation_json_generator)

if intime_model_selector:
validate_object_for_job("intime_model_selector", intime_model_selector, IntimeModelSelector)
self.to_server(id="model_selector", obj=intime_model_selector)
elif key_metric:
self.to_server(id="model_selector", obj=IntimeModelSelector(key_metric=key_metric))

if convert_to_fed_event:
validate_object_for_job("convert_to_fed_event", convert_to_fed_event, ConvertToFedEvent)
else:
convert_to_fed_event = ConvertToFedEvent(events_to_convert=[ANALYTIC_EVENT_TYPE])
self.convert_to_fed_event = convert_to_fed_event

if analytics_receiver:
validate_object_for_job("analytics_receiver", analytics_receiver, AnalyticsReceiver)
else:
Expand All @@ -104,6 +90,3 @@ def __init__(

if initial_model:
self.comp_ids["persistor_id"] = self.to_server(TFModel(model=initial_model, persistor=model_persistor))

def set_up_client(self, target: str):
self.to(id="event_to_fed", obj=self.convert_to_fed_event, target=target)
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,19 @@

from typing import List, Optional

from torch import nn as nn

from nvflare.app_common.abstract.model_locator import ModelLocator
from nvflare.app_common.abstract.model_persistor import ModelPersistor
from nvflare.app_common.tracking.tracker_types import ANALYTIC_EVENT_TYPE
from nvflare.app_common.widgets.convert_to_fed_event import ConvertToFedEvent
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.widgets.streaming import AnalyticsReceiver
from nvflare.app_common.widgets.validation_json_generator import ValidationJsonGenerator
from nvflare.app_opt.pt.job_config.model import PTModel
from nvflare.app_opt.tracking.tb.tb_receiver import TBAnalyticsReceiver
from nvflare.job_config.api import FedJob, validate_object_for_job


class BaseFedJob(FedJob):
class CommonJob(FedJob):
def __init__(
self,
initial_model: nn.Module = None,
name: str = "fed_job",
min_clients: int = 1,
mandatory_clients: Optional[List[str]] = None,
Expand All @@ -43,14 +38,15 @@ def __init__(
model_persistor: Optional[ModelPersistor] = None,
model_locator: Optional[ModelLocator] = None,
):
"""PyTorch BaseFedJob.
"""CommonJob.

By default configures ValidationJsonGenerator, IntimeModelSelector, ConvertToFedEvent.

Configures ValidationJsonGenerator, IntimeModelSelector, AnalyticsReceiver, ConvertToFedEvent.
If provided, configures AnalyticsReceiver, ModelPersistor, ModelLocator.

User must add controllers and executors.

Args:
initial_model (nn.Module): initial PyTorch Model. Defaults to None.
name (name, optional): name of the job. Defaults to "fed_job".
min_clients (int, optional): the minimum number of clients for the job. Defaults to 1.
mandatory_clients (List[str], optional): mandatory clients to run the job. Default None.
Expand All @@ -64,8 +60,7 @@ def __init__(
convert_to_fed_event: (ConvertToFedEvent, optional): A component to covert certain events to fed events.
if not provided, a ConvertToFedEvent object will be created.
analytics_receiver (AnlyticsReceiver, optional): Receive analytics.
If not provided, a TBAnalyticsReceiver will be configured.
model_persistor (optional, ModelPersistor): how to persistor the model.
model_persistor (optional, ModelPersistor): how to persist the model.
model_locator (optional, ModelLocator): how to locate the model.
"""
super().__init__(
Expand All @@ -74,9 +69,6 @@ def __init__(
mandatory_clients=mandatory_clients,
)

self.initial_model = initial_model
self.comp_ids = {}

if validation_json_generator:
validate_object_for_job("validation_json_generator", validation_json_generator, ValidationJsonGenerator)
else:
Expand All @@ -97,18 +89,15 @@ def __init__(

if analytics_receiver:
validate_object_for_job("analytics_receiver", analytics_receiver, AnalyticsReceiver)
else:
analytics_receiver = TBAnalyticsReceiver()
self.to_server(id="receiver", obj=analytics_receiver)

self.to_server(
id="receiver",
obj=analytics_receiver,
)
if model_persistor:
validate_object_for_job("persistor", model_persistor, ModelPersistor)
self.to_server(id="persistor", obj=model_persistor)

if initial_model:
self.comp_ids.update(
self.to_server(PTModel(model=initial_model, persistor=model_persistor, locator=model_locator))
)
if model_locator:
validate_object_for_job("locator", model_locator, ModelLocator)
self.to_server(id="locator", obj=model_locator)

def set_up_client(self, target: str):
self.to(id="event_to_fed", obj=self.convert_to_fed_event, target=target)
Loading
Loading