From 1a19e743b6c5de536e7f0130a4a0078e434ebad3 Mon Sep 17 00:00:00 2001 From: pedro Date: Wed, 11 Oct 2023 17:21:04 +0200 Subject: [PATCH] fix send ack if needed --- .../com/pedro/rtmp/rtmp/CommandsManager.kt | 20 +++++++++++++++++++ .../java/com/pedro/rtmp/rtmp/RtmpClient.kt | 1 + .../java/com/pedro/rtmp/utils/RtmpConfig.kt | 2 +- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt index cff396006..004d0d822 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt @@ -55,6 +55,8 @@ abstract class CommandsManager { var readChunkSize = RtmpConfig.DEFAULT_CHUNK_SIZE var audioDisabled = false var videoDisabled = false + private var bytesRead = 0 + private var acknowledgementSequence = 0 protected var width = 640 protected var height = 480 @@ -126,6 +128,7 @@ abstract class CommandsManager { val message = RtmpMessage.getRtmpMessage(input, readChunkSize, sessionHistory) sessionHistory.setReadHeader(message.header) Log.i(TAG, "read $message") + bytesRead += message.header.getPacketLength() return message } @@ -178,6 +181,21 @@ abstract class CommandsManager { } } + suspend fun checkAndSendAcknowledgement(socket: RtmpSocket) { + writeSync.withLock { + if (bytesRead >= RtmpConfig.acknowledgementWindowSize) { + acknowledgementSequence += bytesRead + bytesRead -= RtmpConfig.acknowledgementWindowSize + val output = socket.getOutStream() + val acknowledgement = Acknowledgement(acknowledgementSequence) + acknowledgement.writeHeader(output) + acknowledgement.writeBody(output) + output.flush() + Log.i(TAG, "send $acknowledgement") + } + } + } + @Throws(IOException::class) suspend fun sendVideoPacket(flvPacket: FlvPacket, socket: RtmpSocket): Int { writeSync.withLock { @@ -221,5 +239,7 @@ abstract class CommandsManager { commandId = 0 readChunkSize = RtmpConfig.DEFAULT_CHUNK_SIZE sessionHistory.reset() + acknowledgementSequence = 0 + bytesRead = 0 } } diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt index f298d8a57..6634341fa 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt @@ -321,6 +321,7 @@ class RtmpClient(private val connectCheckerRtmp: ConnectCheckerRtmp) { var socket = this.socket ?: throw IOException("Invalid socket, Connection failed") val message = commandsManager.readMessageResponse(socket) + commandsManager.checkAndSendAcknowledgement(socket) when (message.getType()) { MessageType.SET_CHUNK_SIZE -> { val setChunkSize = message as SetChunkSize diff --git a/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt b/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt index db5474412..162b3cd03 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/utils/RtmpConfig.kt @@ -22,5 +22,5 @@ package com.pedro.rtmp.utils object RtmpConfig { const val DEFAULT_CHUNK_SIZE = 128 var writeChunkSize = DEFAULT_CHUNK_SIZE - var acknowledgementWindowSize = 0 + var acknowledgementWindowSize = Int.MAX_VALUE } \ No newline at end of file