
Upgrade OnlineBuffer to LeRobotDatasetV2 and use it in the train script. #445

Open · wants to merge 51 commits into base: main
Conversation

@alexander-soare (Collaborator) commented on Sep 18, 2024

What this does

This PR promotes OnlineBuffer to the more widely scoped LeRobotDatasetV2. In other words, the class is generalized to be used for all purposes, rather than just as a data buffer for the online training loop. In a series of follow-up PRs, LeRobotDataset will gradually be superseded by LeRobotDatasetV2. Changes to look out for:

  • online_buffer.py::OnlineBuffer -> data_buffer.py::LeRobotDatasetV2.
  • A generalized, more intuitive interface in LeRobotDatasetV2.
  • A new LeRobotDatasetV2.from_huggingface_hub method. This downloads an HF hub dataset then siphons it into a LeRobotDatasetV2 (in a follow-up we might try streaming from the hub to avoid doubling up on disk space requirements).
  • LeRobotDatasetV2 supports multiple image modes, including videos, PNG files and memmaps.
  • Changes to the train and eval scripts to make use of the LeRobotDatasetV2. Of particular interest, the training script has two new options that can be used to configure the usage of the LeRobotDatasetV2.
  • Minor changes to some utilities, mostly around the idea that LeRobotDatasetV2 works primarily with numpy arrays, not torch tensors.

Note: For the LeRobot library user, this change will be silent. The options to use the LeRobotDatasetV2 are not added to the yaml configs and need to be explicitly specified. The plan is for us to road test it internally, or allow users to try it out if they really want, while follow-up PRs proceed.
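For a feel of the generalized interface, here is a minimal usage sketch (based on the backwards-compatibility script later in this thread; the argument values are illustrative and the exact signature may differ):

import torch

from lerobot.common.datasets.online_buffer import LeRobotDatasetV2

# Download a hub dataset and siphon it into memmap-backed storage.
# decode_images=False keeps videos encoded; True stores decoded frames in memmaps.
dataset = LeRobotDatasetV2.from_huggingface_hub(
    "lerobot/pusht",
    decode_images=False,
    delta_timestamps={"action": [-0.1, 0.0, 0.1]},  # optional temporal windows, in seconds (illustrative)
)

# LeRobotDatasetV2 is a torch.utils.data.Dataset, so it plugs straight into a DataLoader.
loader = torch.utils.data.DataLoader(dataset, batch_size=16, shuffle=True)
batch = next(iter(loader))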

TODO before merging

  • Address the points in the testing section. In particular, run the backwards compatibility script as a last step before merging.
  • Change file names from online_buffer to lerobot_dataset_v2.

Why? (high-level context)

These are the main reasons we are switching to LeRobotDatasetV2 (backed by numpy.memmaps) from datasets.Dataset (backed by PyArrow)

  1. For the online training loop, we frequently update the data in the buffer, including overwriting old data. numpy.memmaps are much more amenable to this (it's no contest: datasets.Dataset was not designed for in-place updates).
  2. In general, for iterating through the dataset, numpy.memmaps are faster than datasets.Dataset in the way we currently use them (on the order of 10x faster). Some work can be done, both on our end and on the datasets end, to speed datasets.Dataset up, but even at a fundamental level PyArrow can't be made faster than numpy.memmaps (although it can get close).
  3. The interface to numpy.memmaps is dead simple for anyone who knows how to use numpy (everyone...). They can be sliced and mutated the same way as regular arrays. In comparison, the interface for datasets.Dataset requires some getting used to (certainly worth it if the use case is right, but not in LeRobot).

It should be noted that points 1 and 2 above are particularly salient in LeRobot, where each data item in a training loop typically involves taking slices over the temporal dimension of multiple data arrays. For example, the default settings for TD-MPC require slices of 5 time-steps over actions and observations with a batch size of 256. With datasets.Dataset, the data loading bottleneck slows training to a crawl.
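To illustrate points 1-3 with plain numpy (a toy example, not LeRobot code):

import numpy as np

# A memmap-backed "action" array of 10,000 frames with 2 action dimensions, stored on disk.
actions = np.memmap("/tmp/actions.dat", dtype=np.float32, mode="w+", shape=(10_000, 2))

# Point 1: in-place updates are plain numpy assignment.
actions[100:300] = np.random.randn(200, 2).astype(np.float32)

# Points 2 and 3: a TD-MPC-style window of 5 time-steps is a simple slice.
t = 150
window = actions[t : t + 5]  # shape (5, 2); read straight from the memory-mapped file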

Follow-ups

  • Refactor online training sampler to use the LeRobotDatasetV2 for the offline dataset.
  • Generate video paths on the fly rather than storing them explicitly in the data.
  • Incorporate multi-dataset support
  • In from_huggingface_hub("lerobot/image_dataset", decode_images=True) avoid decoding and re-encoding the PNG files.
  • Don't make it compulsory to provide the buffer capacity. Just double the size of the memmaps as and when needed (see the sketch after this list).
  • In from_huggingface_hub, stream the HF Dataset and siphon it into the LeRobotDatasetV2 one episode at a time.
  • Completely remove LeRobotDataset or replace it with LeRobotDatasetV2 (this has lots of sub-points)
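As a rough illustration of the capacity-doubling idea above (a sketch only; names, file handling and the actual implementation inside LeRobotDatasetV2 would differ):

import os

import numpy as np

def grow_memmap(path: str, arr: np.memmap) -> np.memmap:
    """Double the frame capacity of a memmap-backed array (illustrative sketch)."""
    new_shape = (arr.shape[0] * 2, *arr.shape[1:])
    tmp_path = path + ".grow"
    grown = np.memmap(tmp_path, dtype=arr.dtype, mode="w+", shape=new_shape)
    grown[: arr.shape[0]] = arr[:]  # copy the existing frames
    grown.flush()
    os.replace(tmp_path, path)  # swap the larger file into place (POSIX rename semantics)
    return np.memmap(path, dtype=arr.dtype, mode="r+", shape=new_shape)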

How it was tested

  • Added tests for CI, covering both pytest unit tests and end-to-end testing.

WIP

  • Check that iterating through the LeRobotDatasetV2 gives the same items as iterating through the LeRobotDataset for a representative selection of datasets. I won't add B/C tests for this as LeRobotDataset will be completely phased out. Instead, I'll leave a script here. I'll run the script again right before merging.
  • Train a model with the offline training loop with the LeRobotDatasetV2 (shouldn't be a big deal if the point above is done properly)
  • Train TD-MPC pusht with online training loop.

How to checkout & try? (for the reviewer)

Run this script with and without +use_lerobot_data_buffer=true, and when +use_lerobot_data_buffer=true, try it with and without +lerobot_data_buffer_decode_video=true. Observe the data_s log item in the terminal.

JOB_NAME=vqbet_debug

# python -m debugpy --listen localhost:51355 --wait-for-client lerobot/scripts/train.py \
python lerobot/scripts/train.py \
    hydra.job.name=$JOB_NAME \
    hydra.run.dir=outputs/train/$(date +'%Y-%m-%d/%H-%M-%S')_${JOB_NAME} \
    env=pusht \
    dataset_repo_id=lerobot/pusht_image \
    policy=vqbet \
    training.num_workers=0 \
    training.log_freq=1 \
    training.offline_steps=400000 \
    training.save_checkpoint=true \
    training.save_freq=50000 \
    wandb.enable=false \
    wandb.disable_artifact=true \
    device=cuda \
    use_amp=true \
    +use_lerobot_data_buffer=true \
    +lerobot_data_buffer_decode_video=true


@alexander-soare marked this pull request as draft on September 18, 2024, 17:39
return Path(f"/tmp/{repo_id}_{fingerprint}{'_decoded' if decode_video else ''}")


def compute_sampler_weights(
alexander-soare (Collaborator, Author) commented:

FYI: this will look very different once we drop LeRobotDataset. It has decent test coverage for now and was already here before the PR. Feel free to skim or skip.

pass


class DataBuffer(torch.utils.data.Dataset):
alexander-soare (Collaborator, Author) commented:

FYI: Totally happy for other name suggestions.



# Arbitrarily set small dataset sizes, making sure to have uneven sizes.
@pytest.mark.parametrize("offline_dataset_size", [0, 6])
alexander-soare (Collaborator, Author) commented:

FYI: all tests from here and below were here before the PR. Feel free to skim / skip.

@alexander-soare (Collaborator, Author) left a comment

Reviewable status: 0 of 10 files reviewed, 1 unresolved discussion (waiting on @aliberts)


tests/test_data_buffer.py line 16 at r1 (raw file):

# See the License for the specific language governing permissions and
# limitations under the License.d
from copy import deepcopy

Test comment

@alexander-soare marked this pull request as ready for review on September 19, 2024, 16:04
@alexander-soare (Collaborator, Author) commented on Sep 19, 2024

Backwards compat check script:

"""
(this will take < 20 mins, covers FULL datasets)
python scripts/data_buffer_bc_checks.py

OR

(this will take hours, covers lots of datasets, probs overkill)
DATA_DIR=tests/data python scripts/data_buffer_bc_checks.py
"""

import os
from itertools import product

import torch
import torch.utils
from tqdm import trange

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from lerobot.common.datasets.online_buffer import LeRobotDatasetV2

if os.environ.get("DATA_DIR", "") == "tests/data":
    datasets_to_test = [
        "lerobot/pusht",
        "lerobot/pusht_image",
        "lerobot/pusht_keypoints",
        "lerobot/aloha_mobile_cabinet",
        "lerobot/aloha_mobile_chair",
        "lerobot/aloha_mobile_elevator",
        "lerobot/aloha_mobile_shrimp",
        "lerobot/aloha_mobile_wash_pan",
        "lerobot/aloha_mobile_wipe_wine",
        "lerobot/aloha_sim_insertion_human",
        "lerobot/aloha_sim_insertion_human_image",
        "lerobot/aloha_sim_insertion_scripted",
        "lerobot/aloha_sim_insertion_scripted_image",
        "lerobot/aloha_sim_transfer_cube_human",
        "lerobot/aloha_sim_transfer_cube_human_image",
        "lerobot/aloha_sim_transfer_cube_scripted",
        "lerobot/aloha_sim_transfer_cube_scripted_image",
        "lerobot/aloha_static_battery",
        "lerobot/aloha_static_candy",
        "lerobot/aloha_static_coffee",
        "lerobot/aloha_static_coffee_new",
        "lerobot/aloha_static_cups_open",
        "lerobot/aloha_static_fork_pick_up",
        "lerobot/aloha_static_pingpong_test",
        "lerobot/aloha_static_pro_pencil",
        "lerobot/aloha_static_screw_driver",
        "lerobot/aloha_static_tape",
        "lerobot/aloha_static_thread_velcro",
        "lerobot/aloha_static_towel",
        "lerobot/aloha_static_vinh_cup",
        "lerobot/aloha_static_vinh_cup_left",
        "lerobot/aloha_static_ziploc_slide",
        "lerobot/umi_cup_in_the_wild",
        "lerobot/unitreeh1_fold_clothes",
        "lerobot/unitreeh1_rearrange_objects",
        "lerobot/unitreeh1_two_robot_greeting",
        "lerobot/unitreeh1_warehouse",
        "lerobot/xarm_lift_medium",
        "lerobot/xarm_lift_medium_image",
        "lerobot/xarm_lift_medium_replay",
        "lerobot/xarm_lift_medium_replay_image",
        "lerobot/xarm_push_medium",
        "lerobot/xarm_push_medium_image",
        "lerobot/xarm_push_medium_replay",
        "lerobot/xarm_push_medium_replay_image",
    ]
else:
    # Reduced test set
    datasets_to_test = [
        "lerobot/pusht",
        "lerobot/pusht_image",
        "lerobot/pusht_keypoints",
        "lerobot/unitreeh1_two_robot_greeting",  # chosen because it contains multiple image keys
    ]

for dataset_repo_id in datasets_to_test:
    lerobot_dataset = LeRobotDataset(dataset_repo_id)
    fps = lerobot_dataset.fps

    assert "observation.state" in lerobot_dataset[0]

    delta_timestamps = {
        "action": [-1 / fps, 0, 1 / fps],
        "observation.state": [-1 / fps, 0, 1 / fps],
        **{k: [-1 / fps, 0, 1 / fps] for k in lerobot_dataset.camera_keys},
    }
    delta_timestamps_options = [None, delta_timestamps]

    decode_images_options = [False]
    if os.environ.get("DATA_DIR", "") == "tests/data" and len(lerobot_dataset.camera_keys) > 0:
        decode_images_options.append(True)

    for decode_images, delta_timestamps in product(decode_images_options, delta_timestamps_options):
        lerobot_dataset = LeRobotDataset(dataset_repo_id, delta_timestamps=delta_timestamps)
        buffer = LeRobotDatasetV2.from_huggingface_hub(
            dataset_repo_id, decode_images=decode_images, delta_timestamps=delta_timestamps
        )
        assert len(lerobot_dataset) == len(buffer)

        lerobot_dataset_dataloader = iter(
            torch.utils.data.DataLoader(
                lerobot_dataset, batch_size=16, num_workers=0, shuffle=False, drop_last=False
            )
        )
        buffer_dataloader = iter(
            torch.utils.data.DataLoader(buffer, batch_size=16, num_workers=0, shuffle=False, drop_last=False)
        )

        for _ in trange(
            len(lerobot_dataset_dataloader),
            desc=f"{dataset_repo_id}{'_dt' if delta_timestamps is not None else ''}{'_decoded' if decode_images else ''}",
        ):
            buffer_item = next(buffer_dataloader)
            lerobot_dataset_item = next(lerobot_dataset_dataloader)
            assert set(buffer_item) == set(lerobot_dataset_item)
            for k in buffer_item:
                assert buffer_item[k].dtype == lerobot_dataset_item[k].dtype
                assert torch.equal(buffer_item[k], lerobot_dataset_item[k])

@aliberts (Collaborator) left a comment

First review, where I focused mainly on the from_huggingface_hub method. I don't think I should review further until we discuss my comment about the video buffering, as I think there was a misunderstanding on the feature set/scope of this dataset reshape. I simply don't think we should buffer any videos at all; this defeats a lot of the purpose of video encoding/decoding (except for new online episodes, where it makes sense). Why do you think it is necessary?

Reviewable status: 0 of 10 files reviewed, 5 unresolved discussions (waiting on @alexander-soare)


lerobot/common/datasets/online_buffer.py line 69 at r2 (raw file):

    Data is considered to come in the form of "episodes" (an instance of a robot performing a task). Episodes
    are made up of "frames", which are chronoligically ordered and contain timestamp aligned data, potentially

Suggestion:

chronologically

lerobot/common/datasets/online_buffer.py line 91 at r2 (raw file):

    The `add_episodes` method can be used to insert more data in the form of integral episodes (starting from
    frame 0 and with the frames ordered). The buffer has a compulsory size limit, which must be provided when

Is there a limit to how big this can be?
Is there an observable decline in performance past a certain threshold?

Code quote:

The buffer has a compulsory size limit

lerobot/common/datasets/online_buffer.py line 93 at r2 (raw file):

    frame 0 and with the frames ordered). The buffer has a compulsory size limit, which must be provided when
    creating a new one. Data is inserted in a circular fashion, inserting after the most recently added frame,
    and wrapping around to the start when necessary (in which case older episodes are overwritten).

Does that mean that if the buffer is full and I add_episodes with, say, 5 frames, this will scrap the first 5 frames from another episode? Why leave it rather than remove it entirely?

EDIT: never mind, I just read your docstring in that method. Feels weird to leave half episodes, but I guess that makes sense as long as this is just a buffer and not a drop-in replacement for LeRobotDataset.

Code quote:

in which case older episodes are overwritten
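(For reference, a toy sketch of the circular insertion being described here; simplified, not the actual add_episodes implementation:)

import numpy as np

capacity = 10
buffer = np.zeros((capacity, 2), dtype=np.float32)
next_index = 0  # position of the next write

def add_episode(frames: np.ndarray) -> None:
    """Write an episode's frames, wrapping around and overwriting the oldest data."""
    global next_index
    for frame in frames:
        buffer[next_index % capacity] = frame
        next_index += 1

add_episode(np.ones((7, 2), dtype=np.float32))       # fills slots 0-6
add_episode(np.full((5, 2), 2.0, dtype=np.float32))  # slots 7-9, then wraps to 0-1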

lerobot/common/datasets/online_buffer.py line 545 at r2 (raw file):

            repo_id: The dataset repository ID.
            decode_video: If repo_id refers to a video dataset (the image observations are encoded as videos),
                decode the videos and store the frames in a numpy memmap.

I did not have that understanding about this new format. Sorry if I've not been very clear about that but what I had in mind was that it should only take care of the non-video data (and store that as np.memmap).

There is the question of how to handle/add new episodes in the online dataset where I think it does make sense to store frames temporarily as memmaps, but I don't think existing videos should be buffered.

Code quote:

            decode_video: If repo_id refers to a video dataset (the image observations are encoded as videos),
                decode the videos and store the frames in a numpy memmap.

lerobot/common/datasets/online_buffer.py line 558 at r2 (raw file):

        hf_dataset = load_hf_dataset(repo_id, version=CODEBASE_VERSION, root=root, split="train")
        hf_dataset.set_transform(lambda x: x)

Why does this need to exist?

Code quote:

        hf_dataset.set_transform(lambda x: x)

@alexander-soare (Collaborator, Author) left a comment

Reviewable status: 0 of 10 files reviewed, 5 unresolved discussions (waiting on @aliberts)


lerobot/common/datasets/online_buffer.py line 69 at r2 (raw file):

    Data is considered to come in the form of "episodes" (an instance of a robot performing a task). Episodes
    are made up of "frames", which are chronoligically ordered and contain timestamp aligned data, potentially

Done.


lerobot/common/datasets/online_buffer.py line 91 at r2 (raw file):

Previously, aliberts (Simon Alibert) wrote…

Is there a limit to how big this can be?
Is there an observable decline in performance past a certain threshold?

If I explain that this is because "you need to specify in advance how much disk space to reserve", is that clear enough?
Then I hope it would be implicit that it's not to do with anything else, like performance.

Maybe I should have worded it like that instead of "The buffer has a compulsory size limit", which I see now can be subtly misleading.


lerobot/common/datasets/online_buffer.py line 93 at r2 (raw file):

Why leave it rather than remove it entirely?

I actually had some code to do this, but it doesn't play nicely at all with the data loader when num_workers > 0, and an online training loop. This is because it ends up reducing the dataset length, and the dataloader has already decided which indices to fetch in advance (meaning you could get an IndexError).
Does that explanation satisfy you, and if so, do you think I should explain it somehow here?


lerobot/common/datasets/online_buffer.py line 545 at r2 (raw file):

Previously, aliberts (Simon Alibert) wrote…

I did not have that understanding about this new format. Sorry if I've not been very clear about that but what I had in mind was that it should only take care of the non-video data (and store that as np.memmap).

There is the question of how to handle/add new episodes in the online dataset where I think it does make sense to store frames temporarily as memmaps, but I don't think existing videos should be buffered.

Think of this feature as a nice-to-have extra. It gives you a massive speedup over video decoding (10x +), and if your dataset is small, it's definitely worth it. For example, try training vqbet as I shared in the main PR comment with and without it.

If we get video decoding faster, I'd remove this feature.


lerobot/common/datasets/online_buffer.py line 558 at r2 (raw file):

Previously, aliberts (Simon Alibert) wrote…

Why does this need to exist?

There is some default transform in place. I know this because when I comment this out, my tests fail due to dtype issues. I could figure out exactly what that is, but no transform is better than a transform, both for speed and for the degree of control we have on our end.

Thoughts?

@aliberts (Collaborator) left a comment

Reviewable status: 0 of 10 files reviewed, 9 unresolved discussions (waiting on @alexander-soare)


Makefile line 71 at r2 (raw file):

		policy.chunk_size=20 \
		training.batch_size=2 \
		training.image_transforms.enable=true \

If I read your code correctly, I don't think this is actually enabled.


lerobot/common/datasets/online_buffer.py line 91 at r2 (raw file):

"you need to specify in advance how much disk space to reserve", is that clear enough?

I understood that, so I take it the limit is simply your disk space.
Additionally, I was wondering if performance can vary with the size of the buffer (?)


lerobot/common/datasets/online_buffer.py line 93 at r2 (raw file):

Previously, alexander-soare (Alexander Soare) wrote…

Why leave it rather than remove it entirely?

I actually had some code to do this, but it doesn't play nicely at all with the data loader when num_workers > 0, and an online training loop. This is because it ends up reducing the dataset length, and the dataloader has already decided which indices to fetch in advance (meaning you could get an IndexError).
Does that explanation satisfy you, and if so, do you think I should explain it somehow here?

Makes sense, I think the docstring in add_episodes is enough, thanks.


lerobot/common/datasets/online_buffer.py line 134 at r2 (raw file):

                created with the first call to `add_episodes`.
            buffer_capacity: How many frames should be stored in the buffer as a maximum. Be aware of your
                system's available disk space when choosing this. Note that if `storage_dir` references an

From a practical standpoint, should we rather define that as a disk-space amount?
I'm wondering how, as a user, I should determine this size in number of frames given my disk space.

Code quote:

            buffer_capacity: How many frames should be stored in the buffer as a maximum. Be aware of your
                system's available disk space when choosing this. Note that if `storage_dir` references an
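As a rough, back-of-the-envelope way to go from a disk budget to a frame count (shapes and dtypes below are made up, not taken from the PR):

import numpy as np

feature_specs = {
    "observation.image": (np.dtype(np.uint8), (96, 96, 3)),  # decoded frames
    "observation.state": (np.dtype(np.float32), (2,)),
    "action": (np.dtype(np.float32), (2,)),
}

bytes_per_frame = sum(dtype.itemsize * int(np.prod(shape)) for dtype, shape in feature_specs.values())
disk_budget_bytes = 50 * 1024**3  # e.g. a 50 GiB budget
buffer_capacity = disk_budget_bytes // bytes_per_frame
print(bytes_per_frame, buffer_capacity)  # ~27.7 KB per frame -> roughly 1.9M frames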

lerobot/common/datasets/online_buffer.py line 369 at r2 (raw file):

                    )
            else:
                _, h, w, c = data[k].shape

I think we should retrieve that from the metadata in the future as well (ok for now).

Code quote:

_, h, w, c = data[k].shape

lerobot/common/datasets/online_buffer.py line 545 at r2 (raw file):

Previously, alexander-soare (Alexander Soare) wrote…

Think of this feature as a nice-to-have extra. It gives you a massive speedup over video decoding (10x +), and if your dataset is small, it's definitely worth it. For example, try training vqbet as I shared in the main PR comment with and without it.

If we get video decoding faster, I'd remove this feature.

I think this is a lot of complexity for a nice-to-have, but setting that aside, the issue is that speed is not the only factor here. What if we have hundreds of GB of video (as we do on some open X datasets)? We can't really scale with that.

Does this 10x factor take into account initial decoding? And will that work with streaming?

Video is the biggest chunk of the data by far (>99% in most of our repos), so it's expected to take the largest amount of time during data loading. Even if we're able to further improve decoding time in the future by fiddling with encoding, it won't be by a factor of 10x, because of the decompression, which you took out of the equation iiuc.
I do see a way that we could significantly improve it in the future by training sequentially on each frame in a per-episode loop. That way, we could decode and buffer the video to memory at the start of an episode and use that throughout that episode until the next one. I believe this is one of the features TorchCodec is working on (don't quote me on that though).

I'm also curious about the absolute gains (not just the relative improvement). How much time does this shave off a 5h training run, for example?

I get your point that it can be useful in certain scenarios, but that has to be weighed against those questions as well. Wdyt?


lerobot/common/datasets/online_buffer.py line 558 at r2 (raw file):

Previously, alexander-soare (Alexander Soare) wrote…

There is some default transform in place. I know this because when I comment this out, my tests fail due to dtype issues. I could figure out exactly what that is, but no transform is better than a transform, both for speed and for the degree of control we have on our end.

Thoughts?

I see, could you add a comment about that?
We should probably fix that later on during the dataset rework. Parquet does support types, so once written properly we shouldn't need that.


lerobot/common/datasets/online_buffer.py line 620 at r2 (raw file):

                else:
                    data_dict[k] = np.stack(
                        [np.array(dct["path"], dtype=f"S{MAX_VIDEO_PATH_LENGTH}") for dct in hf_dataset[k]]

I think we should avoid using storage space for that and instead have a single template path in the metadata (which takes the episode number and camera key). That way we also avoid the annoying max-length issue.

Code quote:

                    data_dict[k] = np.stack(
                        [np.array(dct["path"], dtype=f"S{MAX_VIDEO_PATH_LENGTH}") for dct in hf_dataset[k]]

@alexander-soare changed the title from "Upgrade OnlineBuffer to DataBuffer and use it in the train script." to "Upgrade OnlineBuffer to LeRobotDatasetV2 and use it in the train script." on Sep 30, 2024
@alexander-soare marked this pull request as ready for review on October 2, 2024, 12:45
@alexander-soare self-assigned this on Oct 3, 2024
@alexander-soare added the 🗃️ Dataset (Something dataset-related) label on Oct 3, 2024
@alexander-soare (Collaborator, Author) commented:
@aliberts @Cadene I just want to clarify something about the API here. It's designed to work like the previous LeRobotDataset when adding episodes, in that the episode and data indices are provided by the user but essentially ignored, because internally the logic just continues the indices already in the dataset. I'm not a fan of this approach but wasn't yet ready to change it.
