
Dispatch Experiment#65

Merged
Simn merged 23 commits into master from dispatcher_element
Jan 21, 2026

Contributor

Aidan63 commented Jan 17, 2026

Closes #63

Here's a bit of a dispatcher experiment. It removes scheduleObject from the scheduler. With the blocking run builders, the defaults are the usual EventLoopScheduler and a TrampolineDispatcher (a slightly fancier SelfDispatcher that avoids stack overflow / recursion issues).
The Async convenience functions now use the dispatcher for the async-ness, so when something like delay uses the scheduler to schedule execution in the future and then resumes asynchronously, it's doing a scheduler -> dispatcher execution. In my mind the scheduler decides "when does this run" and the dispatcher decides "where does this run".
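
To make the scheduler -> dispatcher hop concrete, here is a minimal, self-contained sketch. ToyScheduler, ToyDispatcher, Task and delay are hypothetical stand-ins written for this illustration; they are not the hxcoro types and the real signatures may well differ. Time is passed in by hand so the example stays deterministic.

```haxe
// Toy model only: none of these types are the hxcoro API.
typedef Task = () -> Void;

// "When": holds tasks until their due time has been reached.
class ToyScheduler {
	final pending:Array<{due:Int, task:Task}> = [];

	public function new() {}

	public function schedule(delayMs:Int, now:Int, task:Task) {
		pending.push({due: now + delayMs, task: task});
	}

	// Runs every task that is due at `now`, earliest first.
	public function pump(now:Int) {
		pending.sort((a, b) -> a.due - b.due);
		while (pending.length > 0 && pending[0].due <= now) {
			final entry = pending.shift();
			entry.task();
		}
	}
}

// "Where": decides the place a task executes. This one records the hop and
// runs inline; another implementation could hand off to a thread pool.
class ToyDispatcher {
	public final log:Array<String> = [];

	public function new() {}

	public function dispatch(task:Task) {
		log.push("dispatch");
		task();
	}
}

class Main {
	// Ask the scheduler for "later", then bounce through the dispatcher.
	static function delay(s:ToyScheduler, d:ToyDispatcher, ms:Int, now:Int, resume:Task) {
		s.schedule(ms, now, () -> d.dispatch(resume));
	}

	public static function run():Array<String> {
		final s = new ToyScheduler();
		final d = new ToyDispatcher();
		delay(s, d, 10, 0, () -> {
			d.log.push("resumed");
		});
		s.pump(5); // not due yet, nothing runs
		s.pump(10); // due: scheduler callback -> dispatcher -> resume
		return d.log; // contains "dispatch" followed by "resumed"
	}

	static function main() {
		trace(run());
	}
}
```

The scheduler callback never resumes the continuation itself; it only asks the dispatcher to do so, which is why whatever thread pumps the scheduler does not determine where the resumed code runs.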

I'm not sure whether this is useful or what it solves. Maybe in some complex situation where one thread runs a libuv loop and another runs an SDL loop, you could have a libuv scheduler which then dispatches onto the SDL event loop?

Aidan63 marked this pull request as draft January 17, 2026 15:29
Aidan63 changed the title from "[Draft] Dispatch Experiment" to "Dispatch Experiment" Jan 17, 2026
Member

Simn commented Jan 17, 2026

I think I like this. The part that has been tripping me up is how to go from the scheduler to the dispatcher, but by scheduling a function that then resumes the continuation asynchronously we achieve that, as you explain. That means we have to document that what we pass to schedule is always executed in the loop thread, but other than that I see no problem with this approach.

By the way, you can do with(schedulerComponent, dispatcherComponent).

Contributor Author

Aidan63 commented Jan 17, 2026

Yeah, when you schedule a function through Scheduler where it runs is "implementation defined", so we bounce through the dispatcher to make sure we're in the right place.

One area I'm not sure about is the callSync and callAsync helpers. Isn't the asynchronicity now the responsibility of the dispatcher, so we should always go through it?

only 6 left
Member

Simn commented Jan 17, 2026

We'll definitely have to be careful with the sync/async thing here, but I do think synchronous succeeding and failing has a purpose. Here in CoroBaseTask for example:

	public function awaitContinuation(cont:IContinuation<T>) {
		switch (state.load()) {
			case Completed:
				cont.succeedSync(result);
			case Cancelled:
				cont.failSync(error);
			case _:
				awaitingContinuations ??= [];
				awaitingContinuations.push(cont);
				start();
		}
	}

In this case we want to stay on the synchronous path because our task is already completed, so there's nothing to resume asynchronously.

Member

Simn commented Jan 17, 2026

Oh, just when I thought I understood the basic idea you revert the exact part that I thought made it all work...

Also, we need a way to set up a working Haxe/hxcoro combination with two PRs. I thought that we could just open a Haxe PR from your branch in order to get a Haxe build and use it here, but Haxe CI is going to fail without this PR too.

@kLabz Could you modify the setup on the Haxe side so that we can point to a specific hxcoro branch in a single place? That way we could make Haxe point to the branch here in order to get a build, and then use that build here.

... of course we only get a build once the tests here are actually passing, but I suppose that's part of the challenge.

This reverts commit cecaff7.
Contributor Author

Aidan63 commented Jan 17, 2026

I only changed that to see if it changed one of the failing tests (it did not), so I've reverted it as I think it does make sense. The last few failing tests seem to be around cancellation "ordering", such as #47's tests.

final cancelCause2 = new CancellationException();
task.cancel(cancelCause2);
Assert.isTrue(task.isActive());

isActive now returns false because the cancel call only goes through the trampoline dispatcher rather than taking a trip through the event loop. I need to see whether this new behaviour is actually good and the test needs updating, or whether we need the old behaviour the test is checking for.

Member

Simn commented Jan 17, 2026

I think that test is bad, I see no reason why the task would still need to be considered active after that cancel call.

Member

Simn commented Jan 18, 2026

I'm not sure if we should specify the isFalse either. Maybe it's better to make assumptions about the activity state of a task only after an await call.

Contributor Author

Aidan63 commented Jan 18, 2026

I've updated that test to expect the active state to be false instead. I've also ignored two unbounded channel tests related to prompt cancellation. Looking at why those tests are failing reminded me that I meant to open an issue about cancellation polling (#67). With that, the tests are passing when combined with the Haxe-side changes.

Contributor Author

Aidan63 commented Jan 18, 2026

That does make sense, I've removed the asserts which were occurring before the await call.

My one reservation with this is how the user interacts with it. Take the sample I posted in #63: the user now needs to create a thread pool dispatcher instead of a thread pool scheduler.

final channel   = Channel.createUnbounded();
final dispatcher = new ThreadPoolDispatcher();

@:coroutine function foo(node) {
	// pretend this coroutine is running on some framework's UI thread
	// the current scheduler would be MyUiFrameworkScheduler or whatever

	scope(node -> {

		// Spin up a couple of coroutines to compress the file bytes on the task pool

		for (i in 0...4) {

			node.with(dispatcher).async(_ -> {
				final out = new Out();
				while (channel.reader.waitForRead()) {
					if (channel.reader.tryRead(out)) {
						final data       = File.readAllBytes(out.get()); // suspension
						final compressed = Zip.compress(data); // blocking

						File.writeAllBytes(out.get(), compressed); // suspension
					}
				}
			});

		}
	});
}

If the dispatcher were replaced but not the scheduler, then anything which calls delay or otherwise invokes the scheduler will go through that scheduler (which might be a UI thread / event loop / different thread) before being bounced back to the dispatcher. I think that should be fine in the sense that it will work (as long as the scheduling mechanism is continually pumped), but does it seem a bit odd?

Contributor Author

Aidan63 commented Jan 18, 2026

yield is also an odd one with these changes. On paper I think it makes sense for it to be implemented as the following instead of being delay(0).

@:coroutine @:coroutine.nothrow public static function yield():Void {
	suspend(cont -> cont.callAsync());
}

There are two major changes with this, though. First, yield is no longer cancellable. Second, depending on the dispatcher it may not actually do anything "async", e.g. with our default TrampolineDispatcher. Some tests around cancellation also hang with this change.
Maybe these are fine and more "correct", and we've just been using yield in odd ways this entire time.

Member

Simn commented Jan 18, 2026

I think yield should remain cancellable because that's what's expected of built-in suspension points.

As for the scheduler thing, I'm still not convinced that this should be a distinct context element. What's the use case for custom schedulers if we have custom dispatchers?

Contributor

kLabz commented Jan 19, 2026

@kLabz Could you modify the setup on the Haxe side so that we can point to a specific hxcoro branch in a single place? That way we could make Haxe point to the branch here in order to get a build, and then use that build here.

... of course we only get a build once the tests here are actually passing, but I suppose that's part of the challenge.

Uh. The setup modification sounds easy enough, but not sure what to do with that chicken and egg situation.. 🤔

Maybe we could proceed with deploy if only 3rd party tests fail, for example?

Member

Simn commented Jan 19, 2026

I was worried about that too, but isn't it the case that the Haxe side will just work if it points to the correct hxcoro branch? In that case what we set as the Haxe version here doesn't matter because this CI isn't involved. And then once Haxe CI succeeds we can update the Haxe version here to make it work.

Contributor

kLabz commented Jan 19, 2026

Are tests expected to fail atm with this branch? Maybe I need a specific Haxe branch too?

https://github.com/HaxeFoundation/haxe/actions/runs/21140877335/job/60794478583#step:10:341

Member

Simn commented Jan 19, 2026

Yes, the branch is this one: https://github.com/HaxeFoundation/haxe/tree/coro_dispatcher

Contributor

kLabz commented Jan 19, 2026

Member

Simn commented Jan 19, 2026

Indeed, I have updated the package. Which also made me realize that IScheduleObject is no longer a fitting name for what we're doing here, but we can update that once we have a working CI setup.

And yes, utest, argh... I guess that needs another custom branch for now.

Contributor Author

Aidan63 commented Jan 19, 2026

Renaming IScheduleObject was on my mind as well.

As for the scheduler thing, I'm still not convinced that this should be a distinct context element. What's the use case for custom schedulers if we have custom dispatchers?

Are you thinking that we shouldn't expose changing the scheduler or merge the two concepts back together somehow?

Member

Simn commented Jan 19, 2026

My current thinking is that the scheduler should be a property of the dispatcher, so that we can do context.get(Dispatcher).scheduler. Schedulers can be passed into the dispatcher constructors, which allows, among other things, the VirtualTimeScheduler usage by creating a new TrampolineDispatcher(new VirtualTimeScheduler()). This should probably be optional though, I think most dispatchers could use a default scheduler if none is provided.
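
Roughly what I have in mind, as a toy sketch rather than the real classes: ToyScheduler, DefaultScheduler, VirtualScheduler and ToyDispatcher are hypothetical names used only here, and the actual Dispatcher may expose the scheduler differently (for example through a getter). It assumes Haxe 4.3 or later for the `??` operator.

```haxe
// Toy model only: none of these types are the hxcoro API.
interface ToyScheduler {
	function name():String;
}

class DefaultScheduler implements ToyScheduler {
	public function new() {}

	public function name():String {
		return "default";
	}
}

class VirtualScheduler implements ToyScheduler {
	public function new() {}

	public function name():String {
		return "virtual";
	}
}

// The dispatcher owns its scheduler; callers reach it as `dispatcher.scheduler`.
class ToyDispatcher {
	public final scheduler:ToyScheduler;

	// The scheduler is optional: fall back to a default when none is passed.
	public function new(?scheduler:ToyScheduler) {
		this.scheduler = scheduler ?? new DefaultScheduler();
	}
}

class Main {
	public static function run():Array<String> {
		final plain = new ToyDispatcher();
		final testing = new ToyDispatcher(new VirtualScheduler());
		return [plain.scheduler.name(), testing.scheduler.name()];
	}

	static function main() {
		trace(run());
	}
}
```

With this shape there is only one context element to replace, and a virtual-time setup is just a dispatcher constructed with a different scheduler.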

Member

Simn commented Jan 20, 2026

That looks quite good to me now!

I kind of forgot that Dispatcher is an abstract class and not an interface, which makes me wonder about the scheduler field. It's not really a problem except that it means we can't use a real field named scheduler in any concrete class, which seems a little annoying. But that's perhaps more of a compiler problem.

Either way, I'm happy to proceed with this approach if you have no further reservations. It means that I'll have to update some documentation mentioning schedulers, but I'll survive that.

Member

Simn commented Jan 20, 2026

I just ran the benchmarks because I expect this to be faster (higher is better):

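| what | master | dispatcher_element |
| --- | --- | --- |
| yield | | |
| 100 | 10,630 | 15,440 |
| 1000 | 1,676 | 2,771 |
| 10000 | 164 | 268 |
| asyncYield | | |
| 100 | 6,457 | 10,071 |
| 1000 | 727 | 881 |
| 10000 | 69 | 91 |
| lazyField | | |
| 100 | 8,170 | 10,804 |
| 1000 | 840 | 967 |
| 10000 | 79 | 103 |
| hoisting | | |
| 100 | 15,434 | 25,998 |
| 1000 | 1,595 | 2,813 |
| 10000 | 160 | 264 |
| channel | | |
| 100 | 2,585 | 5,007 |
| 1000 | 255 | 599 |
| 10000 | 22 | 61 |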

It is indeed faster across the board, which makes sense because these dispatch calls no longer have to go through the heap.

This also makes me realize that yield() has to go through the scheduler. If it went through the dispatcher, a while(true) { yield(); } loop would eternally bounce on the trampoline in single-threaded execution.
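
A small sketch of why that happens, using a toy trampoline. ToyTrampoline is a hypothetical stand-in and not the actual TrampolineDispatcher; the loop is capped by a `limit` parameter only so that the example terminates.

```haxe
// Toy trampoline, NOT the hxcoro TrampolineDispatcher.
class ToyTrampoline {
	final queue:Array<() -> Void> = [];
	var running = false;

	public function new() {}

	public function dispatch(task:() -> Void) {
		queue.push(task);
		if (running) {
			return; // an outer dispatch call is already draining the queue
		}
		running = true;
		while (queue.length > 0) {
			final next = queue.shift();
			next();
		}
		running = false;
	}
}

class Main {
	// Stands in for `while (true) { yield(); }` with yield going through the
	// dispatcher. Without `limit` the drain loop above would never finish.
	public static function run(limit:Int):Int {
		final trampoline = new ToyTrampoline();
		var bounces = 0;
		function loop() {
			if (bounces < limit) {
				bounces++;
				trampoline.dispatch(loop); // only enqueues, the stack stays flat
			}
		}
		trampoline.dispatch(loop);
		// Control gets back here (where an event loop could be pumped) only
		// after the queue is empty, so every bounce has already happened.
		return bounces;
	}

	static function main() {
		trace(run(100000));
	}
}
```

Each yield just re-enqueues the continuation on the same queue, so the drain loop never runs dry and nothing outside the trampoline, timers included, gets a turn.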

Contributor Author

Aidan63 commented Jan 20, 2026

Yield is a weird one. Having it be a schedule with a delay of 0 feels weird, but with our trampoline dispatcher that while (true) loop does bounce forever. Being dispatch-based would probably make more sense on other dispatchers, though; e.g. I was thinking an SDL dispatcher would push the function onto the event loop, which wouldn't have this "bounce forever" issue.

Member

Simn commented Jan 20, 2026

Wouldn't the new SDL dispatcher push everything onto the event loop anyway? In that case I don't see much value in bypassing the scheduler for this specific case. If anything, always going through the scheduler for delay and yield gives us more consistency.

Contributor Author

Aidan63 commented Jan 20, 2026

Another thing I just remembered that needs to be done: the trampoline needs to be thread local. Each thread needs its own trampoline so that threads can't throw stuff onto a different thread's trampoline. This is another thing I took from Rx, and in those libraries you don't create a trampoline scheduler, you get it through calls like CurrentThreadScheduler.Instance in Rx.NET and Schedulers.trampoline() in RxJava, so maybe we want something like that as well.
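
Something along these lines, as a rough sketch that assumes a sys target with thread support. ToyTrampoline and Trampolines.current() are hypothetical names for illustration only; sys.thread.Tls, Thread and Lock come from the Haxe standard library.

```haxe
import sys.thread.Lock;
import sys.thread.Thread;
import sys.thread.Tls;

// Toy placeholder, NOT the hxcoro trampoline.
class ToyTrampoline {
	public function new() {}
}

class Trampolines {
	// One slot per thread; a thread that has not asked yet sees null.
	static final slot = new Tls<ToyTrampoline>();

	// Returns the calling thread's trampoline, creating it on first use.
	public static function current():ToyTrampoline {
		var trampoline = slot.value;
		if (trampoline == null) {
			trampoline = new ToyTrampoline();
			slot.value = trampoline;
		}
		return trampoline;
	}
}

class Main {
	public static function run():Bool {
		final mine = Trampolines.current();
		final done = new Lock();
		var other:ToyTrampoline = null;
		Thread.create(() -> {
			other = Trampolines.current(); // created separately for the new thread
			done.release();
		});
		done.wait(); // block until the other thread has stored its trampoline
		// Same instance on repeated calls from this thread, a different one elsewhere.
		return mine == Trampolines.current() && mine != other;
	}

	static function main() {
		trace(run());
	}
}
```

Callers would then never construct a trampoline directly, which mirrors how the Rx libraries hand out theirs.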

# Conflicts:
#	src/hxcoro/CoroRun.hx
Member

Simn commented Jan 20, 2026

The utest situation has been resolved with #54 and haxe-utest/utest#136. It now uses CoroRun.promise so it should work independently of what we're doing here.

Now we only need to make the two branches work with each other.

Contributor

kLabz commented Jan 20, 2026

js tests now pass https://github.com/HaxeFoundation/haxe/actions/runs/21176832134

Party tests still have an error

 14 | class ThreadPoolDispatcher extends Dispatcher implements IDispatcher {
    |       ^^^^^^^^^^^^^^^^^^^^
    | This class extends abstract class haxe.coro.dispatchers.Dispatcher but doesn't implement the following method

        | Implement it or make hxcoro.dispatchers.ThreadPoolDispatcher abstract as well

         ->  /usr/local/share/haxe/std/haxe/coro/dispatchers/Dispatcher.hx:14: characters 27-40

         14 |  public abstract function get_scheduler() : Scheduler;
            |                           ^^^^^^^^^^^^^
            | get_scheduler()

https://github.com/HaxeFoundation/haxe/actions/runs/21176832134/job/60908043960#step:10:2098

Should I push to HaxeFoundation/haxe#12522 ?

but don't use it because LuvScheduler needs an update too
Member

Simn commented Jan 20, 2026

I've updated ThreadPoolDispatcher. Had to disable its actual usage for Eval because LuvScheduler needs an update first, but it should work now.

And yes, please push to that PR's branch!

Member

Simn commented Jan 20, 2026

LuvScheduler doesn't work anymore at the moment; somehow these async_t things don't do what I want them to do. But there's a chance that this never properly worked in the first place, so it's not very important right now. I'll figure it out at some point.

Member

Simn commented Jan 20, 2026

8787aab made Eval hang on TestCallStack.test. Will check tomorrow what's happening.

Member

Simn commented Jan 20, 2026

Found it already. That's probably the only test where we have a nested CoroRun entrypoint, and with the static value the dispatchers then shared the same trampoline. I'm not sure about the details but I think with trampoline.running being true from the start it only pushes the object onto the queue and nothing else ever happens.

Simn added 3 commits January 21, 2026 08:31
I don't think this is completely correct yet because we likely need a better synchronization between AsyncDeque add/close, but it's enough to get the tests working again.
I think I understand why, but let's deal with this later.
Simn marked this pull request as ready for review January 21, 2026 08:20
Member

Simn commented Jan 21, 2026

This should be good to go. We can have fun with trampolines and the luv scheduler in a separate PR. I'll look into merging things in the correct order.

Simn merged commit c5d433b into master Jan 21, 2026
46 checks passed
Aidan63 deleted the dispatcher_element branch January 21, 2026 21:42

Development

Successfully merging this pull request may close these issues.

Dispatcher Context Element

3 participants