Use compressed entity changes for device controls (#2942)

* Use compressed entity changes for device controls

 - Add support for the subscribe_entities websocket subscription which delivers initial state + changes in a compressed format to reduce data usage
 - Use the new subscribe_entities format in device controls on supported Home Assistant versions

* Less duplicate code
This commit is contained in:
Joris Pelgröm 2022-10-19 20:47:10 +02:00 committed by GitHub
parent 983dbd58e4
commit dd800d00ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 274 additions and 60 deletions

View file

@ -10,6 +10,7 @@ import dagger.hilt.android.AndroidEntryPoint
import io.homeassistant.companion.android.common.data.integration.ControlsAuthRequiredSetting
import io.homeassistant.companion.android.common.data.integration.Entity
import io.homeassistant.companion.android.common.data.integration.IntegrationRepository
import io.homeassistant.companion.android.common.data.integration.applyCompressedStateDiff
import io.homeassistant.companion.android.common.data.integration.domain
import io.homeassistant.companion.android.common.data.url.UrlRepository
import io.homeassistant.companion.android.common.data.websocket.WebSocketRepository
@ -23,7 +24,10 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import okhttp3.ResponseBody
import okhttp3.ResponseBody.Companion.toResponseBody
import retrofit2.HttpException
import retrofit2.Response
import java.util.Calendar
import java.util.concurrent.Flow
import java.util.function.Consumer
@ -81,6 +85,10 @@ class HaControlsProviderService : ControlsProviderService() {
private val ioScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
private var areaRegistry: List<AreaRegistryResponse>? = null
private var deviceRegistry: List<DeviceRegistryResponse>? = null
private var entityRegistry: List<EntityRegistryResponse>? = null
override fun createPublisherForAllAvailable(): Flow.Publisher<Control> {
return Flow.Publisher { subscriber ->
ioScope.launch {
@ -90,9 +98,9 @@ class HaControlsProviderService : ControlsProviderService() {
val getEntityRegistry = async { webSocketRepository.getEntityRegistry() }
val getEntities = async { integrationRepository.getEntities() }
val areaRegistry = getAreaRegistry.await()
val deviceRegistry = getDeviceRegistry.await()
val entityRegistry = getEntityRegistry.await()
areaRegistry = getAreaRegistry.await()
deviceRegistry = getDeviceRegistry.await()
entityRegistry = getEntityRegistry.await()
val entities = getEntities.await()
val areaForEntity = entities.orEmpty().associate {
@ -145,58 +153,126 @@ class HaControlsProviderService : ControlsProviderService() {
val getDeviceRegistry = async { webSocketRepository.getDeviceRegistry() }
val getEntityRegistry = async { webSocketRepository.getEntityRegistry() }
val entities = mutableMapOf<String, Entity<Map<String, Any>>>()
controlIds.forEach {
try {
val entity = integrationRepository.getEntity(it)
if (entity != null) {
entities[it] = entity
} else {
Log.e(TAG, "Unable to get $it from Home Assistant, null response.")
}
} catch (e: Exception) {
entities["ha_failed.$it"] = getFailedEntity(it, e)
Log.e(TAG, "Unable to get $it from Home Assistant, caught exception.", e)
}
}
var areaRegistry = getAreaRegistry.await()
var deviceRegistry = getDeviceRegistry.await()
var entityRegistry = getEntityRegistry.await()
val baseUrl = urlRepository.getUrl().toString().removeSuffix("/")
sendEntitiesToSubscriber(subscriber, entities, areaRegistry, deviceRegistry, entityRegistry, baseUrl)
areaRegistry = getAreaRegistry.await()
deviceRegistry = getDeviceRegistry.await()
entityRegistry = getEntityRegistry.await()
// Listen for the state changed events.
webSocketScope.launch {
integrationRepository.getEntityUpdates(controlIds)?.collect {
val control = domainToHaControl[it.domain]?.createControl(
applicationContext,
it as Entity<Map<String, Any>>,
RegistriesDataHandler.getAreaForEntity(it.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(it.entityId),
baseUrl
)
if (control != null)
subscriber.onNext(control)
if (integrationRepository.isHomeAssistantVersionAtLeast(2022, 4, 0)) {
webSocketScope.launch {
var sentInitial = false
val error404 = HttpException(Response.error<ResponseBody>(404, byteArrayOf().toResponseBody()))
webSocketRepository.getCompressedStateAndChanges(controlIds.toList())
?.collect { event ->
val toSend = mutableMapOf<String, Entity<Map<String, Any>>>()
event.added?.forEach {
val entity = it.value.toEntity(it.key)
entities.remove("ha_failed.$it")
entities[it.key] = entity
toSend[it.key] = entity
}
event.changed?.forEach {
val entity = entities[it.key]?.applyCompressedStateDiff(it.value)
entity?.let { thisEntity ->
entities[it.key] = thisEntity
toSend[it.key] = entity
}
}
event.removed?.forEach {
entities.remove(it)
val entity = getFailedEntity(it, error404)
entities["ha_failed.$it"] = entity
toSend["ha_failed.$it"] = entity
}
if (!sentInitial) {
// All initial states will be in the first message
sentInitial = true
(controlIds - entities.keys).forEach { missingEntity ->
Log.e(TAG, "Unable to get $missingEntity from Home Assistant, not returned in subscribe_entities.")
val entity = getFailedEntity(missingEntity, error404)
entities["ha_failed.$missingEntity"] = entity
toSend["ha_failed.$missingEntity"] = entity
}
}
Log.d(TAG, "Sending ${toSend.size} entities to subscriber")
sendEntitiesToSubscriber(subscriber, toSend, webSocketScope, baseUrl)
} ?: run {
controlIds.forEach {
val entity = getFailedEntity(it, Exception())
entities["ha_failed.$it"] = entity
domainToHaControl["ha_failed"]?.createControl(
applicationContext,
entity,
RegistriesDataHandler.getAreaForEntity(entity.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(entity.entityId),
baseUrl
)?.let { control -> subscriber.onNext(control) }
}
}
}
} else {
// Set up initial states
controlIds.forEach {
launch { // using launch to create controls async
var id = it
try {
val entity = integrationRepository.getEntity(it)
if (entity != null) {
entities[it] = entity
} else {
Log.e(TAG, "Unable to get $it from Home Assistant, null response.")
}
} catch (e: Exception) {
Log.e(TAG, "Unable to get $it from Home Assistant, caught exception.", e)
entities["ha_failed.$it"] = getFailedEntity(it, e)
id = "ha_failed.$it"
}
entities[id]?.let {
domainToHaControl[id.split(".")[0]]?.createControl(
applicationContext,
it,
RegistriesDataHandler.getAreaForEntity(it.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(it.entityId),
baseUrl
)?.let { control -> subscriber.onNext(control) }
}
}
}
// Listen for the state changed events.
webSocketScope.launch {
integrationRepository.getEntityUpdates(controlIds)?.collect {
val control = domainToHaControl[it.domain]?.createControl(
applicationContext,
it as Entity<Map<String, Any>>,
RegistriesDataHandler.getAreaForEntity(it.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(it.entityId),
baseUrl
)
if (control != null)
subscriber.onNext(control)
}
}
}
webSocketScope.launch {
webSocketRepository.getAreaRegistryUpdates()?.collect {
areaRegistry = webSocketRepository.getAreaRegistry()
sendEntitiesToSubscriber(subscriber, entities, areaRegistry, deviceRegistry, entityRegistry, baseUrl)
sendEntitiesToSubscriber(subscriber, entities, webSocketScope, baseUrl)
}
}
webSocketScope.launch {
webSocketRepository.getDeviceRegistryUpdates()?.collect {
deviceRegistry = webSocketRepository.getDeviceRegistry()
sendEntitiesToSubscriber(subscriber, entities, areaRegistry, deviceRegistry, entityRegistry, baseUrl)
sendEntitiesToSubscriber(subscriber, entities, webSocketScope, baseUrl)
}
}
webSocketScope.launch {
webSocketRepository.getEntityRegistryUpdates()?.collect { event ->
if (event.action == "update" && controlIds.contains(event.entityId)) {
entityRegistry = webSocketRepository.getEntityRegistry()
sendEntitiesToSubscriber(subscriber, entities, areaRegistry, deviceRegistry, entityRegistry, baseUrl)
sendEntitiesToSubscriber(subscriber, entities, webSocketScope, baseUrl)
}
}
}
@ -243,32 +319,32 @@ class HaControlsProviderService : ControlsProviderService() {
private suspend fun sendEntitiesToSubscriber(
subscriber: Flow.Subscriber<in Control>,
entities: Map<String, Entity<Map<String, Any>>>,
areaRegistry: List<AreaRegistryResponse>?,
deviceRegistry: List<DeviceRegistryResponse>?,
entityRegistry: List<EntityRegistryResponse>?,
coroutineScope: CoroutineScope,
baseUrl: String
) {
entities.forEach {
val control = try {
domainToHaControl[it.key.split(".")[0]]?.createControl(
applicationContext,
it.value,
RegistriesDataHandler.getAreaForEntity(it.value.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(it.value.entityId),
baseUrl
)
} catch (e: Exception) {
Log.e(TAG, "Unable to create control for ${it.value.domain} entity, sending error entity", e)
domainToHaControl["ha_failed"]?.createControl(
applicationContext,
getFailedEntity(it.value.entityId, e),
RegistriesDataHandler.getAreaForEntity(it.value.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(it.value.entityId),
baseUrl
)
coroutineScope.launch {
val control = try {
domainToHaControl[it.key.split(".")[0]]?.createControl(
applicationContext,
it.value,
RegistriesDataHandler.getAreaForEntity(it.value.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(it.value.entityId),
baseUrl
)
} catch (e: Exception) {
Log.e(TAG, "Unable to create control for ${it.value.domain} entity, sending error entity", e)
domainToHaControl["ha_failed"]?.createControl(
applicationContext,
getFailedEntity(it.value.entityId, e),
RegistriesDataHandler.getAreaForEntity(it.value.entityId, areaRegistry, deviceRegistry, entityRegistry),
entityRequiresAuth(it.value.entityId),
baseUrl
)
}
if (control != null)
subscriber.onNext(control)
}
if (control != null)
subscriber.onNext(control)
}
}

View file

@ -2,6 +2,7 @@ package io.homeassistant.companion.android.common.data.integration
import android.graphics.Color
import android.util.Log
import io.homeassistant.companion.android.common.data.websocket.impl.entities.CompressedStateDiff
import java.util.Calendar
import kotlin.math.round
@ -11,7 +12,7 @@ data class Entity<T>(
val attributes: T,
val lastChanged: Calendar,
val lastUpdated: Calendar,
val context: Map<String, Any>?
val context: Map<String, Any?>?
)
data class EntityPosition(
@ -33,6 +34,48 @@ object EntityExt {
val <T> Entity<T>.domain: String
get() = this.entityId.split(".")[0]
/**
* Apply a [CompressedStateDiff] to this Entity, and return the [Entity] with updated properties.
* Based on home-assistant-js-websocket entities `processEvent` function:
* https://github.com/home-assistant/home-assistant-js-websocket/blob/449fa43668f5316eb31609cd36088c5e82c818e2/lib/entities.ts#L47
*/
fun Entity<Map<String, Any>>.applyCompressedStateDiff(diff: CompressedStateDiff): Entity<Map<String, Any>> {
var (_, newState, newAttributes, newLastChanged, newLastUpdated, newContext) = this
diff.plus?.let { plus ->
plus.state?.let {
newState = it
}
plus.context?.let {
newContext = if (it is String) {
newContext?.toMutableMap()?.apply { set("id", it) }
} else {
newContext?.plus(it as Map<String, Any?>)
}
}
plus.lastChanged?.let {
val calendar = Calendar.getInstance().apply { timeInMillis = round(it * 1000).toLong() }
newLastChanged = calendar
newLastUpdated = calendar
} ?: plus.lastUpdated?.let {
newLastUpdated = Calendar.getInstance().apply { timeInMillis = round(it * 1000).toLong() }
}
plus.attributes?.let {
newAttributes = newAttributes.plus(it)
}
}
diff.minus?.attributes?.let {
newAttributes = newAttributes.minus(it.toSet())
}
return Entity(
entityId = entityId,
state = newState,
attributes = newAttributes,
lastChanged = newLastChanged,
lastUpdated = newLastUpdated,
context = newContext
)
}
fun <T> Entity<T>.getCoverPosition(): EntityPosition? {
// https://github.com/home-assistant/frontend/blob/dev/src/dialogs/more-info/controls/more-info-cover.ts#L33
return try {

View file

@ -3,6 +3,7 @@ package io.homeassistant.companion.android.common.data.websocket
import io.homeassistant.companion.android.common.data.integration.impl.entities.EntityResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.AreaRegistryResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.AreaRegistryUpdatedEvent
import io.homeassistant.companion.android.common.data.websocket.impl.entities.CompressedStateChangedEvent
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DeviceRegistryResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DeviceRegistryUpdatedEvent
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse
@ -25,6 +26,8 @@ interface WebSocketRepository {
suspend fun getServices(): List<DomainResponse>?
suspend fun getStateChanges(): Flow<StateChangedEvent>?
suspend fun getStateChanges(entityIds: List<String>): Flow<TriggerEvent>?
suspend fun getCompressedStateAndChanges(): Flow<CompressedStateChangedEvent>?
suspend fun getCompressedStateAndChanges(entityIds: List<String>): Flow<CompressedStateChangedEvent>?
suspend fun getAreaRegistryUpdates(): Flow<AreaRegistryUpdatedEvent>?
suspend fun getDeviceRegistryUpdates(): Flow<DeviceRegistryUpdatedEvent>?
suspend fun getEntityRegistryUpdates(): Flow<EntityRegistryUpdatedEvent>?

View file

@ -20,6 +20,7 @@ import io.homeassistant.companion.android.common.data.websocket.WebSocketRequest
import io.homeassistant.companion.android.common.data.websocket.WebSocketState
import io.homeassistant.companion.android.common.data.websocket.impl.entities.AreaRegistryResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.AreaRegistryUpdatedEvent
import io.homeassistant.companion.android.common.data.websocket.impl.entities.CompressedStateChangedEvent
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DeviceRegistryResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DeviceRegistryUpdatedEvent
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse
@ -70,6 +71,7 @@ class WebSocketRepositoryImpl @Inject constructor(
private const val TAG = "WebSocketRepository"
private const val SUBSCRIBE_TYPE_SUBSCRIBE_EVENTS = "subscribe_events"
private const val SUBSCRIBE_TYPE_SUBSCRIBE_ENTITIES = "subscribe_entities"
private const val SUBSCRIBE_TYPE_SUBSCRIBE_TRIGGER = "subscribe_trigger"
private const val SUBSCRIBE_TYPE_RENDER_TEMPLATE = "render_template"
private const val SUBSCRIBE_TYPE_PUSH_NOTIFICATION_CHANNEL =
@ -179,6 +181,12 @@ class WebSocketRepositoryImpl @Inject constructor(
override suspend fun getStateChanges(entityIds: List<String>): Flow<TriggerEvent>? =
subscribeToTrigger("state", mapOf("entity_id" to entityIds))
override suspend fun getCompressedStateAndChanges(): Flow<CompressedStateChangedEvent>? =
subscribeTo(SUBSCRIBE_TYPE_SUBSCRIBE_ENTITIES)
override suspend fun getCompressedStateAndChanges(entityIds: List<String>): Flow<CompressedStateChangedEvent>? =
subscribeTo(SUBSCRIBE_TYPE_SUBSCRIBE_ENTITIES, mapOf("entity_ids" to entityIds))
override suspend fun getAreaRegistryUpdates(): Flow<AreaRegistryUpdatedEvent>? =
subscribeToEventsForType(EVENT_AREA_REGISTRY_UPDATED)
@ -215,7 +223,7 @@ class WebSocketRepositoryImpl @Inject constructor(
*/
private suspend fun <T : Any> subscribeTo(
type: String,
data: Map<Any, Any>,
data: Map<Any, Any> = mapOf(),
timeout: Long = 0
): Flow<T>? {
val subscribeMessage = mapOf(
@ -429,6 +437,8 @@ class WebSocketRepositoryImpl @Inject constructor(
response.event,
object : TypeReference<Map<String, Any>>() {}
)
} else if (subscriptionType == SUBSCRIBE_TYPE_SUBSCRIBE_ENTITIES) {
mapper.convertValue(response.event, CompressedStateChangedEvent::class.java)
} else if (subscriptionType == SUBSCRIBE_TYPE_RENDER_TEMPLATE) {
mapper.convertValue(response.event, TemplateUpdatedEvent::class.java)
} else if (subscriptionType == SUBSCRIBE_TYPE_SUBSCRIBE_TRIGGER) {

View file

@ -0,0 +1,82 @@
package io.homeassistant.companion.android.common.data.websocket.impl.entities
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
import io.homeassistant.companion.android.common.data.integration.Entity
import java.util.Calendar
import kotlin.math.round
/**
* Represents a single event emitted in a `subscribe_entities` websocket subscription. One event can
* contain state changes for multiple entities; properties map them as entity id -> state.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
data class CompressedStateChangedEvent(
@JsonProperty("a")
val added: Map<String, CompressedEntityState>?,
@JsonProperty("c")
val changed: Map<String, CompressedStateDiff>?,
@JsonProperty("r")
val removed: List<String>?
)
/**
* Describes the difference in an [Entity] state in a `subscribe_entities` websocket subscription.
* It will only include properties that have been changed, values that haven't changed will not be
* set (in Kotlin: `null`). Apply it to an existing Entity to get the new state.
*/
data class CompressedStateDiff(
@JsonProperty("+")
val plus: CompressedEntityState?,
@JsonProperty("-")
val minus: CompressedEntityRemoved?
)
/**
* A compressed version of [Entity] used for additions or changes in the entity's state in a
* `subscribe_entities` websocket subscription.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
data class CompressedEntityState(
@JsonProperty("s")
val state: String?,
@JsonProperty("a")
val attributes: Map<String, Any>?,
@JsonProperty("lc")
val lastChanged: Double?,
@JsonProperty("lu")
val lastUpdated: Double?,
@JsonProperty("c")
val context: Any?
) {
/**
* Convert a compressed entity state to a normal [Entity]. This function can be used for new
* entities that are delivered in a compressed format.
*/
fun toEntity(entityId: String): Entity<Map<String, Any>> {
return Entity(
entityId = entityId,
state = state!!,
attributes = attributes ?: mapOf(),
lastChanged = Calendar.getInstance().apply { timeInMillis = round(lastChanged!! * 1000).toLong() },
lastUpdated = Calendar.getInstance().apply {
timeInMillis =
if (lastUpdated != null) round(lastUpdated * 1000).toLong()
else round(lastChanged!! * 1000).toLong()
},
context =
if (context is String) mapOf("id" to context, "parent_id" to null, "user_id" to null)
else context as? Map<String, Any?>
)
}
}
/**
* A compressed version of [Entity] used for removed properties from the entity's state in a
* `subscribe_entities` websocket subscription. Only attributes are expected to be removed.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
data class CompressedEntityRemoved(
@JsonProperty("a")
val attributes: List<String>?
)