[WIP][data] feat: TransferQueue - integrate TransferQueue into main codebase #4987
base: main
Conversation
…#4902): Unify the return values of the reward function to make the logic clearer.
There are different `_balance_batch` functions for `ray_trainer` with and without TransferQueue.
`DataProto.from_tensordict` requires at least one tensor inside the TensorDict; however, for generation there is currently no tensor in the `gen_batch`, so the restriction is relaxed to enable the TQ bridge.
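A minimal sketch of the relaxed check, using only plain Python and torch; the helper name and signature are illustrative, not the actual `DataProto` internals:

```python
import torch


def infer_batch_size(tensors: dict[str, torch.Tensor], fallback: int | None = None) -> int:
    """Infer batch size from the first tensor, or fall back when no tensors exist.

    The stricter behaviour (roughly `assert len(tensors) > 0`) breaks for a
    generation batch that carries only non-tensor fields.
    """
    if tensors:
        return next(iter(tensors.values())).shape[0]
    if fallback is None:
        raise ValueError("no tensors present and no fallback batch size provided")
    return fallback


# Tensor-free gen_batch: the relaxed path uses the caller-provided size.
assert infer_batch_size({}, fallback=8) == 8
# Normal path: batch size comes from the leading tensor dimension.
assert infer_batch_size({"input_ids": torch.zeros(4, 16, dtype=torch.long)}) == 4
```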
1. Pass the TQ config to the worker config when enabling TQ. Note that currently only `ray_trainer_tq` has been modified; `ray_trainer` needs to be updated later too.
2. Change the `reward_extra_keys` type from list to set.
3. In the TQ bridge, when converting a BatchMeta to a TensorDict, several special `meta_info` keys need to be set as `NonTensorData` even though their datatypes are list (compared against prints from the non-TQ version).
4. Fix a bug in the `@register` decorator.
Checked the original code and found that it calls:

`DataProto.to_tensordict()`:

```python
...
output = tu.get_tensordict(tensor_dict=tensor_batch, non_tensor_dict=self.meta_info)
```

`tu.get_tensordict()`:

```python
...
for key, val in non_tensor_dict.items():  # non_tensor_dict is meta_info
    assert key not in tensor_dict
    tensor_dict[key] = NonTensorData(val)
```

So, as it stands, `meta_info` entries are only ever turned into `NonTensorData`.
We only need to pay attention to `tu.assign_non_tensor(batch_td, xxx)` in the `compute_yy` functions and make sure our modification aligns with it.
Code Review
This pull request integrates the experimental TransferQueue feature into the main codebase, which is a significant refactoring. The changes involve moving and deleting several files, updating configurations, and modifying the training loop to support BatchMeta-based operations. My review focuses on critical correctness and performance issues. I've identified a critical bug that could lead to a crash in the training loop and a high-severity performance issue related to inefficient data fetching from the TransferQueue. Addressing these points will improve the stability and performance of the new feature.
Pull request overview
This PR integrates the experimental TransferQueue feature into the main codebase, enabling efficient data transfer in distributed PPO training. The integration includes a new trainer implementation, configuration updates, and modifications to support BatchMeta-based operations.
Changes:
- Added `RayPPOTrainerTransferQueue` as a new trainer implementation inheriting from `RayPPOTrainer`
- Updated configuration files to include TransferQueue-related settings (storage backend, batch management)
- Modified the entry point (`main_ppo.py`) to conditionally enable TransferQueue based on configuration (see the sketch below)
- Integrated TransferQueue client creation in worker initialization and decorator functions
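A rough illustration of that conditional entry point; the class names, file paths, and `transferqueue.enable` key are taken from this PR, but the selector function itself is hypothetical rather than the literal `main_ppo.py` code:

```python
def select_trainer_cls(config):
    """Pick the TransferQueue-enabled trainer when config.transferqueue.enable is set."""
    if config.transferqueue.enable:
        # New trainer added by this PR (verl/trainer/ppo/ray_trainer_tq.py).
        from verl.trainer.ppo.ray_trainer_tq import RayPPOTrainerTransferQueue

        return RayPPOTrainerTransferQueue

    # Default path: the existing RayPPOTrainer.
    from verl.trainer.ppo.ray_trainer import RayPPOTrainer

    return RayPPOTrainer
```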
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `verl/trainer/ppo/ray_trainer_tq.py` | New trainer implementation with TransferQueue support |
| `verl/trainer/main_ppo.py` | Added conditional logic to select trainer based on TransferQueue config |
| `verl/utils/transferqueue_utils.py` | Enhanced utility functions for BatchMeta/TensorDict conversion |
| `verl/workers/engine_workers.py` | Added TransferQueue client initialization in workers |
| `verl/single_controller/base/decorator.py` | Extended register decorator with TransferQueue parameters |
| `verl/workers/config/engine.py` | Added TransferQueueConfig dataclass |
| Configuration files | Updated YAML configs with TransferQueue settings |
| CI/Scripts | Updated test scripts and workflow to use new entry point |
`verl/trainer/ppo/ray_trainer_tq.py` (Outdated)
```python
)
batch_meta = batch_meta.union(compute_advantage_output_meta)

if "resampled_idx" in batch_meta.field_names and self.config.transferqueue.enable:
```
Copilot AI (Jan 20, 2026)
In line 1094, there's a typo in the field name check: 'resampled_idx' should be 'pf_ppo_reweight_idx' based on the field being selected in line 1095. This inconsistency will cause the condition to never be True.
```diff
- if "resampled_idx" in batch_meta.field_names and self.config.transferqueue.enable:
+ if "pf_ppo_reweight_idx" in batch_meta.field_names and self.config.transferqueue.enable:
```
1. Convert lists to tuples in the TQ bridge.
2. Convert the tuples back to lists in the decorator (see the sketch below).
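A minimal sketch of that round-trip with hypothetical helper and key names; the real conversions live in the TQ bridge and the `@register` decorator:

```python
def freeze_lists(meta: dict) -> dict:
    """Bridge side: make list-valued metadata immutable by converting it to tuples."""
    return {k: tuple(v) if isinstance(v, list) else v for k, v in meta.items()}


def thaw_tuples(meta: dict) -> dict:
    """Decorator side: restore the original list types before the worker method runs."""
    return {k: list(v) if isinstance(v, tuple) else v for k, v in meta.items()}


meta_info = {"reward_extra_keys": ["acc", "format"], "global_steps": 10}
assert thaw_tuples(freeze_lists(meta_info)) == meta_info  # values survive the round-trip
```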
To avoid reordering/resampling the full data, `core_algos.compute_pf_ppo_reweight_data` is replaced with `core_algos.compute_pf_ppo_reweight_data_tq`, and the `compute_advantage` function is also modified slightly for the TQ version.
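A sketch of the idea, not verl's actual `core_algos` API: the TQ variant only computes resampling indices (written back as a field such as `pf_ppo_reweight_idx`), and the actual gather of the full batch happens later on the consumer side.

```python
import torch


def compute_resample_indices(weights: torch.Tensor,
                             generator: torch.Generator | None = None) -> torch.Tensor:
    """Sample row indices proportionally to per-sample weights (with replacement)."""
    probs = weights / weights.sum()
    return torch.multinomial(probs, num_samples=weights.numel(), replacement=True,
                             generator=generator)


weights = torch.tensor([0.1, 0.5, 0.2, 0.2])
idx = compute_resample_indices(weights)  # e.g. tensor([1, 1, 3, 0])
# Later, wherever the real tensors live:
# reweighted_batch = {k: v[idx] for k, v in batch.items()}
```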
1. Add a `TransferQueueConfig` class and update the `TrainingWorkerConfig` class.
2. Create a `TransferQueueConfig` from `self.config` in `RayPPOTrainerTransferQueue` and `TrainingWorker` (see the sketch below).
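A hedged sketch of what such a config class can look like; apart from `enable`, which is referenced elsewhere in this PR, the field names here are assumptions rather than the actual schema added in `verl/workers/config/engine.py`:

```python
from dataclasses import dataclass, field


@dataclass
class TransferQueueConfigSketch:
    """Illustrative stand-in for the real TransferQueueConfig dataclass."""

    enable: bool = False
    storage_backend: str = "memory"                           # assumed field name
    controller_options: dict = field(default_factory=dict)    # assumed field name


def from_trainer_config(cfg: dict) -> TransferQueueConfigSketch:
    """Build the worker-side config from the trainer-level `transferqueue` section."""
    tq = cfg.get("transferqueue", {}) or {}
    return TransferQueueConfigSketch(
        enable=tq.get("enable", False),
        storage_backend=tq.get("storage_backend", "memory"),
        controller_options=dict(tq.get("controller_options", {})),
    )


cfg = {"transferqueue": {"enable": True, "storage_backend": "memory"}}
assert from_trainer_config(cfg).enable
```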
…sordict: all `DataProto.from_xxx` methods require a `batch_size` derived from tensor data; however, before generation only non-tensor data exist in the TensorDict, which causes a `DataProto.check_consistency` issue. To avoid this problem, we write a function `from_tensordict_without_tensor` and apply it when all data are non-tensors.
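A simplified sketch of that fallback path, using a plain-Python stand-in for `DataProto`; the real method operates on a TensorDict and its actual signature may differ:

```python
from dataclasses import dataclass, field


@dataclass
class DataProtoLike:
    """Minimal stand-in for DataProto, for illustration only."""

    batch: dict = field(default_factory=dict)              # tensor fields
    non_tensor_batch: dict = field(default_factory=dict)   # numpy / python objects
    meta_info: dict = field(default_factory=dict)


def from_mapping_without_tensor(non_tensors: dict, meta_info: dict | None = None) -> DataProtoLike:
    """Build the container when no tensors exist, skipping batch-size inference
    and the tensor-based consistency checks entirely."""
    return DataProtoLike(batch={}, non_tensor_batch=dict(non_tensors),
                         meta_info=dict(meta_info or {}))


gen_inputs = from_mapping_without_tensor({"raw_prompt": ["hi", "hello"]}, {"global_steps": 1})
assert not gen_inputs.batch  # no tensors, yet the object is still constructible
```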
forgot to update this part
What does this PR do?
This PR integrates the experimental `TransferQueue` feature into the main codebase.

Key Changes
1. New Trainer Implementation
   - `RayPPOTrainerTransferQueue`, which inherits from `RayPPOTrainer`.
   - `BatchMeta`-based operations to replace the distribution of real data objects (`DataProto`/`TensorDict`).
2. Configuration
   - `ppo_trainer.yaml` and `ppo_megatron_trainer.yaml` to include TransferQueue-related configurations.
3. Integration & Entry Point
   - `RayPPOTrainerTransferQueue` to adapt to the new features.
   - `main_ppo.py` to enable the TransferQueue via config (see the sketch below).
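For example, a hedged sketch of switching the feature on through the config; only `transferqueue.enable` is confirmed by this PR, so treat the structure as a placeholder and see `ppo_trainer.yaml` for the real keys:

```python
from omegaconf import OmegaConf

# Toggle assumed to live under `transferqueue` in ppo_trainer.yaml /
# ppo_megatron_trainer.yaml; `enable` selects RayPPOTrainerTransferQueue
# in main_ppo.py instead of the default RayPPOTrainer.
config = OmegaConf.create({"transferqueue": {"enable": True}})
assert config.transferqueue.enable
```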
Checklist Before Starting

- Search for similar PRs. Paste at least one query link here: ...
- Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI)
  - `{modules}` include `fsdp`, `megatron`, `veomni`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward`
  - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]`
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
  - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title.
  - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`

Test
API and Usage Example
```python
# Add code snippet or script demonstrating how to use this
```

Design & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- Read the Contribute Guide.
- Apply pre-commit checks: `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- Add / Update the documentation.
- Add unit or end-to-end test(s) to the CI workflow to cover all the code. If not feasible, explain why: ...
- Once your PR is ready for CI, send a message in the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)
- If your PR is related to the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`.