Skip to content

Commit

Permalink
Merge branch 'main' into fix_streaming_test_conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
nvidianz authored Nov 27, 2024
2 parents 17da566 + db0bead commit 723bcfa
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
23 changes: 16 additions & 7 deletions nvflare/private/fed/app/simulator/simulator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
RunnerTask,
RunProcessKey,
SystemConfigs,
SystemVarName,
WorkspaceConstants,
)
from nvflare.apis.job_def import ALL_SITES, JobMetaKey
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(
self.client_config = None
self.deploy_args = None
self.build_ctx = None
self.server_custom_folder = None

self.clients_created = 0

Expand Down Expand Up @@ -480,7 +482,7 @@ def simulator_run_main(self):
executor = ThreadPoolExecutor(max_workers=len(gpus))
for index in range(len(gpus)):
clients = split_clients[index]
executor.submit(lambda p: self.client_run(*p), [clients, gpus[index]])
executor.submit(lambda p: self.client_run(*p), [self.server_custom_folder, clients, gpus[index]])

executor.shutdown()
# Abort the server after all clients finished run
Expand All @@ -497,8 +499,10 @@ def simulator_run_main(self):
run_status = 1
return run_status

def client_run(self, clients, gpu):
client_runner = SimulatorClientRunner(self.args, clients, self.client_config, self.deploy_args, self.build_ctx)
def client_run(self, server_custom_folder, clients, gpu):
client_runner = SimulatorClientRunner(
server_custom_folder, self.args, clients, self.client_config, self.deploy_args, self.build_ctx
)
client_runner.run(gpu)

def start_server_app(self, args):
Expand All @@ -507,9 +511,9 @@ def start_server_app(self, args):
os.chdir(args.workspace)

args.server_config = os.path.join("config", JobConstants.SERVER_JOB_CONFIG)
app_custom_folder = os.path.join(app_server_root, "custom")
if os.path.isdir(app_custom_folder) and app_custom_folder not in sys.path:
sys.path.append(app_custom_folder)
self.server_custom_folder = os.path.join(app_server_root, "custom")
if os.path.isdir(self.server_custom_folder) and self.server_custom_folder not in sys.path:
sys.path.append(self.server_custom_folder)

startup = os.path.join(args.workspace, WorkspaceConstants.STARTUP_FOLDER_NAME)
os.makedirs(startup, exist_ok=True)
Expand Down Expand Up @@ -553,8 +557,9 @@ def dump_stats(self, workspace: Workspace):


class SimulatorClientRunner(FLComponent):
def __init__(self, args, clients: [], client_config, deploy_args, build_ctx):
def __init__(self, server_custom_folder, args, clients: [], client_config, deploy_args, build_ctx):
super().__init__()
self.server_custom_folder = server_custom_folder
self.args = args
self.federated_clients = clients
self.run_client_index = -1
Expand Down Expand Up @@ -702,6 +707,10 @@ def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name
command += " --gpu " + str(gpu)
new_env = os.environ.copy()
add_custom_dir_to_path(app_custom_folder, new_env)
if self.server_custom_folder:
python_paths = new_env[SystemVarName.PYTHONPATH].split(os.pathsep)
python_paths.remove(self.server_custom_folder)
new_env[SystemVarName.PYTHONPATH] = os.pathsep.join(python_paths)

_ = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)

Expand Down
10 changes: 5 additions & 5 deletions nvflare/private/fed/utils/fed_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import importlib
import json
import logging
Expand Down Expand Up @@ -475,11 +476,10 @@ def get_simulator_app_root(simulator_root, site_name):


def add_custom_dir_to_path(app_custom_folder, new_env):
path = new_env.get(SystemVarName.PYTHONPATH, "")
if path:
new_env[SystemVarName.PYTHONPATH] = path + os.pathsep + app_custom_folder
else:
new_env[SystemVarName.PYTHONPATH] = app_custom_folder
"""Util method to add app_custom_folder into the sys.path and carry into the child process."""
sys_path = copy.copy(sys.path)
sys_path.append(app_custom_folder)
new_env[SystemVarName.PYTHONPATH] = os.pathsep.join(sys_path)


def extract_participants(participants_list):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def test_start_server_app(self, mock_deploy, mock_admin, mock_register, mock_cel
def test_get_new_sys_path_with_empty(self):
args = Namespace(workspace="/tmp")
args.set = []
runner = SimulatorClientRunner(args, [], None, None, None)
runner = SimulatorClientRunner(None, args, [], None, None, None)
old_sys_path = copy.deepcopy(sys.path)
sys.path.insert(0, "")
sys.path.append("/temp/test")
Expand All @@ -172,7 +172,7 @@ def test_get_new_sys_path_with_empty(self):
def test_get_new_sys_path_with_multiple_empty(self):
args = Namespace(workspace="/tmp")
args.set = []
runner = SimulatorClientRunner(args, [], None, None, None)
runner = SimulatorClientRunner(None, args, [], None, None, None)
old_sys_path = copy.deepcopy(sys.path)
sys.path.insert(0, "")
if len(sys.path) > 2:
Expand All @@ -185,7 +185,7 @@ def test_get_new_sys_path_with_multiple_empty(self):
def test_get_new_sys_path(self):
args = Namespace(workspace="/tmp")
args.set = []
runner = SimulatorClientRunner(args, [], None, None, None)
runner = SimulatorClientRunner(None, args, [], None, None, None)
old_sys_path = copy.deepcopy(sys.path)
sys.path.append("/temp/test")
new_sys_path = runner._get_new_sys_path()
Expand Down

0 comments on commit 723bcfa

Please sign in to comment.