Initial support for websocket state changed events! (#1906)

This commit is contained in:
Justin Bassett 2021-11-11 16:09:44 -05:00 committed by GitHub
parent 3d909c621d
commit 70e441f9ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 135 additions and 56 deletions

View file

@ -1,21 +1,19 @@
package io.homeassistant.companion.android.controls
import android.os.Build
import android.os.Handler
import android.os.Looper
import android.service.controls.Control
import android.service.controls.ControlsProviderService
import android.service.controls.actions.ControlAction
import android.util.Log
import androidx.annotation.RequiresApi
import androidx.core.os.postDelayed
import io.homeassistant.companion.android.common.dagger.GraphComponentAccessor
import io.homeassistant.companion.android.common.data.integration.Entity
import io.homeassistant.companion.android.common.data.integration.IntegrationRepository
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Flow
import java.util.function.Consumer
import javax.inject.Inject
@ -32,31 +30,6 @@ class HaControlsProviderService : ControlsProviderService() {
private val ioScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
private val monitoredEntities = mutableListOf<String>()
private val handler = Handler(Looper.getMainLooper())
// This is the poor mans way to do this. We should really connect via websocket and update
// on events. But now we get updates every 5 seconds while on power menu.
private val refresh = object : Runnable {
override fun run() {
monitoredEntities.forEach { entityId ->
ioScope.launch {
try {
val entity = integrationRepository.getEntity(entityId)
val domain = entity.entityId.split(".")[0]
val control =
domainToHaControl[domain]?.createControl(applicationContext, entity)
updateSubscriber?.onNext(control)
} catch (e: Exception) {
Log.e(TAG, "Unable to get entity information", e)
}
}
}
handler.postDelayed(this, 5000)
}
}
private var updateSubscriber: Flow.Subscriber<in Control>? = null
private val domainToHaControl = mapOf(
"automation" to DefaultSwitchControl,
"camera" to null,
@ -100,10 +73,10 @@ class HaControlsProviderService : ControlsProviderService() {
.forEach {
subscriber.onNext(it)
}
subscriber.onComplete()
} catch (e: Exception) {
Log.e(TAG, "Error getting list of entities", e)
}
subscriber.onComplete()
}
}
}
@ -112,19 +85,43 @@ class HaControlsProviderService : ControlsProviderService() {
Log.d(TAG, "publisherFor $controlIds")
return Flow.Publisher { subscriber ->
subscriber.onSubscribe(object : Flow.Subscription {
var running = true
override fun request(n: Long) {
Log.d(TAG, "request $n")
updateSubscriber = subscriber
ioScope.launch {
val entityFlow = integrationRepository.getEntityUpdates()
// Load up initial values
// This should use the cached values that we should store in the DB.
// For now we'll use the rest API
controlIds.forEach {
val entity = integrationRepository.getEntity(it)
val domain = it.split(".")[0]
val control = domainToHaControl[domain]?.createControl(
applicationContext,
entity
)
subscriber.onNext(control)
}
// Listen for the state changed events.
entityFlow.takeWhile { running }.collect {
if (controlIds.contains(it.entityId)) {
val domain = it.entityId.split(".")[0]
val control = domainToHaControl[domain]?.createControl(
applicationContext,
it as Entity<Map<String, Any>>
)
subscriber.onNext(control)
}
}
}
}
override fun cancel() {
Log.d(TAG, "cancel")
updateSubscriber = null
handler.removeCallbacks(refresh)
running = false
}
})
monitoredEntities.addAll(controlIds)
handler.post(refresh)
}
}
@ -139,25 +136,10 @@ class HaControlsProviderService : ControlsProviderService() {
var actionSuccess = false
if (haControl != null) {
runBlocking {
try {
actionSuccess = haControl.performAction(integrationRepository, action)
val entity = integrationRepository.getEntity(controlId)
updateSubscriber?.onNext(haControl.createControl(applicationContext, entity))
handler.postDelayed(750) {
// This is here because the state isn't aways instantly updated. This should
// cause us to update a second time rapidly to ensure we display the correct state
updateSubscriber?.onNext(
haControl.createControl(
applicationContext,
entity
)
)
}
} catch (e: Exception) {
Log.e(TAG, "Unable to control or get entity information", e)
}
try {
actionSuccess = haControl.performAction(integrationRepository, action)
} catch (e: Exception) {
Log.e(TAG, "Unable to control or get entity information", e)
}
}
if (actionSuccess) {

View file

@ -1,6 +1,7 @@
package io.homeassistant.companion.android.common.data.integration
import io.homeassistant.companion.android.common.data.integration.impl.entities.RateLimitResponse
import kotlinx.coroutines.flow.Flow
interface IntegrationRepository {
@ -47,6 +48,7 @@ interface IntegrationRepository {
suspend fun getEntities(): List<Entity<Any>>
suspend fun getEntity(entityId: String): Entity<Map<String, Any>>
suspend fun getEntityUpdates(): Flow<Entity<*>>
suspend fun callService(domain: String, service: String, serviceData: HashMap<String, Any>)

View file

@ -25,6 +25,8 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities.
import io.homeassistant.companion.android.common.data.url.UrlRepository
import io.homeassistant.companion.android.common.data.websocket.WebSocketRepository
import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import org.json.JSONArray
import java.util.regex.Pattern
@ -462,6 +464,19 @@ class IntegrationRepositoryImpl @Inject constructor(
)
}
override suspend fun getEntityUpdates(): Flow<Entity<*>> {
return webSocketRepository.getStateChanges().map {
Entity(
it.newState.entityId,
it.newState.state,
it.newState.attributes,
it.newState.lastChanged,
it.newState.lastUpdated,
it.newState.context
)
}
}
private suspend fun canRegisterEntityCategoryStateClass(): Boolean {
val version = getHomeAssistantVersion()
val matches = VERSION_PATTERN.matcher(version)

View file

@ -4,6 +4,8 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities.
import io.homeassistant.companion.android.common.data.integration.impl.entities.ServiceCallRequest
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.StateChangedEvent
import kotlinx.coroutines.flow.Flow
interface WebSocketRepository {
suspend fun sendPing(): Boolean
@ -12,4 +14,5 @@ interface WebSocketRepository {
suspend fun getServices(): List<DomainResponse>
suspend fun getPanels(): List<String>
suspend fun callService(request: ServiceCallRequest)
suspend fun getStateChanges(): Flow<StateChangedEvent>
}

View file

@ -12,12 +12,22 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities.
import io.homeassistant.companion.android.common.data.url.UrlRepository
import io.homeassistant.companion.android.common.data.websocket.WebSocketRepository
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.EventResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.SocketResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.StateChangedEvent
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeout
@ -44,10 +54,13 @@ class WebSocketRepositoryImpl @Inject constructor(
private val mapper = jacksonObjectMapper()
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
private val responseCallbackJobs = mutableMapOf<Long, CancellableContinuation<SocketResponse>>()
private val subscriptionCallbacks = mutableMapOf<Long, (Boolean) -> Unit>()
private val id = AtomicLong(1)
private var connection: WebSocket? = null
private var connected = Job()
private var stateChangedFlow: SharedFlow<StateChangedEvent>? = null
@ExperimentalCoroutinesApi
private var producerScope: ProducerScope<StateChangedEvent>? = null
override suspend fun sendPing(): Boolean {
val socketResponse = sendMessage(
@ -107,6 +120,38 @@ class WebSocketRepositoryImpl @Inject constructor(
TODO("Not yet implemented")
}
@ExperimentalCoroutinesApi
override suspend fun getStateChanges(): Flow<StateChangedEvent> {
if (stateChangedFlow == null) {
val response = sendMessage(
mapOf(
"type" to "subscribe_events",
"event_type" to "state_changed"
)
)
stateChangedFlow = callbackFlow {
producerScope = this
awaitClose {
Log.d(TAG, "Unsubscribing from state_changes")
ioScope.launch {
sendMessage(
mapOf(
"type" to "unsubscribe_events",
"subscription" to response.id
)
)
}
producerScope = null
stateChangedFlow = null
}
}.shareIn(ioScope, SharingStarted.WhileSubscribed())
}
return stateChangedFlow!!
}
/**
* This method will
*/
@ -181,6 +226,15 @@ class WebSocketRepositoryImpl @Inject constructor(
responseCallbackJobs.remove(id)
}
@ExperimentalCoroutinesApi
private suspend fun handleEvent(response: SocketResponse) {
val eventResponse = mapper.convertValue(
response.event,
object : TypeReference<EventResponse>() {}
)
producerScope?.send(eventResponse.data)
}
override fun onOpen(webSocket: WebSocket, response: Response) {
Log.d(TAG, "Websocket: onOpen")
}
@ -196,6 +250,7 @@ class WebSocketRepositoryImpl @Inject constructor(
"auth_ok" -> handleAuthComplete(true)
"auth_invalid" -> handleAuthComplete(false)
"pong", "result" -> handleMessage(message)
"event" -> handleEvent(message)
else -> Log.d(TAG, "Unknown message type: $text")
}
}

View file

@ -0,0 +1,10 @@
package io.homeassistant.companion.android.common.data.websocket.impl.entities
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
@JsonIgnoreProperties(ignoreUnknown = true)
data class EventResponse(
val eventType: String,
val timeFired: String,
val data: StateChangedEvent
)

View file

@ -8,5 +8,6 @@ data class SocketResponse(
val id: Long?,
val type: String,
val success: Boolean?,
val result: JsonNode?
val result: JsonNode?,
val event: JsonNode?
)

View file

@ -0,0 +1,11 @@
package io.homeassistant.companion.android.common.data.websocket.impl.entities
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import io.homeassistant.companion.android.common.data.integration.Entity
@JsonIgnoreProperties(ignoreUnknown = true)
data class StateChangedEvent(
val entityId: String,
val oldState: Entity<*>,
val newState: Entity<*>
)