Skip to content

Commit

Permalink
Fix a rare issue with incorrect message order when sending multiple m…
Browse files Browse the repository at this point in the history
…essages while offline
  • Loading branch information
laevandus committed Jul 22, 2024
1 parent 8a9d94f commit f058c43
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 65 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

# Upcoming

### 🔄 Changed
## StreamChat
### 🐞 Fixed
- Fix a rare issue with incorrect message order when sending multiple messages while offline [#3316](https://github.com/GetStream/stream-chat-swift/issues/3316)

# [4.60.0](https://github.com/GetStream/stream-chat-swift/releases/tag/4.60.0)
_July 18, 2024_
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import Foundation
extension EndpointPath {
var shouldBeQueuedOffline: Bool {
switch self {
case .sendMessage, .editMessage, .deleteMessage, .pinMessage, .unpinMessage, .addReaction, .deleteReaction:
case .editMessage, .deleteMessage, .pinMessage, .unpinMessage, .addReaction, .deleteReaction:
return true
case .createChannel, .connect, .sync, .users, .guest, .members, .search, .devices, .channels, .updateChannel,
.deleteChannel, .channelUpdate, .muteChannel, .showChannel, .truncateChannel, .markChannelRead, .markChannelUnread,
.markAllChannelsRead, .channelEvent, .stopWatchingChannel, .pinnedMessages, .uploadAttachment, .message,
.replies, .reactions, .messageAction, .banMember, .flagUser, .flagMessage, .muteUser, .translateMessage,
.callToken, .createCall, .deleteFile, .deleteImage, .og, .appSettings, .threads, .thread, .markThreadRead, .markThreadUnread, .polls, .pollsQuery,
.poll, .pollOption, .pollOptions, .pollVotes, .pollVoteInMessage, .pollVote, .unread, .blockUser, .unblockUser:
.poll, .pollOption, .pollOptions, .pollVotes, .pollVoteInMessage, .pollVote, .unread, .blockUser, .unblockUser,
.sendMessage:
return false
}
}
Expand Down
16 changes: 16 additions & 0 deletions Sources/StreamChat/ChatClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ extension ChatClient: ConnectionStateDelegate {
}
)
connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state)
try? backgroundWorker(of: MessageSender.self).didUpdateConnectionState(state)
}
}

Expand All @@ -674,6 +675,21 @@ extension ChatClient: ConnectionDetailsProviderDelegate {
}
}

extension ChatClient {
func backgroundWorker<T>(of type: T.Type) throws -> T {
if let worker = backgroundWorkers.compactMap({ $0 as? T }).first {
return worker
}
if currentUserId == nil {
throw ClientError.CurrentUserDoesNotExist()
}
if !config.isClientInActiveMode {
throw ClientError.ClientIsNotInActiveMode()
}
throw ClientError("Background worker of type \(T.self) is not set up")
}
}

extension ClientError {
public final class MissingLocalStorageURL: ClientError {
override public var localizedDescription: String { "The URL provided in ChatClientConfig is `nil`." }
Expand Down
16 changes: 14 additions & 2 deletions Sources/StreamChat/Repositories/MessageRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ enum MessageRepositoryError: LocalizedError {
case messageDoesNotExist
case messageNotPendingSend
case messageDoesNotHaveValidChannel
/// Any other error which requires manual user interaction, message's state is set to failedToSend
case failedToSendMessage(Error)
/// A connection error occured, message's state is set back to pendingSend for automatic retry
case ephemeralConnectionError(Error)
}

class MessageRepository {
Expand Down Expand Up @@ -126,13 +129,22 @@ class MessageRepository {
// error code for duplicated messages.
let isDuplicatedMessageError = errorPayload.code == 4 && errorPayload.message.contains("already exists")
if isDuplicatedMessageError {
database.write {
database.write({
let messageDTO = $0.message(id: messageId)
messageDTO?.markMessageAsSent()
}, completion: { _ in
completion(.failure(.failedToSendMessage(error)))
}
})
return
}
} else if ClientError.isEphemeral(error: error) {
database.write({
let messageDTO = $0.message(id: messageId)
messageDTO?.localMessageState = .pendingSend
}, completion: { _ in
completion(.failure(.ephemeralConnectionError(error)))
})
return
}

markMessageAsFailedToSend(id: messageId) {
Expand Down
17 changes: 0 additions & 17 deletions Sources/StreamChat/StateLayer/Chat.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1418,20 +1418,3 @@ extension Chat {
) -> TypingEventsSender = TypingEventsSender.init
}
}

// MARK: - Chat Client

private extension ChatClient {
func backgroundWorker<T>(of type: T.Type) throws -> T {
if let worker = backgroundWorkers.compactMap({ $0 as? T }).first {
return worker
}
if currentUserId == nil {
throw ClientError.CurrentUserDoesNotExist()
}
if !config.isClientInActiveMode {
throw ClientError.ClientIsNotInActiveMode()
}
throw ClientError("Background worker of type \(T.self) is not set up")
}
}
70 changes: 51 additions & 19 deletions Sources/StreamChat/Workers/Background/MessageSender.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import Foundation
/// 2. The pending messages are send one by one order by their `locallyCreatedAt` value ascending.
/// 3. When the message is being sent, its local state is changed to `.sending`
/// 4. If the operation is successful, the local state of the message is changed to `nil`. If the operation fails, the local
/// state of is changed to `sendingFailed`.
///
// TODO:
/// - Message send retry
/// - Start sending messages when connection status changes (offline -> online)
/// state of is changed to `sendingFailed` or `.pendingSend`.
///
class MessageSender: Worker {
/// Because we need to be sure messages for every channel are sent in the correct order, we create a sending queue for
Expand Down Expand Up @@ -117,6 +113,15 @@ class MessageSender: Worker {
}
}
}

func didUpdateConnectionState(_ state: WebSocketConnectionState) {
guard case WebSocketConnectionState.connected = state else { return }
sendingDispatchQueue.async { [weak self] in
self?.sendingQueueByCid.forEach { _, messageQueue in
messageQueue.webSocketConnected()
}
}
}
}

// MARK: - Chat State Layer
Expand Down Expand Up @@ -170,6 +175,7 @@ private class MessageSendingQueue {
/// We use Set because the message Id is the main identifier. Thanks to this, it's possible to schedule message for sending
/// multiple times without having to worry about that.
@Atomic private(set) var requests: Set<SendRequest> = []
@Atomic private var isWaitingForConnection = false

/// Schedules sending of the message. All already scheduled messages with `createdLocallyAt` older than these ones will
/// be sent first.
Expand All @@ -184,6 +190,13 @@ private class MessageSendingQueue {
sendNextMessage()
}
}

func webSocketConnected() {
guard isWaitingForConnection else { return }
isWaitingForConnection = false
log.debug("Message sender resumes sending messages after establishing internet connection")
sendNextMessage()
}

/// Gets the oldest message from the queue and tries to send it.
private func sendNextMessage() {
Expand All @@ -194,22 +207,41 @@ private class MessageSendingQueue {
guard let request = self?.requests.sorted(by: { $0.createdLocallyAt < $1.createdLocallyAt }).first else { return }

self?.messageRepository.sendMessage(with: request.messageId) { [weak self] result in
guard let self else { return }
self.removeRequestAndContinue(request)
if let error = result.error {
switch error {
case .messageDoesNotExist,
.messageNotPendingSend,
.messageDoesNotHaveValidChannel:
let event = NewMessageErrorEvent(messageId: request.messageId, error: error)
self.eventsNotificationCenter.process(event)
case let .failedToSendMessage(error):
let event = NewMessageErrorEvent(messageId: request.messageId, error: error)
self.eventsNotificationCenter.process(event)
}
self?.handleSendMessageResult(request, result: result)
}
}
}

private func handleSendMessageResult(_ request: SendRequest, result: Result<ChatMessage, MessageRepositoryError>) {
enum Action {
case waitForConnection
case concludeRequest(NewMessageErrorEvent?)
}
let action: Action = {
switch result {
case .success:
return .concludeRequest(nil)
case .failure(let repositoryError):
switch repositoryError {
case .messageDoesNotExist, .messageNotPendingSend, .messageDoesNotHaveValidChannel:
return .concludeRequest(NewMessageErrorEvent(messageId: request.messageId, error: repositoryError))
case .failedToSendMessage(let clientError):
return .concludeRequest(NewMessageErrorEvent(messageId: request.messageId, error: clientError))
case .ephemeralConnectionError:
return .waitForConnection
}
self.delegate?.messageSendingQueue(self, didProcess: request.messageId, result: result)
}
}()
switch action {
case .waitForConnection:
isWaitingForConnection = true
log.debug("Sending a message \(request.messageId) paused until internet connection is established")
case .concludeRequest(let newMessageErrorEvent):
removeRequestAndContinue(request)
if let event = newMessageErrorEvent {
eventsNotificationCenter.process(event)
}
delegate?.messageSendingQueue(self, didProcess: request.messageId, result: result)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ final class APIClient_Spy: APIClient, Spy {
@discardableResult
func waitForRequest(timeout: Double = defaultTimeout) -> AnyEndpoint? {
XCTWaiter().wait(for: [request_expectation], timeout: timeout)
request_expectation = XCTestExpectation()
return request_endpoint
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import XCTest

final class EndpointPathTests: XCTestCase {
func test_sendMessage_shouldBeQueuedOffline() throws {
XCTAssertTrue(EndpointPath.sendMessage(.unique).shouldBeQueuedOffline)
func test_sendMessage_shouldNOTBeQueuedOffline() throws {
XCTAssertFalse(EndpointPath.sendMessage(.unique).shouldBeQueuedOffline)
}

func test_editMessage_shouldBeQueuedOffline() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ final class OfflineRequestsRepository_Tests: XCTestCase {

func test_runQueuedRequestsWithPendingRequests() throws {
// We add one request to the queue
try createSendMessageRequests(count: 1)
try createAddReactionRequests(count: 1)

let expectation = self.expectation(description: "Running completes")
repository.runQueuedRequests {
Expand Down Expand Up @@ -91,9 +91,9 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
XCTAssertEqual(database.writeSessionCounter, 1)
}

func test_runQueuedRequestsWithPendingRequests_sendMessage() throws {
// We add one .sendMessage request to the queue
try createRequest(id: .unique, path: .sendMessage(.unique))
func test_runQueuedRequestsWithPendingRequests_addReaction() throws {
// We add one .addReaction request to the queue
try createRequest(id: .unique, path: .addReaction(.unique))

let expectation = self.expectation(description: "Running completes")
repository.runQueuedRequests {
Expand All @@ -104,7 +104,7 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
database.writeSessionCounter = 0
AssertAsync.willBeTrue(apiClient.recoveryRequest_endpoint != nil)

let jsonData = XCTestCase.mockData(fromJSONFile: "Message")
let jsonData = XCTestCase.mockData(fromJSONFile: "MessageReactionPayload")
apiClient.test_simulateRecoveryResponse(.success(jsonData))

waitForExpectations(timeout: defaultTimeout, handler: nil)
Expand All @@ -115,7 +115,6 @@ final class OfflineRequestsRepository_Tests: XCTestCase {

// 1 to remove the request from the queue
XCTAssertEqual(database.writeSessionCounter, 1)
XCTAssertCall("saveSuccessfullySentMessage(cid:message:completion:)", on: messageRepository, times: 1)
}

func test_runQueuedRequestsWithPendingRequests_editMessage() throws {
Expand Down Expand Up @@ -172,9 +171,9 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
}

func test_runQueuedRequestsWithManyPendingRequests() throws {
// We put 5 .sendMessage requests in the queue
// We put 5 .addReaction requests in the queue
let count = 5
try createSendMessageRequests(count: count)
try createAddReactionRequests(count: count)

let expectation = self.expectation(description: "Running completes")
repository.runQueuedRequests {
Expand All @@ -198,9 +197,9 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
}

func test_runQueuedRequestsWithManyPendingRequestsOneNetworkFailureShouldBeKept() throws {
// We put 5 .sendMessage requests in the queue
// We put 5 .addReaction requests in the queue
let count = 5
try createSendMessageRequests(count: count)
try createAddReactionRequests(count: count)

let expectation = self.expectation(description: "Running completes")
repository.runQueuedRequests {
Expand All @@ -214,7 +213,7 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
// We make all the requests succeed but 1, which receives a Connection Error
apiClient.recoveryRequest_allRecordedCalls.forEach { endpoint, completion in
let completion = completion as? ((Result<Data, Error>) -> Void)
if case let .sendMessage(id) = endpoint.path, id.id == "request2" {
if case let .addReaction(id) = endpoint.path, id == "request2" {
completion?(.failure(ClientError.ConnectionError()))
} else {
completion?(.success(Data()))
Expand All @@ -229,9 +228,9 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
}

func test_runQueuedRequestsWhichFailShouldBeRemoved() throws {
// We put 5 .sendMessage requests in the queue
// We put 5 .addReaction requests in the queue
let count = 5
try createSendMessageRequests(count: count)
try createAddReactionRequests(count: count)

let expectation = self.expectation(description: "Running completes")
repository.runQueuedRequests {
Expand All @@ -245,7 +244,7 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
// We make all the requests succeed but 1, which receives a random error
apiClient.recoveryRequest_allRecordedCalls.forEach { endpoint, completion in
let completion = completion as? ((Result<Data, Error>) -> Void)
if case let .sendMessage(id) = endpoint.path, id.id == "request2" {
if case let .addReaction(id) = endpoint.path, id == "request2" {
completion?(.failure(NSError(domain: "whatever", code: 1, userInfo: nil)))
} else {
completion?(.success(Data()))
Expand All @@ -263,7 +262,7 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
// We put 3 .sendMessage requests in the queue, 20 hours old.
let count = 3
let date = Date(timeIntervalSinceNow: -3600 * 20)
try createSendMessageRequests(count: count, date: date)
try createAddReactionRequests(count: count, date: date)

let expectation = self.expectation(description: "Running completes")
repository.runQueuedRequests {
Expand All @@ -280,17 +279,17 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
}

func test_runQueuedRequestsMixOldAndNew() throws {
// We put 3 .sendMessage requests in the queue, 20 hours old.
// We put 3 .addReaction requests in the queue, 20 hours old.
let count = 3
let date = Date(timeIntervalSinceNow: -3600 * 20)
try createSendMessageRequests(count: count, date: date)
try createAddReactionRequests(count: count, date: date)

// Create one recent.
let id = "request\(count)"
try createRequest(
id: id,
path: .sendMessage(.init(type: .messaging, id: id)),
body: ["some\(id)": 123],
path: .addReaction(id),
body: ["some\(count)": 123],
date: Date()
)

Expand All @@ -315,12 +314,12 @@ final class OfflineRequestsRepository_Tests: XCTestCase {
XCTAssertEqual(pendingRequests.count, 0)
}

private func createSendMessageRequests(count: Int, date: Date = Date()) throws {
private func createAddReactionRequests(count: Int, date: Date = Date()) throws {
try (1...count).forEach {
let id = "request\($0)"
try self.createRequest(
id: id,
path: .sendMessage(.init(type: .messaging, id: id)),
path: .addReaction(id),
body: ["some\($0)": 123],
date: date
)
Expand Down Expand Up @@ -367,7 +366,7 @@ final class OfflineRequestsRepository_Tests: XCTestCase {

func test_queueOfflineRequestWanted() {
let endpoint = DataEndpoint(
path: .sendMessage(.unique),
path: .addReaction(.unique),
method: .post,
queryItems: nil,
requiresConnectionId: true,
Expand Down
Loading

0 comments on commit f058c43

Please sign in to comment.