Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WithLatestFrom] implement operator for arity 1 and 2 #147

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

twittemb
Copy link
Contributor

@twittemb twittemb commented Apr 15, 2022

Hi.

This PR aims to propose a new operator: withLatest(from:).
It has been discussed in the forum here.

I am new to the evolution proposal process. This PR contains:

  • a proposal based on the sample + the guides.
  • the implemented operator with unit tests and documentation.

Don't hesitate to guide me through the process 😀.

Thanks.

Copy link
Member

@phausler phausler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a fantastic start!

@@ -26,7 +26,7 @@ extension _ErrorMechanism {
fatalError("materialized error without being in a throwing context")
}

internal func _rethrowGet() rethrows -> Output {
@usableFromInline internal func _rethrowGet() rethrows -> Output {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should avoid this; the _ErrorMechanism is a work-around for a compiler feature that is not yet there so ideally it should not be exposed into ABI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed :-)


func test_zipLatestFrom_uses_latest_element_from_other_when_base_produces_first_elements() async {
// Timeline
// base: |-1-2---3------4---------5-----
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these diagrams are really close to the validation diagram, however the use of | is kinda confusing.

Perhaps we can use some sort of combing function that is easily understandable to represent the tuple? e.g. we can use emoji so a tuple of a shape out of 🤍⬜️⚪️ along side a color out of 🟥🟧🟨🟩🟦🟪⬛️ results in a symbol of ❤️🟥🔴🧡🟧🟠... etc?

Doing a 3 variant might be hard; I still need to think on how to accomplish that.

Copy link
Contributor Author

@twittemb twittemb Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've refactored the diagrams a bit to make them clearer, tuple-wise

self.other.task.cancel()
throw error
}
lastKnownOtherState = await self.other.next()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this does not run the two sequences concurrently for iteration. effectively it is blocking iteration from one to the other. Perhaps a TaskGroup is in order here (like how zip does it)?

Copy link
Contributor Author

@twittemb twittemb Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Other sequence is being iterated over in its own Task. The call to self.other.next() will return a value right away (either noElement, element(value) or nil). With the way it is implemented right now I don't see the benefit of having a TaskGroup.

However, the current implementation might not be the good one! What do you think of iterating over Other in a separate Task so that we can always have its latest known value at our disposal ? It comes with the drawback that the latest known value might not be the one that we think because the Task's execution is scheduled by the system and it might be paused for some reason.

Other question: with the current implementation if Other fails or finishes, we will have to wait for the next Base value to know it and end the async sequence. We might pick another strategy where we can end the async sequence as soon as the Other sequence ends.

We will have the same type of questions for operators like switchLatest()`.

/// - Parameter other: the other ``AsyncSequence``
/// - Returns: an ``AsyncZipLatestFromSequence`` where elements are a tuple of an element from `self` and the
/// latest known element (if any) from the `other` sequence.
@inlinable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a feeling that these probably should not be inlinable. Usually the bar I used was "could this reasonably be used for byte streams?" If the answer was yes then it should be inlined/frozen etc. If the answer is meh that doesn't make any sense then they are ok to be resilient boundaries. If the answer is on the fence then it is done if and only if it can be done w/o exposing hacks/workarounds for the language etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed :-)

// Created by Thibault Wittemberg on 01/04/2022.
//

@preconcurrency import AsyncAlgorithms
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oof wat?
XCTest I get, but the framework we are building for async/await stuff; im sure this is just a copy/paste error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed :-) It was due to a lack of conformance to Sendable


/// ``AsyncZipLatestFromSequence`` is an ``AsyncSequence`` where elements are a tuple of an element from `base` and the
/// latest known element (if any) from the `other` sequence.
public struct AsyncZipLatestFromSequence<Base: AsyncSequence, Other: AsyncSequence>: AsyncSequence where Other: Sendable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these types probably need to be audited for being sendable. conditionally or not; my guess is that they will have similar constraints to zip/combineLatest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've conformed the types to Sendable, taking zip as a reference

final class TestZipLatestFrom: XCTestCase {}

// MARK: test for AsyncZipLatestSequence
extension TestZipLatestFrom {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to get some coverage numbers so that we can get an idea of how much is addressed in the implementation by these tests; generally we aimed to have 90%+ coverage. Obviously coverage does not mean we cover ALL permutations of usage but it definitely assuages stability concerns etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having some trouble getting the coverage in Xcode because of an error "Failed to merge raw profiles ...". Does that ring a bell?


@inlinable
public mutating func next() async rethrows -> Element? {
guard !Task.isCancelled else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the cancellation only happens at the start of the iteration? it does not respond immediately while the base here is going too iterated (or when the other is iterated?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am reading it correctly; I think that is fine... just wanted to make sure that was the intent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reworked the cancellation handling so that every iteration can catch it

@phausler phausler added the enhancement New feature or request label Apr 15, 2022
@parkera parkera added the v1.1 Post-1.0 work label May 4, 2022
@parkera
Copy link
Member

parkera commented May 4, 2022

Marking this as v1.1 because we should strongly consider it after our initial API stable 1.0 release.

Copy link
Member

@FranzBusch FranzBusch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just left some smaller comments here already that I spotted

Sources/AsyncAlgorithms/AsyncWithLatestFrom2Sequence.swift Outdated Show resolved Hide resolved
self.other2 = other2
}

public func makeAsyncIterator() -> Iterator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make sure that only a single Iterator can be created. The problem with multiple iterators is that it leads to non-deterministic behaviour where elements are ending up in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be a bit more specific, I don't understand what you are asking for ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the problem here is a bit broader to the AsyncSequence ecosystem and I am planning to write something up soon. Right now in all the AsyncSequence implementations the makeAsyncIterator method can be called multiple times. Each of these iterators is almost always using the same underlying storage. So the actual implementation is that the algorithms are unicast AsyncSequences however since they allow multiple iterators are getting into an awkward state where multiple Tasks can consume them now. This causes all kinds of issues like non-deterministic behaviour who gets an element. Furthermore, cancellation handling is becoming problematic since if two Tasks are consuming the same AsyncSequence and one gets cancelled it almost always cancels the other consumer as well. I currently consider it best practice to just guard that only a single Iterator can be created and fatalError on subsequent calls.

The current behaviour is fine for Sequences since the they don't have this time aspect to them and are most of the time referencing an underlying content. For our AsyncSequences the content is almost never buffered but consumed on the time stream.

As I said, I am planning to write more up on this soonish just wanted to make sure to put this out here already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed explanation. I'm not sure I see a problem since a call to makeAsyncIterator() creates a new iterator that has its own properties and storage. Nothing is shared between several iterators.

Am I missing something ?

For AsyncSequences that have a storage shared across consumers (like AsyncChannel or an hypothetical AsyncShareSequence), the locking mechanism enforces the good behaviour right ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry I missed that the whole core implementation is inside the Iterator. While this makes things a bit better since the state doesn't interfere it makes still for some weird behaviour IMO:

let other = AsyncChannel<>
let base = AsyncStream<>

let sequence = base.withLatest(from: other)

Task {
    for await element in sequence {}
}


Task {
    for await element in sequence {}
}

You cannot tell from this code what is going to happen since the way all of the sequences involved in this little snippet right now allow creating multiple iterators. While your implementation is multi-cast all the other ones are not. IMO, what I personally think we should do is implement almost all our algorithms and base sequences as unicast (unless they have different semantics) and provide share/multicast algorithms. This makes reasoning about the code in the end way easier

let other = AsyncChannel<>
let base = AsyncStream<>

let sequence = base.shared().withLatest(from: other.shared())

I don't think you have to change your PR here right away since this is still in discussion and like I said I wanna open up a broader discussion around this soonish. Just wanted to share more details with you in why I think we need to be careful around these things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I understand. This is something I struggled with also. But it is mainly due to the fact that although AsyncChannel and AsyncStream are compatible with a multi producer/multi consumer paradigm, their output is not shared and the elements will be distributed across the consumers.

I guess this is what channels are for. If I refer to the Kotlin implementation it does exactly that.

I've open sourced a project AsyncExtensions before swift-async-algorithms was announced :-) and I'm trying to address notably the share operator in there, as well as shared "channels". Now that swift-async-algorithms has been announced, I'm rethinking my repo as a companion to swift-async-algorithms. It means I will add things that don't yet exist in swift-async-algorithms and try to suggest them to this main repo step by step.

The repo is a work in progress (https://github.com/sideeffect-io/AsyncExtensions/tree/refactor) and the README is not up-to-date but there are some interesting operators in there, that were challenging, like .switchToLatest().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the AsyncChannel being MPMC makes sense; however, I am still very torn if AsyncStream should be like that. AsyncChannel does handle cancellation better since it only removes the continuations for that specific Task whereas AsyncStream cancels the whole thing which makes it very hard to use safely.

But we are learning all of this along the way so we should just come up with some rules around what to expect!

W.r.t. your package. I really like it! I also have a rudimentary implementation of share lying around; however, as @phausler said we first need to stabilise the algos that are in here and then add more.

Copy link
Contributor Author

@twittemb twittemb Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FranzBusch I've addressed all the comments that could be addressed without an external opinion.
Thanks again for your time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, speaking of AsyncChannel cancellation, I have on ongoing PR regarding cancellation when several producers in different tasks -> #184

Sources/AsyncAlgorithms/AsyncWithLatestFrom2Sequence.swift Outdated Show resolved Hide resolved
Comment on lines +90 to +99
var onBaseElement: (@Sendable (Base.Element) -> Void)?
var onOtherElement: (@Sendable (Other.Element?) -> Void)?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly believe we shouldn't insert code into these things just for testing. The behaviour should be fully observable from the public APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phausler do you have an opinion about that ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the more exposed APIs we have the more potential bugs/bad-interactions there could be. If we need some sort of mechanism for testing then we should figure out a way to observe the issue via the tests. Since the tests can control the stepping of the global executors, and we also control both the inputs AND the demand, we should be able to observe everything that is of importance in the tests.

Comment on lines +120 to +129
var onBaseElement: (@Sendable (Base.Element) -> Void)?
var onOther1Element: (@Sendable (Other1.Element?) -> Void)?
var onOther2Element: (@Sendable (Other2.Element?) -> Void)?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Sources/AsyncAlgorithms/AsyncWithLatestFromSequence.swift Outdated Show resolved Hide resolved
Sources/AsyncAlgorithms/AsyncWithLatestFrom2Sequence.swift Outdated Show resolved Hide resolved
@twittemb twittemb changed the title operators: add zipLatest(from:) operators: add withLatest(from:) Sep 12, 2022
@twittemb
Copy link
Contributor Author

twittemb commented Sep 12, 2022

Just left some smaller comments here already that I spotted

Hi @FranzBusch thanks for the review. I'll address them soon. As for the "ad-hoc" internal testing properties, I followed some advices given in this thread by Joe Groff (https://forums.swift.org/t/reliably-testing-code-that-adopts-swift-concurrency/57304/32). Testing concurrently executing code involving a time line is really challenging. In the case of withLatest(from:) we have to have a precise coordination that is deterministic because we have no control on the inner task execution for the other sequence.

What would you suggest to improve that ?

@FranzBusch
Copy link
Member

Hi @FranzBusch thanks for the review. I'll address them soon. As for the "ad-hoc" internal testing properties, I followed some advices given in this thread by Joe Groff (https://forums.swift.org/t/reliably-testing-code-that-adopts-swift-concurrency/57304/32). Testing concurrently executing code involving a time line is really challenging. In the case of withLatest(from:) we have to have a precise coordination that is deterministic because we have no control on the inner task execution for the other sequence.

What would you suggest to improve that ?

I am really torn on that suggestion, but I feel your pain points because I experienced the same in the merge and debounce implementation where certain scenarios can't be tested since you are not controlling the flow in the spawned Tasks. I think if we go forward with hooks for tests we should #if DEBUG to only have them in the tests but even that produces problems since we want to run the benchmarking tests in release mode.

I would like to get @phausler opinion here about test hooks for simulating timings.

@phausler
Copy link
Member

phausler commented Sep 12, 2022

So we can control the spawning of tasks! https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncSequenceValidation/Test.swift#L313 is where the tests do it today - and I could imagine that we could add either extra functionality to that to adjust the way we need... or we can use that dark magic to build some other really neat tools to control that in different ways.

What I think would be a good plan is to favor testing via what is public surface area as much as possible and then collect info on exactly what we need such that either a) we can get effort from a language/runtime layer or b) (ab)use some of the runtime functions to manipulate the execution into what we want to test. But that really requires some really good outlines of precisely what needs to occur in the most general sense.

@twittemb twittemb changed the title operators: add withLatest(from:) [Operators] add withLatest(from:) Nov 12, 2022
@twittemb twittemb changed the title [Operators] add withLatest(from:) [WithLatestFrom] implement operator for arity 1 and 2 Jan 5, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request v1.1 Post-1.0 work
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants