Support websocket message coalescing (#2829)

- Add support for the new feature where core might combine multiple responses in to one single websocket message if it's faster. This might return a JSON array of objects instead of a single object.
 - Adjust logging for websocket, no longer prints entire result on non-debug builds.
This commit is contained in:
Joris Pelgröm 2022-08-30 03:01:12 +02:00 committed by GitHub
parent 3908a07c1f
commit 84aa4454b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 28 deletions

View file

@ -0,0 +1,29 @@
package io.homeassistant.companion.android.common.data
import java.util.regex.Pattern
data class HomeAssistantVersion(
val year: Int,
val month: Int,
val release: Int
) {
companion object {
private val VERSION_PATTERN = Pattern.compile("([0-9]{4})\\.([0-9]{1,2})\\.([0-9]{1,2}).*")
fun fromString(versionString: String): HomeAssistantVersion? {
val matches = VERSION_PATTERN.matcher(versionString)
return if (matches.find() && matches.matches()) {
val coreYear = matches.group(1)?.toIntOrNull() ?: 0
val coreMonth = matches.group(2)?.toIntOrNull() ?: 0
val coreRelease = matches.group(3)?.toIntOrNull() ?: 0
HomeAssistantVersion(coreYear, coreMonth, coreRelease)
} else { // Invalid version
null
}
}
}
fun isAtLeast(minYear: Int, minMonth: Int, minRelease: Int = 0): Boolean =
year > minYear || (year == minYear && (month > minMonth || (month == minMonth && release >= minRelease)))
}

View file

@ -2,6 +2,7 @@ package io.homeassistant.companion.android.common.data.integration.impl
import android.util.Log
import io.homeassistant.companion.android.common.BuildConfig
import io.homeassistant.companion.android.common.data.HomeAssistantVersion
import io.homeassistant.companion.android.common.data.LocalStorage
import io.homeassistant.companion.android.common.data.authentication.AuthenticationRepository
import io.homeassistant.companion.android.common.data.integration.DeviceRegistration
@ -31,7 +32,6 @@ import kotlinx.coroutines.flow.map
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import org.json.JSONArray
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import javax.inject.Inject
import javax.inject.Named
@ -77,8 +77,6 @@ class IntegrationRepositoryImpl @Inject constructor(
private const val PREF_SEC_WARNING_NEXT = "sec_warning_last"
private const val TAG = "IntegrationRepository"
private const val RATE_LIMIT_URL = BuildConfig.RATE_LIMIT_URL
private val VERSION_PATTERN = Pattern.compile("([0-9]{4})\\.([0-9]{1,2})\\.([0-9]{1,2}).*")
}
override suspend fun registerDevice(deviceRegistration: DeviceRegistration) {
@ -492,17 +490,8 @@ class IntegrationRepositoryImpl @Inject constructor(
): Boolean {
if (!isRegistered()) return false
val version = getHomeAssistantVersion()
val matches = VERSION_PATTERN.matcher(version)
var result = false
if (matches.find() && matches.matches()) {
val coreYear = matches.group(1)?.toIntOrNull() ?: 0
val coreMonth = matches.group(2)?.toIntOrNull() ?: 0
val coreRelease = matches.group(3)?.toIntOrNull() ?: 0
result =
coreYear > year || (coreYear == year && (coreMonth > month || (coreMonth == month && coreRelease >= release)))
}
return result
val version = HomeAssistantVersion.fromString(getHomeAssistantVersion())
return version?.isAtLeast(year, month, release) ?: false
}
override suspend fun getConfig(): GetConfigResponse {

View file

@ -8,6 +8,8 @@ import com.fasterxml.jackson.module.kotlin.contains
import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import io.homeassistant.companion.android.common.BuildConfig
import io.homeassistant.companion.android.common.data.HomeAssistantVersion
import io.homeassistant.companion.android.common.data.authentication.AuthenticationRepository
import io.homeassistant.companion.android.common.data.authentication.AuthorizationException
import io.homeassistant.companion.android.common.data.integration.ServiceData
@ -82,6 +84,7 @@ class WebSocketRepositoryImpl @Inject constructor(
private val id = AtomicLong(1)
private var connection: WebSocket? = null
private var connectionState: WebSocketState? = null
private var connectionHaVersion: HomeAssistantVersion? = null
private val connectedMutex = Mutex()
private var connected = CompletableDeferred<Boolean>()
private val eventSubscriptionMutex = Mutex()
@ -327,7 +330,23 @@ class WebSocketRepositoryImpl @Inject constructor(
// Wait up to 30 seconds for auth response
return true == withTimeoutOrNull(30000) {
return@withTimeoutOrNull try {
connected.await()
val didConnect = connected.await()
if (didConnect && connectionHaVersion?.isAtLeast(2022, 9) == true) {
connection?.let {
val supportedFeaturesMessage = mapOf(
"type" to "supported_features",
"id" to id.getAndIncrement(),
"features" to mapOf(
"coalesce_messages" to 1
)
)
Log.d(TAG, "Sending message ${supportedFeaturesMessage["id"]}: $supportedFeaturesMessage")
it.send(
mapper.writeValueAsString(supportedFeaturesMessage)
)
}
}
didConnect
} catch (e: Exception) {
Log.e(TAG, "Unable to authenticate", e)
false
@ -363,7 +382,8 @@ class WebSocketRepositoryImpl @Inject constructor(
private inline fun <reified T> mapResponse(response: SocketResponse?): T? =
if (response?.result != null) mapper.convertValue(response.result) else null
private fun handleAuthComplete(successful: Boolean) {
private fun handleAuthComplete(successful: Boolean, haVersion: String?) {
connectionHaVersion = haVersion?.let { HomeAssistantVersion.fromString(it) }
if (successful) {
connectionState = WebSocketState.ACTIVE
connected.complete(true)
@ -436,6 +456,7 @@ class WebSocketRepositoryImpl @Inject constructor(
connectedMutex.withLock {
connected = CompletableDeferred()
connection = null
connectionHaVersion = null
if (connectionState != WebSocketState.CLOSED_AUTH)
connectionState = WebSocketState.CLOSED_OTHER
}
@ -477,18 +498,26 @@ class WebSocketRepositoryImpl @Inject constructor(
}
override fun onMessage(webSocket: WebSocket, text: String) {
Log.d(TAG, "Websocket: onMessage (text)")
val message: SocketResponse = mapper.readValue(text)
Log.d(TAG, "Message number ${message.id} received: $text")
Log.d(TAG, "Websocket: onMessage (${if (BuildConfig.DEBUG) "text: $text" else "text"})")
val textTree = mapper.readTree(text)
val messages: List<SocketResponse> = if (textTree.isArray) {
textTree.elements().asSequence().toList().map { mapper.convertValue(it) }
} else {
listOf(mapper.readValue(text))
}
ioScope.launch {
when (message.type) {
"auth_required" -> Log.d(TAG, "Auth Requested")
"auth_ok" -> handleAuthComplete(true)
"auth_invalid" -> handleAuthComplete(false)
"pong", "result" -> handleMessage(message)
"event" -> handleEvent(message)
else -> Log.d(TAG, "Unknown message type: $text")
messages.forEach { message ->
Log.d(TAG, "Message number ${message.id} received")
ioScope.launch {
when (message.type) {
"auth_required" -> Log.d(TAG, "Auth Requested")
"auth_ok" -> handleAuthComplete(true, message.haVersion)
"auth_invalid" -> handleAuthComplete(false, message.haVersion)
"pong", "result" -> handleMessage(message)
"event" -> handleEvent(message)
else -> Log.d(TAG, "Unknown message type: ${message.type}")
}
}
}
}

View file

@ -9,5 +9,6 @@ data class SocketResponse(
val type: String,
val success: Boolean?,
val result: JsonNode?,
val event: JsonNode?
val event: JsonNode?,
val haVersion: String?
)