
sessionQueues is full #871

Open · Yunustt opened this issue Nov 21, 2024 · 30 comments

Comments

Yunustt commented Nov 21, 2024

The broker logs "Session command queue is full" / "Session command queue 1 is full executing action CONN LOST". When this happens the connection status becomes inaccurate: the client is actually no longer connected, but listConnectedClients still returns it as connected. How should this be dealt with? Is increasing SESSION_QUEUE_SIZE the only option?

Yunustt (Author) commented Nov 21, 2024

Also, MoquetteIdleTimeoutHandler is not triggered to automatically kick timed-out clients. In my test, a new client could still connect.

hylkevds (Collaborator) commented

The session queue being full is very bad: it causes the dropped actions to never be handled by the broker. So if the dropped action is CONN LOST, the broker never processes the fact that a connection was lost.

The default queue size of 1024 is not large enough for busy brokers, but there is no harm in significantly increasing this size. Try setting it to 5000 or 10000 or so.
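
For an embedded broker this is just a config property. A minimal sketch, assuming the key is "session_queue_size" (my guess for the SESSION_QUEUE_SIZE setting discussed in this thread; check BrokerConstants for the exact name in your Moquette version):

import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import java.util.Properties;

public class LargerQueueBroker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("port", "1883");
        // Assumed key for SESSION_QUEUE_SIZE; the default is 1024.
        props.setProperty("session_queue_size", "10000");
        Server broker = new Server();
        broker.startServer(new MemoryConfig(props));
        Runtime.getRuntime().addShutdownHook(new Thread(broker::stopServer));
    }
}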

@andsel Maybe we should try adding OpenMetrics/Prometheus support to be able to monitor the queue status?
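
For illustration, a sketch of the kind of gauge this could export, using the Prometheus Java simpleclient; the queue accessor is hypothetical, since Moquette does not currently expose its session queues:

import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.HTTPServer;
import java.util.concurrent.BlockingQueue;

public class SessionQueueMetrics {
    private static final Gauge QUEUE_DEPTH = Gauge.build()
        .name("moquette_session_queue_depth")
        .help("Commands waiting in a session event loop queue")
        .labelNames("queue")
        .register();

    // The broker would have to expose its queues for this; that hook does not exist yet.
    public static void sample(String queueId, BlockingQueue<?> sessionQueue) {
        QUEUE_DEPTH.labels(queueId).set(sessionQueue.size());
    }

    public static void main(String[] args) throws Exception {
        new HTTPServer(9400); // endpoint for Prometheus to scrape
    }
}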

Yunustt (Author) commented Nov 21, 2024

I am now testing 3w client connections being disconnected. The queue size is already set to 10000, but the error still appears.

Yunustt (Author) commented Nov 21, 2024

Session command queue 0 is full executing action CONN
Session command queue 0 is full executing action flushQueues
Session command queue 0 is full executing action flushQueues

hylkevds (Collaborator) commented

3w is 30000, right?
Are these disconnects happening in a very short time?
Can you try setting the command queue size to 30000 for this test? That should ensure that at least the entire batch of disconnects fits in the queue. If the error still appears with that setting, something else may be amiss.

Yunustt (Author) commented Nov 21, 2024

yes

Yunustt (Author) commented Nov 21, 2024

Not being able to grow the queue dynamically is a problem that can't be solved if you don't know how much data the clients will send.

hylkevds (Collaborator) commented

A single client should never be able to cause a problem here, since each client is limited by the in-flight-message limit.
At the same time, the group of all clients is limited by the network stack. In almost all cases the network will naturally limit the number of actions arriving at the broker.

When testing, this is different, since there is no network stack, and many clients take the same action at the same time in a very unnatural way.

Dynamic queues have their own problems, especially when it comes to performance.
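
To make the "queue is full" mechanic concrete, here is a minimal standalone sketch (illustrative names and sizes, not Moquette's actual internals): a bounded queue rejects offers once a burst exceeds its capacity, and the rejected command is simply dropped:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {
    public static void main(String[] args) {
        // Illustrative capacity matching the default of 1024 mentioned above.
        BlockingQueue<String> sessionQueue = new ArrayBlockingQueue<>(1024);
        for (int i = 0; i < 30_000; i++) {
            // offer() returns false instead of blocking when the queue is full;
            // at that point the command is simply dropped.
            if (!sessionQueue.offer("CONN LOST " + i)) {
                System.out.println("Session command queue is full at command " + i);
                break;
            }
        }
    }
}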

Yunustt (Author) commented Nov 22, 2024

The broker certainly doesn't serve only a single client; many clients together will fill up SESSION_QUEUE_SIZE, especially when the number of CPU cores is low. Would it be better to handle this logic directly, as the older version of the PostOffice did?

Yunustt (Author) commented Nov 22, 2024

The number of clients our broker can handle in tests varies from one hardware configuration to another.

sigangan commented

Hi all, I also hit the 'queue is full' problem. It may be a bug in the code; see this snippet from the io.moquette.broker.SessionEventLoop class:

public void run() {
        while (!Thread.interrupted() || (Thread.interrupted() && !sessionQueue.isEmpty() && flushOnExit)) {
            try {
                // blocking call
                final FutureTask<String> task = this.sessionQueue.take();
                executeTask(task);
            } catch (InterruptedException e) {
                LOG.info("SessionEventLoop {} interrupted", Thread.currentThread().getName());
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("SessionEventLoop {} exit", Thread.currentThread().getName());
    }

    public static void executeTask(final FutureTask<String> task) {
        if (!task.isCancelled()) {
            try {
                task.run();

                // we ran it, but we have to grab the exception if raised
                task.get();
            } catch (Throwable th) {
                LOG.warn("SessionEventLoop {} reached exception in processing command", Thread.currentThread().getName(), th);
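                // Note: this rethrow escapes run()'s while loop above (its catch
                // only handles InterruptedException), so the event loop thread exits.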
                throw new RuntimeException(th);
            }
        }
    }

Description:
when executeTask() throws the RuntimeException(th), the while loop in run() exits, so that SessionEventLoop never again consumes commands from its sessionQueue. But the sessionQueue still accepts new commands, because routeCommand() in the SessionEventLoopGroup class keeps working. So as more and more commands are added to the sessionQueue, it eventually reaches 'queue is full', since the loop no longer consumes from it. Is this a bug?
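
To illustrate the mechanism with a standalone sketch (made-up names and a tiny capacity; not Moquette code): once a task throws and the consumer thread dies, the queue can only fill up:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;

public class LoopDeathDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<FutureTask<String>> queue = new ArrayBlockingQueue<>(4);
        Thread loop = new Thread(() -> {
            while (!Thread.interrupted()) {
                try {
                    FutureTask<String> task = queue.take();
                    task.run();
                    task.get(); // surfaces the task's exception, like executeTask()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Throwable th) {
                    // Same rethrow as executeTask(); it escapes the while loop.
                    throw new RuntimeException(th);
                }
            }
        }, "session-loop-0");
        loop.start();

        // One failing command kills the loop thread...
        queue.put(new FutureTask<String>(() -> { throw new IllegalStateException("boom"); }));
        loop.join();

        // ...and from then on producers can only fill the queue until offer() fails.
        while (queue.offer(new FutureTask<String>(() -> "CONN LOST"))) { }
        System.out.println("queue is full and nothing will ever drain it");
    }
}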

hylkevds (Collaborator) commented Nov 22, 2024

[quoting sigangan's SessionEventLoop analysis above]

Correct, but if a command throws an exception, we have an error state that can't be recovered from, so trying to continue will just make things worse.

hylkevds (Collaborator) commented

The broker certainly doesn't serve only a single client; many clients together will fill up SESSION_QUEUE_SIZE, especially when the number of CPU cores is low. Would it be better to handle this logic directly, as the older version of the PostOffice did?

The older version was worse in every way, especially when it comes to behaviour under load. Plus, it had memory leaks and race conditions...

Like I stated, in a real-world scenario it is very unlikely that all clients take the same action at exactly the same time. If you run into queue overruns in a real-world scenario (which tests very much are not) then you can try to increase the queue size. If that doesn't help, then your broker needs better hardware to handle the load you're putting on it.

There may be other places in the code that need optimisation (like #841), but they are not caused by the queueing system; the queueing system may just be the first place where the problem shows up.

Yunustt (Author) commented Nov 22, 2024

I raised it to 4w (40,000) and I still get "Session command queue 0 is full".

Yunustt (Author) commented Nov 22, 2024

With simulated clients, the same operation may indeed happen in batches.

Yunustt (Author) commented Nov 22, 2024

Could the queue be adapted to a NoSQL store, so that it has persistence?

hylkevds (Collaborator) commented

If you can make a minimal demonstrator we can have a look, but with the little information you've given we can't say more than we already have.

hylkevds (Collaborator) commented

是否这个队列可以适配nosql,就能够有持久化 ("Could the queue be adapted to NoSQL, so it can have persistence?")

Sorry, other than the word nosql I can't read that.

Yunustt (Author) commented Nov 22, 2024

The main problem is that once the queue is full, the dropped commands are lost and can never be processed.

Yunustt (Author) commented Nov 22, 2024

是否这个队列可以适配nosql,就能够有持久化 ("Could the queue be adapted to NoSQL, so it can have persistence?")

Sorry, other than the word nosql I can't read that.

Could the queue be adapted to a NoSQL store, so that it has persistence?

hylkevds (Collaborator) commented

Of course not. If the broker is overloaded, data is lost; there is no way around that in any architecture. Moving the queue to any other implementation would be slower and would make things worse.

This is the internal command processing queue; it cannot be persisted. If it overruns, you are putting more load on your broker than your hardware can handle.

Yunustt (Author) commented Nov 22, 2024

Do you have test results showing which hardware configurations handle how many clients?

sigangan commented

[quoting the SessionEventLoop analysis and hylkevds's reply above]


I got the following exception in the SessionEventLoop log (possibly due to a bad network, or to two or more clients sharing the same clientId, which keep reconnecting and kicking each other off the broker every 20 seconds). After the broker had run for some time, I found the exception in the log and then saw the size of the queue growing until it finally became full:

[screenshot of the exception log]

I added some logging to show the size of the queue, as follows:

[screenshot of the queue-size log output]

Yunustt (Author) commented Nov 22, 2024

[quoting, in Chinese translation, hylkevds's reply above that the internal command queue cannot be persisted]

In my opinion, client connect and disconnect handling should be separated from the SessionEvents used for pushing content; connect and disconnect should be treated as more important.

Yunustt (Author) commented Nov 22, 2024

[quoting, in Chinese translation, the SessionEventLoop exchange and sigangan's follow-up above]

What is SESSION_QUEUE_SIZE set to in your case?

Yunustt (Author) commented Nov 22, 2024

[quoting, in Chinese translation, hylkevds's reply above about the old PostOffice and queue sizing]

I would like to ask whether the memory leak problem in the old version refers to the following code:
public void internalPublish(MqttPublishMessage msg, final String clientId) {
    final int messageID = msg.variableHeader().packetId();
    if (!initialized) {
        LOG.error("Moquette is not started, internal message cannot be published. CId: {}, messageId: {}",
            clientId, messageID);
        throw new IllegalStateException("Can't publish on a integration is not yet started");
    }
    LOG.info("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
    dispatcher.internalPublish(msg);
}
Is msg not released here?

hylkevds (Collaborator) commented

If you run into exceptions, you should make separate issues for those, with the full stack trace. Otherwise we can't fix the underlying problem. So in your case the queue overrun is most likely a symptom of this exception.
In this case I'm guessing you're manually releasing the message you send to internalPublish, but the javadoc of that method states:

msg - the message to forward. The ByteBuf in the message will be released.

There were many issues with the reference counting of message buffers. If you search the older issues you'll find the related ones. internalPublish was also affected.
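
For reference, a short sketch of calling internalPublish the way that javadoc intends (topic, payload, and client id here are made up): the broker takes ownership of the message's ByteBuf, so the caller must not release it again:

import io.moquette.broker.Server;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;

public class InternalPublishExample {
    static void publishInternally(Server broker) {
        MqttPublishMessage msg = MqttMessageBuilders.publish()
            .topicName("sensors/temp")
            .qos(MqttQoS.AT_LEAST_ONCE)
            .retained(false)
            .payload(Unpooled.copiedBuffer("21.5", StandardCharsets.UTF_8))
            .build();
        broker.internalPublish(msg, "INTRLPUB");
        // No msg.release() here: per the javadoc, the broker releases the ByteBuf.
    }
}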

Yunustt (Author) commented Nov 22, 2024

I changed the test server to 4 CPU cores and a sessionQueueSize of 50,000, with 30,000 clients connecting rapidly. So far it seems stable; I will watch it overnight.

Yunustt (Author) commented Nov 22, 2024

[quoting, in Chinese translation, hylkevds's reply above about internalPublish and ByteBuf reference counting]

When will the next release be published? I see a lot of code has been merged since version 0.17.

andsel (Collaborator) commented Dec 28, 2024

@Yunustt version 0.18.0 has been released; you can grab it from JitPack (https://github.com/moquette-io/moquette/?tab=readme-ov-file#embedding-in-other-projects), which is a lot easier than publishing to the Maven Central Repository.
