Partition epoch as a multi-GPU dataset distribution method #712

Open · patrick-wilken wants to merge 6 commits into master from feature/horovod_dataset_partition
Conversation

@patrick-wilken (Contributor) commented Oct 11, 2021

The main commit is 704ee2b. I propose to reuse the partition epoch logic to distribute different parts of the data to different GPUs. The reason I needed an alternative to the random_seed_offset method is that it does not work in cases where the sequence ordering is not random, or at least not random in all aspects. In my case this was true for the following setup: several HDFDatasets in default sequence ordering (too costly to shuffle at run-time because of the huge number of sequences), combined via CombinedDataset with laplace sequence ordering and the sampling_sizes parameter, i.e. taking a fixed number of sequences from the HDFDatasets per epoch. random_seed_offset has the desired effect on the CombinedDataset level; however, the sequences sampled from the HDFDatasets are the same for all GPUs, which is bad.
With the new partition method, the sampling and shuffling are done identically for all GPUs, but then a different partition is selected per GPU. This is done via the sequence ordering, so no data is loaded and then thrown away as with the shard method. An additional advantage over random_seed_offset is that the original meaning of an epoch is preserved.

Implementation-wise this is not 100% optimal, because it only works for datasets using get_seq_order_for_epoch().
Also, I had to add a nasty disable_horovod_partition attribute to apply the partitioning only on the CombinedDataset level and not in the sub-datasets. But this is a very special case for this sampling sequence ordering; I think all other current meta dataset configurations should work with the partitioning being done in the sub-datasets.
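To illustrate the idea, here is a minimal sketch (not the actual PR code; the function and variable names are made up, and rank/size would come from Horovod):

import numpy

def partition_seq_order(seq_order, rank, size):
    """Select this GPU's partition of the (already sampled/shuffled) sequence order."""
    num_seqs = len(seq_order)
    # contiguous, nearly equal-sized slices, like partition_epoch
    start = num_seqs * rank // size
    end = num_seqs * (rank + 1) // size
    return seq_order[start:end]

# all GPUs compute the identical order (same seed), then each selects its own slice
full_order = numpy.random.RandomState(42).permutation(10)
print(partition_seq_order(full_order, rank=0, size=2))  # first half
print(partition_seq_order(full_order, rank=1, size=2))  # second half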

@patrick-wilken requested review from albertz and a team as code owners on October 11, 2021 17:13

@albertz (Member) commented Oct 13, 2021

HDFDataset ... too costly to shuffle at run-time because of huge number of sequences

Why? What number? This sounds wrong. If the array of offsets fits into memory (which must be the case with our implementation), it should be fast to shuffle this.

It is this code:

seq_index = list(range(num_seqs))  # this is already big
...
from random import Random
rnd = Random(rnd_seed + self.random_seed_offset)
rnd.shuffle(seq_index)  # should not be a problem

If random.Random.shuffle is really a bottleneck here, we could use the Numpy version instead, which should be much faster. If there really is an issue with this code, please open a separate issue for it, but this should not be the case.

I would assume that list(range(num_seqs)) is also not really optimal, and the Numpy equivalent would be much faster, although that would not work with all the other code. But we could maybe add an efficient code path.
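A sketch of what that could look like (using the same free variables as the snippet above; see the caveat below about other code expecting a list):

import numpy
seq_index = numpy.arange(num_seqs, dtype="int64")  # instead of list(range(num_seqs))
rnd = numpy.random.RandomState(rnd_seed + self.random_seed_offset)
rnd.shuffle(seq_index)  # in-place, much faster than random.Random.shuffle on a Python list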

The API of get_seq_order_for_epoch also returns a list[int] and not a Numpy array, which might confuse some other code. Not sure how much this is a problem.

But anyway, maybe you don't refer to "random" seq order but to "laplace"? This is slow because it needs to call get_seq_len for every single sequence.

However, this can also be solved in a different way, more like TF is doing it. We can do the laplace sorting on-the-fly (just like TF does bucketing on-the-fly). So you use "random" seq order on the HDFDataset, and then have some buffering queue which reads in N sequences (e.g. into memory) and then it sorts those N sequences. This logic could be implemented as another dataset (LaplaceSortingBufferDataset or so).
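A rough sketch of that buffering idea (LaplaceSortingBufferDataset does not exist yet; the generator below only illustrates the buffering/sorting logic, not the RETURNN dataset API):

import itertools

def laplace_sorted_buffer(seq_iter, get_seq_len, buffer_size=1000):
    """Read buffer_size sequences at a time and yield them sorted by length,
    alternating ascending/descending like the "laplace" ordering."""
    seq_iter = iter(seq_iter)  # accept any iterable
    ascending = True
    while True:
        chunk = list(itertools.islice(seq_iter, buffer_size))
        if not chunk:
            return
        chunk.sort(key=get_seq_len, reverse=not ascending)
        ascending = not ascending
        yield from chunk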

@albertz (Member) commented Oct 13, 2021

random_seed_offset has the desired effect on the CombinedDataset level; however, the sequences sampled from the HDFDatasets are the same for all GPUs

You mean because you stick to "default" seq order for the HDFDataset?

But if you use "random" seq order for the HDFDataset, it would be fine? What's the problem with this?

@albertz (Member) commented Oct 13, 2021

With the new partition method, the sampling and shuffling are done identically for all GPUs, but then a different partition is selected per GPU. This is done via the sequence ordering, so no data is loaded and then thrown away as with the shard method.

But this doesn't need to be the case for shard either. It's just that this is currently not implemented and optimized well. See FeedDictDataProvider.get_next_batch.

Currently it does self.dataset.load_seqs(batch.start_seq, batch.end_seq), which has this problem you mention in some cases, for some datasets. Actually, if you use HDFDataset and disable its cache (which is anyway what I recommend), it does not have the problem, because load_seqs does nothing then.

Instead, we could do this only if no batch slicing (shard; specifically consider_batch_slice and self.batch_slice is not None here) is used.

Otherwise, the load_seqs would move inside the loop for seq in batch.seqs: and there we call self.dataset.load_seqs(seq.seq_idx, seq.seq_idx + 1) (continuous calls to load_seqs with the same seq idx should not be an issue).

And then it calls self.dataset.get_data(seq.seq_idx, k). So for HDFDataset without caching, it already should really only load those sequences which are really used.

With caching (which doesn't make sense though), or for some other datasets, maybe it needs the small changes I described.
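A rough sketch of those changes (simplified; the surrounding code in FeedDictDataProvider.get_next_batch has more to it, and data_keys is a stand-in for however the data keys are obtained there):

use_slicing = consider_batch_slice and self.batch_slice is not None
if not use_slicing:
    # no batch slicing (shard): keep the current bulk load
    self.dataset.load_seqs(batch.start_seq, batch.end_seq)
for seq in batch.seqs:
    if use_slicing:
        # with shard, load only the sequences this GPU actually uses;
        # repeated calls with the same seq idx are cheap
        self.dataset.load_seqs(seq.seq_idx, seq.seq_idx + 1)
    for k in data_keys:
        data = self.dataset.get_data(seq.seq_idx, k)
        ...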

So, in any case, I don't really see an advantage of the proposed partition multi-GPU dataset distribution method here.

@patrick-wilken force-pushed the feature/horovod_dataset_partition branch from f40d178 to 69dacf4 on October 13, 2021 10:43
@patrick-wilken (Contributor, Author) commented:
You mentioned exactly the things I have been working on: 😄

  1. Sequence ordering, see More efficient get_seq_order_for_epoch() #568: I don't have a clean implementation yet, because there's a lot of duck typing going on, but using numpy arrays and ops, and Python ranges where applicable, improves things, both speed and memory consumption.

  2. Array of offsets in HDFDataset (file_seq_start)

HDFDataset ... too costly to shuffle at run-time because of huge number of sequences

Why? What number? This sounds wrong. If the array of offsets fits into memory (which must be the case with our implementation), it should be fast to shuffle this.

Yes, besides the sequence order, this is the other O(total_num_seq) data structure when using HDFDataset. I got rid of it by storing the offsets themselves in the HDF file instead of the sequence lengths. This works well, but makes HDFDataset slower again because of the larger number of file accesses. Still fast enough to train large networks though.
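A sketch of that idea (the function and HDF key names below are made up for illustration; the real HDFDataset layout differs):

import numpy
import h5py

def write_offsets(hdf_path, seq_lens):
    # store cumulative offsets instead of per-sequence lengths
    offsets = numpy.concatenate([[0], numpy.cumsum(seq_lens)])
    with h5py.File(hdf_path, "a") as f:
        f.create_dataset("seq_offsets", data=offsets)

def read_seq(hdf_path, seq_idx):
    # two offset lookups per sequence instead of an O(total_num_seqs) array in memory,
    # at the cost of additional file accesses
    with h5py.File(hdf_path, "r") as f:
        start, end = f["seq_offsets"][seq_idx], f["seq_offsets"][seq_idx + 1]
        return f["inputs"][start:end]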

But regardless of these improvements, if you have a huge dataset and you want to shuffle it you will always need to store an O(total_num_seq) list of integers. And for a large enough number of sequences and possibly multiple dataset instances when using horovod, memory consumption eventually becomes a problem.

Actually, if you use HDFDataset and disable its cache, it does not have the problem, because load_seqs does nothing then.

Oh, I didn't think of that. But then using CombinedDataset (or actually any CachedDataset2) is "the problem", as it actually loads and caches all the sequences from start to end in _load_seqs(). I tried the shard method and only got 80% computation time, but close to 100% with random_seed_offset and partition. So maybe improving get_next_batch as you suggested works for me. But even this improved shard method still has some overhead; in particular, get_seq_length is called in Dataset.iterate_seqs() for sequences that are ultimately not used.

@albertz (Member) commented Oct 13, 2021

if you have a huge dataset and you want to shuffle it you will always need to store an O(total_num_seq) list of integers.

Ok, with #568, you get rid of the list(range(num_seqs)).

But what is num_seqs in your case? Even if it is very large, this should still not be too much of a problem.

And then neither should shuffling it.

@albertz (Member) commented Oct 13, 2021

Actually, if you use HDFDataset and disable its cache, it does not have the problem, because load_seqs does nothing then.

Oh, I didn't think of that. But then using CombinedDataset (or actually any CachedDataset2) is "the problem", as it actually loads and caches all the sequences from start to end in _load_seqs().

Ah yes. But with my proposed changes for FeedDictDataProvider.get_next_batch, this should be fixed.

Alternatively, we could also change CachedDataset2 such that it doesn't really do anything in load_seqs and the internal _load_seqs logic is done in get_data instead.
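A toy sketch of that alternative (not the real CachedDataset2 API; the per-sequence loading here just reads from a list to keep the example self-contained):

class LazyCachedDataset:
    def __init__(self, raw_seqs):
        self._raw_seqs = raw_seqs  # stand-in for the real underlying data source
        self._cache = {}

    def load_seqs(self, start_seq, end_seq):
        pass  # no-op: nothing is preloaded or cached here anymore

    def get_data(self, seq_idx, key):
        # load (and cache) only the single sequence that is actually requested
        if seq_idx not in self._cache:
            self._cache[seq_idx] = self._raw_seqs[seq_idx]
        return self._cache[seq_idx][key]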

@albertz (Member) commented Oct 13, 2021

Btw, I just saw this Twitter post on large data shuffling behavior, which might be relevant for you.

@patrick-wilken (Contributor, Author) commented:
But what is num_seqs in your case? Even if it is very large, this should still not be too much of a problem.
And then neither should shuffling it.

Let's say 100M sequences. This is very high-resource, but definitely realistic for MT data nowadays.

I used this script to simulate what is currently done in get_seq_order_for_epoch():

import random
from memory_profiler import profile

@profile
def profile_seq_order(num_seqs):
    seq_order = list(range(num_seqs))
    random.shuffle(seq_order)

profile_seq_order(100000000)

For me, it runs 53 minutes and uses 3.8 GB of RAM. (For 10M sequences it is 5 minutes, 0.4GB)

#568 proposes numpy.random.permutation(num_seqs) instead of lines 6 and 7 in the script above, which for 100M sequences runs in 19s and uses 0.8GB of RAM. So still pretty acceptable. But let's say it is 500M sequences and you want to train on 8 GPUs, so 8 instances of the dataset...
I think it would be neat to be independent of total_num_seqs, so we don't have to worry about this no matter how large the HDF files are. But ok, with #568 and datasets up to around this size, random_seed_offset should work for me.
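With #568 the script above would presumably become something like:

import numpy
from memory_profiler import profile

@profile
def profile_seq_order(num_seqs):
    seq_order = numpy.random.permutation(num_seqs)

profile_seq_order(100000000)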

I will also try your fix for shard.

@albertz (Member) commented Oct 13, 2021

Let's say 100M sequences. This is very high-resource, but definitely realistic for MT data nowadays.

I used this script to simulate what is currently done in get_seq_order_for_epoch():

import random
from memory_profiler import profile

@profile
def profile_seq_order(num_seqs):
    seq_order = list(range(num_seqs))
    random.shuffle(seq_order)

profile_seq_order(100000000)

For me, it runs 53 minutes and uses 3.8 GB of RAM. (For 10M sequences it is 5 minutes, 0.4GB)

#568 proposes numpy.random.permutation(num_seqs) instead of lines 6 and 7 in the script above, which for 100M sequences runs in 19s and uses 0.8GB of RAM.

For reference, using Numpy, uint32:

$  time python3 test.py
Filename: /Users/az/test.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     4     48.4 MiB     48.4 MiB           1   @profile
     5                                         def profile_seq_order(num_seqs):
     6    429.9 MiB    381.5 MiB           1       seq_order = numpy.arange(num_seqs, dtype="uint32")
     7    429.9 MiB      0.0 MiB           1       numpy.random.shuffle(seq_order)



________________________________________________________
Executed in   11.69 secs    fish           external
   usr time   11.72 secs  156.00 micros   11.72 secs
   sys time    0.45 secs  916.00 micros    0.45 secs

Numpy, uint64:

time python3 test.py
Filename: /Users/az/test.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     4     48.3 MiB     48.3 MiB           1   @profile
     5                                         def profile_seq_order(num_seqs):
     6    811.3 MiB    762.9 MiB           1       seq_order = numpy.arange(num_seqs, dtype="uint64")
     7    811.3 MiB      0.0 MiB           1       numpy.random.shuffle(seq_order)



________________________________________________________
Executed in    6.72 secs    fish           external
   usr time    6.76 secs  118.00 micros    6.76 secs
   sys time    0.52 secs  842.00 micros    0.52 secs

(Faster but more memory.)

This seems to be fast enough.

If you want it even faster, there are shuffling algorithms which would work more on-the-fly.

In the extreme case, we could also simply introduce another "random" sort scheme where you just do random.randint(0, num_seqs) on-the-fly for every new sequence. So get_seq_order_for_epoch would not be used. Or would return some list-like dynamic object. Like:

import random

class _RandomList:
  def __init__(self, num_seqs):
    self.num_seqs = num_seqs
  def __len__(self):
    return self.num_seqs
  def __getitem__(self, i):
    return random.randrange(self.num_seqs)  # randrange excludes num_seqs; randint would not

(Probably needs to be slightly more complex to cover some edge cases, but you get the idea...)

(If you want to implement and test this, please make this a separate PR, so we do not mix up things.)

@albertz (Member) commented Oct 13, 2021

I will also try your fix for shard.

Please also make this a separate PR.
