Websocket onMessage: prevent out of order delivery due to launch (#3990)

Websocket onMessage: prevent out of order delivery due to launch
This commit is contained in:
Joris Pelgröm 2023-11-02 00:01:20 +01:00 committed by GitHub
parent 15574f7531
commit 2da1a36c55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -54,11 +54,13 @@ import io.homeassistant.companion.android.common.util.toHexString
import io.homeassistant.companion.android.database.server.ServerUserInfo
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
@ -122,6 +124,12 @@ class WebSocketRepositoryImpl @AssistedInject constructor(
private val server get() = serverManager.getServer(serverId)
private val messageQueue = Channel<Job>(capacity = Channel.UNLIMITED).apply {
ioScope.launch {
consumeEach { it.join() } // Run a job, and wait for it to complete before starting the next one
}
}
override fun getConnectionState(): WebSocketState? = connectionState
override suspend fun sendPing(): Boolean {
@ -840,10 +848,11 @@ class WebSocketRepositoryImpl @AssistedInject constructor(
listOf(mapper.readValue(text))
}
messages.groupBy { it.id }.values.forEach { messagesForId ->
ioScope.launch {
messagesForId.forEach { message ->
Log.d(TAG, "Message number ${message.id} received")
// Send messages to the queue to ensure they are handled in order and don't block the function
messages.forEach { message ->
Log.d(TAG, "Message number ${message.id} received")
val success = messageQueue.trySend(
ioScope.launch(start = CoroutineStart.LAZY) {
when (message.type) {
"auth_required" -> Log.d(TAG, "Auth Requested")
"auth_ok" -> handleAuthComplete(true, message.haVersion)
@ -853,7 +862,8 @@ class WebSocketRepositoryImpl @AssistedInject constructor(
else -> Log.d(TAG, "Unknown message type: ${message.type}")
}
}
}
)
if (!success.isSuccess) Log.w(TAG, "Message number ${message.id} not being processed")
}
}