13 changes: 10 additions & 3 deletions dali/python/nvidia/dali/pipeline.py
@@ -29,7 +29,7 @@
 import copy
 import functools
 import inspect
-import pickle  # nosec B403
+import json
 import sys
 import traceback
 import warnings
@@ -1164,7 +1164,14 @@ def start_py_workers(self):
     def _restore_state_from_checkpoint(self):
         if self._checkpoint is not None:
             external_ctx_cpt = self._pipe.RestoreFromSerializedCheckpoint(self._checkpoint)
-            pipeline_data = pickle.loads(external_ctx_cpt.pipeline_data)  # nosec B301
+            try:
+                pipeline_data = json.loads(external_ctx_cpt.pipeline_data)
+            except json.JSONDecodeError:
+                raise ValueError(
+                    "Pipeline checkpoint data is not a valid JSON string. "
+                    "Please make sure that the checkpoint was created with the same version "
+                    "of DALI."
+                )
             self._consumer_epoch_idx = self._epoch_idx = pipeline_data["epoch_idx"]
             self._consumer_iter = pipeline_data["iter"]
             if self._input_callbacks:
@@ -1880,7 +1887,7 @@ def _get_checkpoint(self, iterator_data=""):
         """

         external_ctx_cpt = b.ExternalContextCheckpoint()
-        external_ctx_cpt.pipeline_data = pickle.dumps(
+        external_ctx_cpt.pipeline_data = json.dumps(
            {"iter": self._consumer_iter, "epoch_idx": self._epoch_idx}
        )
        external_ctx_cpt.iterator_data = iterator_data
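
The change above replaces pickle with JSON for the small pipeline-state blob: `pickle.loads` on untrusted checkpoint data can execute arbitrary code (hence the bandit B301/B403 suppressions being removed), while `json.loads` only ever produces plain data. For reference, a minimal standalone sketch of the new round-trip, using plain json and illustrative values (not DALI API calls):

    import json

    # What _get_checkpoint now writes into pipeline_data:
    # a JSON string instead of a pickle byte stream.
    pipeline_data = json.dumps({"iter": 100, "epoch_idx": 2})

    # What _restore_state_from_checkpoint does on load, including the
    # error path hit when, e.g., an old pickle-based checkpoint is passed.
    try:
        state = json.loads(pipeline_data)
    except json.JSONDecodeError:
        raise ValueError("Pipeline checkpoint data is not a valid JSON string.")

    assert state == {"iter": 100, "epoch_idx": 2}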
45 changes: 33 additions & 12 deletions dali/python/nvidia/dali/plugin/base_iterator.py
@@ -17,7 +17,7 @@
 import logging
 import numpy as np
 import warnings
-import pickle  # nosec B403
+import json
 from enum import Enum, unique
 from collections.abc import Iterable

@@ -251,13 +251,27 @@

     def _checkpointed_fields(self):
         return [
-            "_counter",
-            "_counter_per_gpu",
-            "_shard_sizes_per_gpu",
-            "_shards_id",
-            "_size",
+            ("_counter", np.int64),
+            ("_counter_per_gpu", (np.ndarray, np.int64)),
+            ("_shard_sizes_per_gpu", (np.ndarray, np.int64)),
+            ("_shards_id", (np.ndarray, np.int64)),
+            ("_size", None),
         ]

+    def _serialize_value(self, value, field_type):
+        if isinstance(field_type, tuple) and len(field_type) == 2 and field_type[0] is np.ndarray:
+            return value.tolist()
+        if field_type is np.int64:
+            return int(value)
+        return value
+
+    def _deserialize_value(self, value, field_type):
+        if isinstance(field_type, tuple) and len(field_type) == 2 and field_type[0] is np.ndarray:
+            return np.asarray(value, dtype=field_type[1])
+        if field_type is np.int64:
+            return np.int64(value)
+        return value
+
     def _restore_state(self, iterator_data):
         """
         Restores state of the iterator based on serialized `iterator_data`
@@ -269,16 +283,23 @@ def _restore_state(self, iterator_data):
             )
             return

-        iterator_data = pickle.loads(iterator_data)  # nosec B301
-        for field in self._checkpointed_fields():
+        try:
+            iterator_data = json.loads(iterator_data)
+        except json.JSONDecodeError:
+            raise ValueError(
+                "Iterator checkpoint data is not a valid JSON string. "
+                "Please make sure that the checkpoint was created with the same version of DALI."
+            )
+        for field, field_type in self._checkpointed_fields():
             if hasattr(self, field):
-                setattr(self, field, iterator_data[field])
+                setattr(self, field, self._deserialize_value(iterator_data[field], field_type))

     def _save_state(self):
-        iterator_data = pickle.dumps(
+        checkpointed_fields = self._checkpointed_fields()
+        iterator_data = json.dumps(
             {
-                field: getattr(self, field)
-                for field in self._checkpointed_fields()
+                field: self._serialize_value(getattr(self, field), field_type)
+                for field, field_type in checkpointed_fields
                 if hasattr(self, field)
             }
         )
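
The typed field list exists because JSON cannot represent numpy scalars or arrays directly: each checkpointed field now carries a type tag so `_serialize_value` can lower values to plain Python types and `_deserialize_value` can restore the exact numpy dtype on load. A standalone sketch of that convention for one ndarray field, with illustrative values (not DALI code):

    import json

    import numpy as np

    # Type tag in the style of _checkpointed_fields: (np.ndarray, element dtype).
    field_type = (np.ndarray, np.int64)
    shards_id = np.array([0, 1, 2, 3], dtype=np.int64)

    # Serialize: ndarray -> list of Python ints -> JSON string.
    encoded = json.dumps(shards_id.tolist())

    # Deserialize: JSON -> list -> ndarray with the original dtype restored.
    decoded = np.asarray(json.loads(encoded), dtype=field_type[1])

    assert (decoded == shards_id).all() and decoded.dtype == np.int64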