Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS S3: Add getObjectByRanges to S3 API #2982

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

gael-ft
Copy link
Contributor

@gael-ft gael-ft commented May 23, 2023

Fixes #2981

Note I created a MergeOrderedN flow graph, because I didn't find a way to do the same using existing ones.
Happy to remove it if there's a way.

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that trying to control if the upstreams pre-fetch like you try to do is tricky since the sources could always contain their own buffers and start right away.

There's also a risk that the request is triggered right away and the response body stream times out with a subscription timeout from being queued based on the stream consumption speed.

A more safe solution, If I understand correctly what we are aiming for here, might be a ranges.mapAsync(parallelism)(range => fetchEntireRangeIntoFutureByteString)

*
* '''Cancels when''' downstream cancels
*/
@InternalApi private[impl] final class MergeOrderedN[T](val inputPorts: Int, val breadth: Int) extends GraphStage[UniformFanInShape[T, T]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcatPreFetch or something like that would be more correct , merge in Akka streams generally mean emit in any order.

@gael-ft
Copy link
Contributor Author

gael-ft commented May 24, 2023

Hi @johanandren

Thanks for the review !

Note that trying to control if the upstreams pre-fetch like you try to do is tricky since the sources could always contain their own buffers and start right away.

There's also a risk that the request is triggered right away and the response body stream times out with a subscription timeout from being queued based on the stream consumption speed.

I assume this is the pupose of GraphStages.withDetachedInputs(...) right ?


A more safe solution, If I understand correctly what we are aiming for here, might be a ranges.mapAsync(parallelism)(range => fetchEntireRangeIntoFutureByteString)

Anyway I'll try this as it would be much shorter (no need MergeOrderedN) and looks like it does the same in the end

@gael-ft
Copy link
Contributor Author

gael-ft commented May 24, 2023

Well, that being said, don't like much ranges.mapAsync(parallelism)(range => fetchEntireRangeIntoFutureByteString).

If I am correct, it means no ByteString will be pushed downstream until the range is completely fetched, which would work, but waiting for it to complete is (sometimes) useless.

Maybe GraphStages.withDetachedInputs(...) would solve the problem ?

@johanandren
Copy link
Member

Good point on not getting the bytes of the first chunk in a streaming fashion. Hmmm. Possibly some combination of mapAsync + adding a buffer to the response byte source and then flatMapConcat to concatenate the results into the stream.

GraphStages.withDetachedInputs does not help as far as I can see, it adds a one element buffer on each input so that they will all be eagerly started.

@gael-ft
Copy link
Contributor Author

gael-ft commented May 24, 2023

Just want to be sure to fully understand the issue to fix it correctly (and to know what I should take care of in the future 😉):

Note that trying to control if the upstreams pre-fetch like you try to do is tricky since the sources could always contain their own buffers and start right away.

If they are, elements will come from the source buffer, not directly from the remote (S3 here), ot sure what is the problem here ?

There's also a risk that the request is triggered right away and the response body stream times out with a subscription timeout from being queued based on the stream consumption speed.

In the MergeOrderedN I am not pulling all inputs ports at the same time.
I only pull breadth inputs ports.
As well, I either push them downstream directly if I can (and should) or I put them in a queue and continue to process new elements.
There is no "wait until something happen to continue processing ongoing sources"

Example:

sources: S1, S2, S3
parallelism: 2

  • (preStart) -> pulling S1 & S2 only
  • S1 push & out available -> push downstream and pull it again
  • S2 push -> buffer and pull it again
  • S1 push & out not available -> buffer and pull it again
  • S1 complete (S1 has buffered elements, so we do nothing)
  • S2 push -> buffer and pull it again
  • out available and pull -> dequeue buffer of S1 -> S1 completed and buffer empty -> S2 become the "current" and we start to pull S3
  • out available and pull -> dequeue elements from buffer of S2
  • ...

I can't figure how I could trigger subscription timeout here ?
If source has its own buffer then problem is not there
If source has not we won't pull it until its time comes so no requests body pending subscription

With some imagination ... we could compare it with .concat no ?
Second source won't be pulled until the first completed, whetever the current state of the second source is. Am I missing something?


Good point on not getting the bytes of the first chunk in a streaming fashion. Hmmm. Possibly some combination of mapAsync + adding a buffer to the response byte source and then flatMapConcat to concatenate the results into the stream.

Hmmm ok, I'll play a bit and see if I can find something

@johanandren
Copy link
Member

All the range requests are done immediately on materialization, then your scheme is to only pull from the first N of the requests and backpressure the rest. The are two potential problems with it:

Any buffer introduced explicitly, or implicitly, for example by an async boundary, will detach the response stream and start consuming it right away for all requests.

If there is no boundary and the backpressure works: if you don't subscribe to those within a timeout (default 1 second, response-entity-subscription-timeout) those request streams will fail.

The only way to make sure the requests are only done N at a time would be to not trigger each request at all until it should run and start fetch data.

@gael-ft
Copy link
Contributor Author

gael-ft commented May 26, 2023

All the range requests are done immediately on materialization

Ok thanks clear enough to me now.

@gael-ft gael-ft requested a review from johanandren May 29, 2023 08:04
val endMarker = Source.single(ByteString("$END$"))
getObject(s3Location, Some(br), versionId, s3Headers).concat(endMarker).map(_ -> idx)
})
.statefulMapConcat(RangeMapConcat)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Composing each range-source with a buffer to allow parallel fetching, and then concatenating the resulting streams to get the resulting bytes out in the right order seems like it would achieve the same but much simpler.

Am I missing something clever that this does?

Copy link
Contributor Author

@gael-ft gael-ft May 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood correctly, you are thinking about conflate or something similar to buffer range sources.
Something like:

getObject(s3Location, Some(br), versionId, s3Headers).conflate(_ ++ _).concat(endMarker).map(_ -> idx)

As flatMapMerge may emit in any order, I still need the range idx to order (possibly buffered) bytes.
So output item of flatMapMerge will look like (ByteString, Long) and can be in any order (regarding the Long).

How can I order them back, without statefulMapConcat ? Range2 could emit before range1 is complete and range2 could be complete before range1.

Note I am not trying to buffer "next" range, if bytes of the "next" range are pushed, I'll push them directly downstream as buffering those bytes is useless (?).


As well, regarding buffers, was not sure if it was useful to "hard pull" upstreams until parallelism * rangeSize is consumed.
Something like:

//...
  .statefulMapConcat(RangeMapConcat)
  // Might be useful to consume elements of all flatMapMerge materialized upstreams 
  .batchWeighted(parallelism * rangeSize, _.size, identity)(_ ++ _)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking something like

Source(byteRanges)
  .mapAsync(parallelism)(range => 
    getObjectByRanges(...).buffer(size, Backpressure)
  ).flatMapConcat(identity)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But ofc that may not be good enough with buffer sized in chunks instead of bytes, we don't have a buffer with weighted size calculation though, maybe batchWeighted could do, not sure.

Copy link
Contributor Author

@gael-ft gael-ft Jun 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm can't make it work:

Tried with:

Source(byteRanges)
  .mapAsync(parallelism)(br => Future.successful(
    getObject(s3Location, Some(br), versionId, s3Headers).batchWeighted(rangeSize, _.size, identity)(_ ++ _)
  ))
  .flatMapConcat(identity)

and

Source(byteRanges)
  .mapAsync(parallelism)(br => Future.successful(
    Source.fromMaterializer { case (mat, _) =>
      getObject(s3Location, Some(br), versionId, s3Headers)
        .preMaterialize()(mat)
        ._2
        .batchWeighted(rangeSize, _.size, identity)(_ ++ _)
    }
  ))
  .flatMapConcat(identity)

But in both situations, ranges are fetched one by one and download perf looks like getObject.
Just like if .mapAsync(P)(_ => someSource).flatMapConcat(identity) was not enough to materalize P sources at the same time.
Leaving us with the flatMapMerge ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ofc, they aren't materialized so they can start consume bytes until flatMapConcat:ed, didn't think of that. Pre-materialization creates a running source but the downstream is not materialized until it is used, so you would need to put the batching before preMaterialize.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record I created an upstream issue with an idea that could make this kind of thing easier: akka/akka#31958 (continue with the current solution here though)

@He-Pin
Copy link

He-Pin commented Jul 30, 2023

@gael-ft I have submited a PR for flatmapConcat with inflight materialization, would you like to do some review, thanks.

case None =>
val exc = new NoSuchElementException(s"Object does not exist at location [${s3Location.mkString}]")
objectMetadataMat.failure(exc)
Source.failed(exc)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will close the downstream ASAP, do you want to defer it?

Copy link
Contributor Author

@gael-ft gael-ft Aug 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It happens after getObjectMetadata result has been pulled so I am not sure of what that implies in this context ?

The idea is that no ObjectMetadata means no S3 object, so I think the source should fail as well as the materialized Future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add supports for Byte-Range Fetches when downloading from S3
3 participants