From 2da1a36c5529c3bedc47f55a34e96101a3839480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joris=20Pelgr=C3=B6m?= Date: Thu, 2 Nov 2023 00:01:20 +0100 Subject: [PATCH] Websocket `onMessage`: prevent out of order delivery due to `launch` (#3990) Websocket onMessage: prevent out of order delivery due to launch --- .../websocket/impl/WebSocketRepositoryImpl.kt | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt index e9200f9c2..6529d3738 100644 --- a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt @@ -54,11 +54,13 @@ import io.homeassistant.companion.android.common.util.toHexString import io.homeassistant.companion.android.database.server.ServerUserInfo import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ProducerScope import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharingStarted @@ -122,6 +124,12 @@ class WebSocketRepositoryImpl @AssistedInject constructor( private val server get() = serverManager.getServer(serverId) + private val messageQueue = Channel(capacity = Channel.UNLIMITED).apply { + ioScope.launch { + consumeEach { it.join() } // Run a job, and wait for it to complete before starting the next one + } + } + override fun getConnectionState(): WebSocketState? = connectionState override suspend fun sendPing(): Boolean { @@ -840,10 +848,11 @@ class WebSocketRepositoryImpl @AssistedInject constructor( listOf(mapper.readValue(text)) } - messages.groupBy { it.id }.values.forEach { messagesForId -> - ioScope.launch { - messagesForId.forEach { message -> - Log.d(TAG, "Message number ${message.id} received") + // Send messages to the queue to ensure they are handled in order and don't block the function + messages.forEach { message -> + Log.d(TAG, "Message number ${message.id} received") + val success = messageQueue.trySend( + ioScope.launch(start = CoroutineStart.LAZY) { when (message.type) { "auth_required" -> Log.d(TAG, "Auth Requested") "auth_ok" -> handleAuthComplete(true, message.haVersion) @@ -853,7 +862,8 @@ class WebSocketRepositoryImpl @AssistedInject constructor( else -> Log.d(TAG, "Unknown message type: ${message.type}") } } - } + ) + if (!success.isSuccess) Log.w(TAG, "Message number ${message.id} not being processed") } }