Use JSON in pipeline checkpointing #6154
base: main
Conversation
Signed-off-by: Szymon Karpiński <[email protected]>
Signed-off-by: Szymon Karpiński <[email protected]>
!build

CI MESSAGE: [41566518]: BUILD STARTED
Greptile Summary
This PR refactors pipeline checkpointing from pickle to JSON serialization, improving security by eliminating arbitrary code execution risks. The implementation adds type-aware serialization/deserialization methods to handle numpy arrays and int64 values correctly, converting them to JSON-compatible types (lists and ints) during save and restoring them with proper dtypes during load. Both pipeline and iterator checkpoint handling now include JSONDecodeError handling that provides clear error messages when encountering old pickle-based checkpoints.
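A minimal sketch of the type-aware conversion described in the summary. The helper names _serialize_value and _deserialize_value come from the sequence diagram below; the signatures, the field list, and the round-trip driver are assumptions for illustration, not DALI's actual implementation.

import json
import numpy as np

# Field names taken from the review comments below; treat them as assumptions.
_NUMPY_FIELDS = ("_counter_per_gpu", "_shard_sizes_per_gpu", "_shards_id")

def _serialize_value(value):
    # JSON cannot encode numpy types, so downcast them on save.
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.integer):
        return int(value)
    return value

def _deserialize_value(field, value):
    # Restore the int64 dtype that the iterator's numpy arithmetic expects.
    if field in _NUMPY_FIELDS and isinstance(value, list):
        return np.array(value, dtype=np.int64)
    return value

# Round trip: numpy values survive json.dumps()/json.loads() with dtypes intact.
payload = json.dumps({"_counter": _serialize_value(np.int64(7)),
                      "_shards_id": _serialize_value(np.array([0, 1]))})
restored = {k: _deserialize_value(k, v) for k, v in json.loads(payload).items()}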
Confidence Score: 5/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant User
participant Pipeline
participant Iterator
participant Checkpoint
Note over User,Checkpoint: Saving Checkpoint
User->>Pipeline: checkpoint()
Pipeline->>Iterator: _save_state()
Iterator->>Iterator: _serialize_value() for each field
Note over Iterator: Convert np.int64 to int<br/>Convert np.ndarray to list
Iterator-->>Pipeline: JSON string (iterator_data)
Pipeline->>Pipeline: json.dumps(pipeline_data)
Note over Pipeline: Serialize epoch_idx and iter
Pipeline->>Checkpoint: GetSerializedCheckpoint()
Checkpoint-->>User: Serialized checkpoint
Note over User,Checkpoint: Restoring Checkpoint
User->>Pipeline: Pipeline(checkpoint=data)
Pipeline->>Pipeline: _restore_state_from_checkpoint()
Pipeline->>Pipeline: json.loads(pipeline_data)
alt JSON decode success
Pipeline->>Pipeline: Restore epoch_idx and iter
Pipeline->>Iterator: _restore_state(iterator_data)
Iterator->>Iterator: json.loads(iterator_data)
alt JSON decode success
Iterator->>Iterator: _deserialize_value() for each field
Note over Iterator: Convert int to np.int64<br/>Convert list to np.ndarray
Iterator->>Iterator: setattr() for each field
else JSON decode fails
Iterator-->>User: ValueError (version mismatch)
end
else JSON decode fails
Pipeline-->>User: ValueError (version mismatch)
end
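The decode-failure branches in the diagram could translate into code along these lines. This is a sketch only, assuming the function name from the diagram and an error message paraphrased from the summary; it is not DALI's actual code.

import json

def _restore_state_from_checkpoint(pipeline_data):
    try:
        state = json.loads(pipeline_data)
    except json.JSONDecodeError as e:
        # Old checkpoints were pickle blobs, which are not valid JSON.
        raise ValueError(
            "Checkpoint is not valid JSON; it was most likely created by an "
            "older DALI version that used pickle-based checkpoints."
        ) from e
    return state["epoch_idx"], state["iter"]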
2 files reviewed, 2 comments
iterator_data = json.loads(iterator_data)  # nosec B301
for field in self._checkpointed_fields():
    if hasattr(self, field):
        setattr(self, field, iterator_data[field])
Even if the serialization issue in _save_state() is fixed by converting numpy arrays to lists, this restoration code has a critical type mismatch bug.
When json.loads() deserializes the data, numpy arrays will be restored as Python lists, not numpy arrays. The code then directly assigns these lists to fields that are expected to be numpy arrays (e.g., _counter_per_gpu, _shard_sizes_per_gpu, _shards_id).
This will cause failures when the code later tries to use these fields with numpy operations. For example:
- Line 435: np.any(self._counter_per_gpu + counter > self._shard_sizes_per_gpu) will fail if these are lists
- Line 502: self._counter_per_gpu = self._counter_per_gpu + self._counter fails, since numpy array addition does not work on lists
- Line 518: self._shards_id = (self._shards_id + 1) % self._shards_num fails if _shards_id is a list
The restoration needs to convert lists back to numpy arrays with the correct dtype:
for field in self._checkpointed_fields():
    if hasattr(self, field):
        value = iterator_data[field]
        if field in ["_counter_per_gpu", "_shard_sizes_per_gpu", "_shards_id"] and isinstance(value, list):
            value = np.array(value, dtype=np.int64)
        setattr(self, field, value)
Additional Comments (1)
Example failure scenario:

import json
import numpy as np

data = {"_shards_id": np.array([0, 1], dtype=np.int64)}
json.dumps(data)  # TypeError: Object of type ndarray is not JSON serializable

The previous pickle-based format handled these numpy types natively. This will break any code path that checkpoints the affected fields.
Greptile Overview

Greptile Summary
This PR attempts to replace pickle with JSON for pipeline checkpoint serialization to improve safety. However, the implementation has a critical bug that will cause runtime failures.

Key Issues

Critical: NumPy Array Serialization Failure
The most severe issue is in _save_state(). Example failure path: json.dumps() is called on a dict containing numpy arrays (_counter_per_gpu, _shard_sizes_per_gpu, _shards_id) and raises TypeError.

Breaking Change: No Backward Compatibility
The PR provides no migration path for existing checkpoints. Checkpoints saved with pickle (previous version) cannot be loaded with the new JSON-based code, as json.loads() cannot parse pickle data.

What Works
The changes to the pipeline-side state (epoch_idx and iter) serialize cleanly, since those values are plain ints.

Recommendations
Convert numpy arrays to lists (and numpy integers to ints) before json.dumps(), restore them to numpy arrays with the correct dtype on load, and handle old pickle-based checkpoints with a clear error or a migration path.
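Returning to the backward-compatibility point above, a hypothetical demonstration (not DALI code) of why an old pickle checkpoint is rejected by the JSON loader:

import json
import pickle

old_blob = pickle.dumps({"_counter": 128}, protocol=0)  # protocol 0 keeps the blob ASCII
try:
    json.loads(old_blob)
except json.JSONDecodeError as e:
    print("old checkpoint rejected:", e)  # Expecting value: line 1 column 1 (char 0)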
Confidence Score: 0/5
Important Files Changed
File Analysis
Sequence Diagram
sequenceDiagram
participant User
participant Iterator as DaliBaseIterator
participant Pipeline
participant Backend as C++ Backend
Note over User,Backend: Checkpointing Flow (with reader_name)
User->>Iterator: checkpoints()
Iterator->>Iterator: _save_state()
Note over Iterator: Collects fields:<br/>_counter (int)<br/>_counter_per_gpu (numpy array)<br/>_shard_sizes_per_gpu (numpy array)<br/>_shards_id (numpy array)<br/>_size (int)
Iterator->>Iterator: json.dumps(fields)
Note over Iterator: ❌ TypeError!<br/>NumPy arrays not JSON serializable
Note over User,Backend: Correct Flow (should be)
User->>Iterator: checkpoints()
Iterator->>Iterator: _save_state()
Iterator->>Iterator: Convert numpy arrays to lists
Iterator->>Iterator: json.dumps(fields)
Iterator-->>Pipeline: iterator_data (JSON string)
Pipeline->>Pipeline: _get_checkpoint(iterator_data)
Pipeline->>Pipeline: json.dumps({"iter": ..., "epoch_idx": ...})
Pipeline-->>Backend: ExternalContextCheckpoint
Note over Backend: pipeline_data (JSON string)<br/>iterator_data (JSON string)
Backend->>Backend: GetSerializedCheckpoint()
Backend-->>User: Checkpoint bytes
Note over User,Backend: Restoration Flow
User->>Pipeline: Pipeline(checkpoint=...)
Pipeline->>Backend: RestoreFromSerializedCheckpoint()
Backend-->>Pipeline: ExternalContextCheckpoint
Pipeline->>Pipeline: json.loads(pipeline_data)
Pipeline->>Iterator: _restore_state(iterator_data)
Iterator->>Iterator: json.loads(iterator_data)
Iterator->>Iterator: Restore fields from dict
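A minimal sketch of the "correct flow" shown in the diagram above, assuming the field names discussed in this review; it is not the actual DaliBaseIterator implementation.

import json
import numpy as np

def _save_state(self):
    state = {}
    for field in self._checkpointed_fields():
        value = getattr(self, field)
        if isinstance(value, np.ndarray):
            value = value.tolist()  # numpy arrays -> JSON lists
        elif isinstance(value, np.integer):
            value = int(value)      # np.int64 -> plain int
        state[field] = value
    return json.dumps(state)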
3 files reviewed, 3 comments
CI MESSAGE: [41566518]: BUILD FAILED
Signed-off-by: Szymon Karpiński <[email protected]>
Signed-off-by: Szymon Karpiński <[email protected]>
!build

CI MESSAGE: [41630007]: BUILD STARTED
1 file reviewed, 1 comment
CI MESSAGE: [41630007]: BUILD FAILED
CI MESSAGE: [41630007]: BUILD PASSED
Signed-off-by: Szymon Karpiński <[email protected]>
!build

CI MESSAGE: [41813463]: BUILD STARTED
CI MESSAGE: [41813463]: BUILD FAILED
Signed-off-by: Szymon Karpiński <[email protected]>
!build
Greptile's behavior is changing!
From now on, if a review finishes with no comments, we will not post an additional "statistics" comment to confirm that our review found nothing to comment on. However, you can confirm that we reviewed your changes in the status check section. This feature can be toggled off in your Code Review Settings by deselecting "Create a status check for each PR".
CI MESSAGE: [42047609]: BUILD STARTED
CI MESSAGE: [42047609]: BUILD FAILED
CI MESSAGE: [42047609]: BUILD PASSED
Category:
Refactoring (Redesign of existing code that doesn't affect functionality)
Description:
Pipeline checkpoints are simple enough that safe JSON serialization can be used instead of pickle.
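For illustration, the new checkpoint payloads implied by the sequence diagrams above are small JSON objects; the key names here are taken from the diagrams and may not match DALI exactly.

import json

pipeline_data = json.dumps({"epoch_idx": 2, "iter": 1500})   # pipeline-side state
iterator_data = json.dumps({"_counter": 1500, "_shards_id": [0, 1]})  # iterator-side state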
Additional information:
Affected modules and functionalities:
Internal format of the checkpoints
Key points relevant for the review:
Tests:
Checklist
Documentation
DALI team only
Requirements
REQ IDs: N/A
JIRA TASK: DALI-4546