
Add message history and retransmission #3199

Draft · wants to merge 28 commits into main

Conversation


@afullerx afullerx commented Jun 10, 2024

This PR attempts to resolve #3143 by adding a message history to the outbox and retransmitting missed messages to resynchronize the client's state during a reconnection. If that isn't possible, a page reload is triggered. The goal is to prevent a connected client's state from ever being out of sync with the server.

For the auto-index page, a history duration of 30 seconds was arbitrarily chosen. Since this value only determines whether the UI is resynchronized by resending messages or by a page reload, the UI should stay properly synchronized regardless of its value.

For a ui.page, the history duration is computed from the expected lifetime of the client object. Currently, with the default reconnect_timeout = 3.0, this is a maximum of 9 seconds. With this change, a re-evaluation of this default could be warranted: now that UI state can be resynchronized indefinitely, discarding the user's page after only 5–9 seconds of disconnection seems premature. See #3143 (comment) for more.
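
As a rough sketch of how these durations might relate (the 3× factor and the names below are inferred from the numbers above, not taken from the PR's code):

```python
from typing import Optional

def history_duration(reconnect_timeout: Optional[float]) -> float:
    """Sketch only: how long missed messages can be replayed after a disconnect."""
    if reconnect_timeout is None:   # auto-index page: no per-client lifetime
        return 30.0                 # arbitrarily chosen, see above
    return 3 * reconnect_timeout    # expected client lifetime: 9 s for the default 3.0
```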


Open tasks (October 24, 2024):

  • message_history_length isn't being used
  • handle reconnect when next message ID has already been pruned
  • fix failing pytests
  • fix test_no_object_duplication_on_index_client
  • Should the auto-index client reload when trying to reconnect (because there is no message history)? -> No.
  • thorough test
  • test On Air

@afullerx afullerx marked this pull request as draft June 10, 2024 05:20
@afullerx
Contributor Author

I discovered a potential edge case where the client can get out of sync. Converted to draft until I can investigate.

Also, I got an email about a failed test with Python 3.9. It looks like it's from before the tests are even run. Not sure what to do about this.

@falkoschindler
Contributor

Thanks for starting this pull request, @afullerx! We're looking forward to reviewing your implementation once it's ready.

Regarding the failing test: sometimes one of the "startup tests" fails because some caching takes longer than expected. This can safely be ignored; next time the test will probably pass.

@afullerx
Contributor Author

OK, I believe this pull request is good to go. The desync I was seeing was caused by two new issues I discovered in the current codebase. 

One is a race condition when multiple clients are connecting to an auto-index page. 

The other is due to a gap in time between when the webpage is generated and when updates can be received. This could actually be fixed using the new message history, but I think it's best left for a future PR. 

I'll submit issues and/or pull requests once this one is done.

@afullerx afullerx marked this pull request as ready for review June 10, 2024 08:32
@falkoschindler falkoschindler self-requested a review June 10, 2024 09:12
@afullerx
Contributor Author

Regarding the pre-existing issue with missed updates due to a gap between page render and websocket connection: I realized I could fix it by including a client's initial last_message_id in the page render. Now the message history protects the initial connection as well. I'm not sure how long this gap can be, so I set the minimum history durations to 30 seconds. Maybe they should be longer.
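
A hypothetical sketch of the idea (the function name and the embedded variable are assumptions for illustration, not the PR's actual code): the last message ID known at render time is embedded in the page, so the very first handshake can ask the history for everything that happened in the gap.

```python
def render_sync_snippet(outbox) -> str:
    """Sketch: embed the client's initial last_message_id in the rendered page."""
    last_message_id = outbox.next_message_id - 1  # newest ID at render time
    return f'<script>window.last_message_id = {last_message_id};</script>'
```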

Contributor

@falkoschindler falkoschindler left a comment

Ok, I finally had a chance to take a look into your code. Amazing work!
Just a few thoughts:

  • Somehow a retransmission ID is added to every message from the message history, which is then broadcast to all clients, where it is checked against the expected retransmission ID:

    ```python
    for i in range(start, len(self._history)):
        args = self._history[i][2]
        args[1]['retransmit_id'] = retransmit_id
        self.enqueue_message('retransmit', args, '')
    ```

    ```js
    if (
      data.message_id <= window.last_message_id ||
      ("retransmit_id" in data && data.retransmit_id != window.retransmitId)
    ) {
      return;
    }
    ```

    This seems like a lot of overhead. Can't we pass the socket ID of the handshaking client to synchronize() and send a custom "retransmit" message containing all missed messages (see the sketch after this list)? This way we wouldn't need to manipulate messages and filter them on the client.
  • What do you think about additional CPU and memory consumption? Now that we keep every message for at least 30 seconds, this can accumulate quickly when, e.g., streaming 3D data. Should we make the history length configurable?
  • We should check how the new retransmission works with ui.scene and ui.leaflet, because they use a separate "init" message for initialization. (Maybe we can solve their initialization problem more elegantly by introducing an on_handshake method to ui.element that is called whenever a client handshakes... But that's probably out of scope of this pull request.)
  • Before merging, @rodja and I should check if it works seamlessly with NiceGUI On Air.
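
A minimal sketch of the suggested alternative (the synchronize() signature, the _history layout, and _emit are assumptions for illustration, not the actual outbox API):

```python
async def synchronize(self, last_message_id: int, socket_id: str) -> None:
    """Sketch: send one targeted 'retransmit' message instead of re-broadcasting."""
    missed = [(message_id, message_type, data)
              for message_id, message_type, data in self._history
              if message_id > last_message_id]
    # only the handshaking socket receives the bundle, so other clients
    # never see (and never need to filter) retransmitted messages
    await self._emit('retransmit', {'messages': missed}, socket_id)
```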

@afullerx
Contributor Author

afullerx commented Jun 17, 2024

Thanks for the feedback. Good idea about bundling the retransmissions into a single special message. However, I didn't see any way to send a message directly to a client connected via Air. We can still get almost all the benefit, as other clients will only need to filter a single infrequent message instead of checking every message.

I did think the history duration deserved a config option, but decided it wasn't my place to make that decision. I'll add a message_history_duration option. Setting it to zero will completely disable it and restore the previous behavior.

I'll also do some testing with ui.scene and ui.leaflet.

@afullerx
Contributor Author

After being short on time for a bit, I was finally able to implement the improvements. I should be able to push the changes in the next couple days after I do some final testing.

@afullerx
Contributor Author

afullerx commented Jul 4, 2024

I decided it's probably better to allow the user to configure the maximum number of history entries (message_buffer_max) rather than the history duration. This correlates better with both the memory needed and the size of the message backlog the client will have to handle. With a default of 1000 entries, this resulted in, at most, 2-3 MB of additional RAM usage with the message types I tested. 

I did some profiling of the message handling overhead, and it seemed pretty negligible. For example, on average, calls to _append_history() only took ~10μs. 

I realized the message history isn't needed to cover the initial connection for ui.page since outbox will hold messages until the connection is established. So, I set the history duration to the expected lifetime of the page. 

Since core.app.config isn't always available during the initialization of outbox, setting up the history buffer in loop() was the next best option. 
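
A minimal sketch of that deferred setup, assuming the names mentioned in this thread (message_buffer_max, _history, loop()); the details may differ from the actual code:

```python
from collections import deque
from typing import Optional

from nicegui import core  # provides core.app.config once the app is initialized


class Outbox:

    def __init__(self) -> None:
        # core.app.config is not always available yet, so defer buffer creation
        self._history: Optional[deque] = None

    async def loop(self) -> None:
        if self._history is None and core.app.config.message_buffer_max > 0:
            # bounded buffer: the oldest entries are dropped automatically
            self._history = deque(maxlen=core.app.config.message_buffer_max)
        ...  # regular message emission continues here
```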

As far as I can tell, ui.scene and ui.leaflet are working fine with these changes. 

As a possible enhancement, when sync fails, instead of reloading the page, we could dump the entire state of the page (as we do on page render) and send it in a message. We would then just replace the element object with the up-to-date one. This is much faster and more seamless than a full page reload and, for a ui.page, doesn't result in the loss of state. I experimented with this, and it worked very well, but it seemed a little too experimental to include. I can imagine problems with components like ui.scene. I'm interested in your thoughts on this. I'd love to go this route if it's not going to cause too many problems.

@afullerx
Contributor Author

afullerx commented Jul 7, 2024

Regarding the enhancement I mentioned in my previous post, if we can do a full state resync without a page reload when synchronize() fails, we could probably do without the message_buffer_max config option and make do with a smaller fixed-size history buffer. Since state would be seamlessly synced either way, it would just be about sizing it to the point where it stops being more efficient to sync through replaying messages. 

While ui.scene and ui.leaflet using a special "init" event isn't an issue for this PR in its current form, this would need to be reworked to implement this enhancement. We would need them to transmit their state whenever we do a full state resync in addition to the initial page load. 

Ultimately, I'm not sure if this would work out or not, but I think there's enough merit in the idea that I should take some time to fully explore it.

@afullerx afullerx marked this pull request as draft July 7, 2024 04:09
@falkoschindler
Contributor

Oh, wow, this PR keeps on growing... But it is certainly a good idea to re-evaluate our options and to think about the best path forward, before spending more time with the implementation or even merging something that hinders us later.

The special initialization of ui.scene and ui.leaflet is definitely something I'd love to get rid of. I expect this to be quite some work since we need to re-implement (part of) their object model in Python. But maybe I overestimate its complexity and it would be a valuable groundwork before continuing to work on the general message transmission.

@afullerx
Contributor Author

afullerx commented Jul 13, 2024

I decided that doing a full-state resync without reloading is going to be a no-go. I was able to get it working pretty well in most cases by having ui.leaflet and ui.scene clear their contents and basically reinitialize. But since they can have their state altered by arbitrary method calls (e.g., the "change icon" example for leaflet), this wouldn't be safe to do in all cases. 

Anyway, I believe this PR is ready for review again. Some other improvements I made: 

  • Fixed compatibility with On Air by also adding synchronize() to its "handshake" handler.
  • The "sync" event is emitted directly by synchronize() now. This fixes an edge case with out-of-order messages when messages are queued while the sync is performed. 
  • The front end now keeps a list of all its past socket IDs. This is then used by synchronize() to filter out messages intended for other targets.

@afullerx afullerx marked this pull request as ready for review July 13, 2024 03:33
Contributor

@falkoschindler falkoschindler left a comment

Hi @afullerx, I finally found enough time and focus to review your pull request.

I made just a few minor changes:

  • I made use of the Message type to simplify argument lists and type annotations a bit.
  • I think _message_count should always be increased when emitting a message.
  • Instead of ignoring a type error we can safely assert that self._history is not None.
  • I restructured the sync method in JavaScript using early exits and destructuring.

Apart from that, I have some more thoughts I'd like to clarify:

  • As far as I understand, setting message_buffer_max to 0 disables the deque, which behaves differently than setting maxlen=0? Or could we assume we always work with a deque, just sometimes with zero length?

  • I thought about creating the deque in the initializer with a default length of 1000, and changing it in loop() according to message_buffer_max. The maxlen attribute is read-only, but we could create a copy like d = deque(d, maxlen=...). But what would we do if the current deque already contains more messages than the new maxlen? (See the snippet after this list.)

  • > The front end now keeps a list of all its past socket IDs. This is then used by synchronize() to filter out messages intended for other targets.

    We should probably prune these socket IDs...

  • Maybe there is a better parameter name than message_buffer_max. Maybe message_history_length?

  • In client.js we compare msg.target against window.socket.id. I think we can avoid sending sync messages to the wrong clients in the first place like this: await self._emit('sync', {...}, socket_ids[-1]).

  • You're adding message_id to data and removing it again on the client. Couldn't this interfere with the other payload? Maybe it's better to keep this attribute separate, even if this would complicate the data structure of a history item once again.
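
For reference (relevant to the deque question above): this is how the copy trick behaves when the existing deque already holds more entries than the new maxlen; the oldest entries are silently discarded, which is standard collections.deque behavior:

```python
from collections import deque

d = deque(range(5))      # deque([0, 1, 2, 3, 4]), unbounded
d = deque(d, maxlen=3)   # keeps only the rightmost entries: deque([2, 3, 4])
d.append(5)              # oldest entry drops off: deque([3, 4, 5])
```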

@falkoschindler falkoschindler added the enhancement New feature or request label Jul 27, 2024
@afullerx
Contributor Author

afullerx commented Jul 28, 2024

Thanks for the review, @falkoschindler. I like the changes. There's only one issue I saw.

> I think _message_count should always be increased when emitting a message.

If we increment _message_count when a "sync" message is sent, we would have to modify synchronize(). It currently relies on _history containing an uninterrupted sequence of message IDs.

Regarding your other points:

> • As far as I understand, setting message_buffer_max to 0 disables the deque, which behaves differently than setting maxlen=0? Or could we assume we always work with a deque, just sometimes with zero length?
> • I thought about creating the deque in the initializer with a default length of 1000, and changing it in loop() according to message_buffer_max. The maxlen attribute is read-only, but we could create a copy like d = deque(d, maxlen=...). But what would we do if the current deque already contains more messages than the new maxlen?

I considered this, but it required hard-coding the default length in outbox and I wasn't sure if this was good practice. Another option would be to initialize it with maxlen=0. This would avoid the hard-coding, but result in the deque being created twice in the typical case.

In either case, we wouldn't actually have to copy the deque or worry about how many messages it contains since _history will always be empty before loop() gets executed.

> The front end now keeps a list of all its past socket IDs. This is then used by synchronize() to filter out messages intended for other targets.
>
> We should probably prune these socket IDs...

I figured the number of socket IDs is unlikely to get very high. In the case of many reconnections, a page reload is likely to be triggered before long. The problem with pruning is that it's hard to know when socket IDs age out of the message history. We could just cap the list at a sufficiently large number to make sure any removed IDs are no longer in the history.

> Maybe there is a better parameter name than message_buffer_max. Maybe message_history_length?

Whichever you think is best is fine with me. I thought it was basically a toss-up between the two.

> In client.js we compare msg.target against window.socket.id. I think we can avoid sending sync messages to the wrong clients in the first place like this: await self._emit('sync', {...}, socket_ids[-1]).

I actually tried this at one point, but it broke compatibility with On Air.

> You're adding message_id to data and removing it again on the client. Couldn't this interfere with the other payload? Maybe it's better to keep this attribute separate, even if this would complicate the data structure of a history item once again.

I did consider wrapping the payload in an "envelope" dict with the message ID. I was trying to keep per-message overhead as low as possible, so I ultimately decided against it. If the overhead is acceptable, it would be more robust.
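
To illustrate the two layouts under discussion (the keys are made up for the example):

```python
# In-band ID (current approach): message_id is merged into the payload and
# stripped again on the client; a payload already containing a 'message_id'
# key would clash with it.
data = {'id': 42, 'value': 'hello', 'message_id': 1234}

# Envelope: the ID stays separate from the payload at the cost of a little
# extra per-message overhead.
envelope = {'message_id': 1234, 'data': {'id': 42, 'value': 'hello'}}
```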

@afullerx
Contributor Author

I made additional changes to address some of the remaining concerns. Most of them are explained by my previous post and the commit messages. 

One additional change: I realized that previous socket IDs only need to be kept by the client temporarily. Once the sync operation is complete, all previous socket IDs become irrelevant.

As far as maintaining the length of the history using the maxlen parameter, I ultimately decided to leave it alone. None of the alternatives seem clearly superior to the current implementation. 

I didn't change the "sync" message to be emitted directly to the client's socket ID, because this didn't work with On Air in my previous testing.

@falkoschindler
Contributor

Sorry, @afullerx, your pull request is not forgotten. I just didn't find the time and focus to dive into the details of this implementation once again, especially analyzing the issue with NiceGUI On Air. But it's still on my list. I'll be traveling over the next two weeks, so I hope to continue working on it by the end of September.

@falkoschindler
Contributor

falkoschindler commented Oct 12, 2024

I finally had another look into this pull request. It looks like we're almost good. I'm just experimenting with sending sync messages directly to the socket ID doing the handshake, so that we can remove the window.socketIds array. Apart from that, NiceGUI On Air doesn't seem to work at all:

  File "/Users/falko/Projects/nicegui/nicegui/air.py", line 127, in _handle_handshake
    await client.outbox.synchronize(data['last_message_id'], data['socket_ids'])
                                    ~~~~^^^^^^^^^^^^^^^^^^^
KeyError: 'last_message_id'

It looks like we need to adjust the On Air server implementation and test with a local server instance.

@afullerx
Contributor Author

Thanks for revisiting this, @falkoschindler. I believe the KeyError exception is due to the browser using the unmodified nicegui.js. I was able to get it working using the "Redirector" and "CORS Unblock" browser extensions to force the use of the updated version. I imagine you have a more straightforward way of testing this, and that is what you meant by "test with a local server instance."

The only thing that didn't work with On Air was emitting the sync message directly to the client using its socket ID. I assumed this was because the On Air server wasn't properly forwarding messages targeted in this way.

Even if this were remedied, I believe we would still need window.socketIds to keep track of the client's past socket IDs. Since the history can contain messages targeted specifically at other clients, we need a way to filter them out. As an alternative to the current approach, we could send all messages and filter them on the client.

@falkoschindler falkoschindler self-assigned this Oct 17, 2024
@falkoschindler
Contributor

While discussing this pull request with @rodja, we decided to simplify the whole retransmission logic by excluding the shared auto-index page. We can include it later if we really want to. But for the moment we chose simplicity over completeness.

This way every message is sent to one client only and we can simply keep it in the already existing message queue. A message_index marks the current position in the queue from where to send the next message. If a client reconnects, it simply asks the outbox to move the index back to the position of the next expected message ID. All older messages can be pruned.
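
A minimal sketch of this simplified design, using the names from the description above (the actual implementation may differ):

```python
class Outbox:
    """Sketch: one message queue per client with a movable send index."""

    def __init__(self) -> None:
        self.messages: list = []   # (client_id, message_id, timestamp, type, data)
        self._message_index = 0    # queue position of the next message to send

    def try_rewind(self, next_expected_id: int) -> bool:
        """On reconnect, move the index back to the next expected message ID."""
        for i, message in enumerate(self.messages):
            if message[1] == next_expected_id:
                del self.messages[:i]  # everything older can be pruned
                self._message_index = 0
                return True
        return False  # history too short: fall back to a page reload
```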

It still needs some testing with On Air and elements like ui.scene. But I'm optimistic.

@falkoschindler
Contributor

Local tests with ui.log and ui.scene seem to work well:

```python
import random
import time

from nicegui import ui


@ui.page('/', reconnect_timeout=10.0)
def page():
    log = ui.log()
    ui.timer(1.0, lambda: log.push(f'{time.time():.0f}'))

    scene = ui.scene()
    ui.timer(1.0, lambda: scene.sphere().scale(0.5).move(random.random() - 0.5, random.random() - 0.5, random.random()))
```

@falkoschindler falkoschindler marked this pull request as draft October 24, 2024 18:13
@falkoschindler
Contributor

I forgot to handle the case where a client reconnects too late and the message history isn't long enough. I'll add that tomorrow.

@falkoschindler
Contributor

falkoschindler commented Oct 25, 2024

Apparently, updates based on running methods like "update_grid" are broken:

```python
from nicegui import ui

grid = ui.aggrid({'columnDefs': [{'field': 'name'}], 'rowData': []})

def update():
    grid.options['rowData'].append({'name': 'Alice'})
    grid.update()

ui.button('Update', on_click=update)
```

The update message might be enqueued in the wrong place. But changing

```python
self.messages.append((self.client.id, self.next_message_id, time.time(), 'update', data))
```

to

```python
self.messages.insert(self._message_index, (self.client.id, self.next_message_id, time.time(), 'update', data))
```

didn't help immediately.

@falkoschindler
Contributor

Ah, inserting the update message is basically correct, but it messes up the order of message IDs.
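
A small illustration of the problem (simplified to (message_id, type) tuples; the client-side check quoted earlier drops anything with a non-increasing ID):

```python
messages = [(1, 'update'), (2, 'update'), (3, 'update')]
messages.insert(1, (4, 'update'))  # insert at the current message index
print([message_id for message_id, _ in messages])  # [1, 4, 2, 3]
# after receiving ID 4, the client-side check
# `data.message_id <= window.last_message_id` silently discards IDs 2 and 3
```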

@falkoschindler
Contributor

@rodja Tests are green, ready for review.
But before merging I'd like to test a little more, also On Air and on some of our robots.

@falkoschindler falkoschindler added this to the 2.6 milestone Oct 26, 2024
Labels: enhancement (New feature or request)
Projects: Status: In Progress

Successfully merging this pull request may close these issues:

  • Confusing desync with blocking calls