Add spannable tracking around SyncResponseHandler (#7514)

* Add spannable tracking around SyncResponseHandler

* Update LICENSE header

* Refactor handleResponse and MetricsExtensions

* Update changelog.d

* Improve code docs and comments

* Check if Sentry is enabled before tracking
This commit is contained in:
Amit Kumar 2022-11-10 16:13:09 +05:30 committed by GitHub
parent 76b179e738
commit c07b110b99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 323 additions and 56 deletions

1
changelog.d/7514.sdk Normal file
View file

@ -0,0 +1 @@
[Metrics] Add `SpannableMetricPlugin` to support spans within transactions.

View file

@ -17,25 +17,51 @@
package org.matrix.android.sdk.api.extensions
import org.matrix.android.sdk.api.metrics.MetricPlugin
import org.matrix.android.sdk.api.metrics.SpannableMetricPlugin
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
/**
* Executes the given [block] while measuring the transaction.
*
* @param block Action/Task to be executed within this span.
*/
@OptIn(ExperimentalContracts::class)
inline fun measureMetric(metricMeasurementPlugins: List<MetricPlugin>, block: () -> Unit) {
inline fun List<MetricPlugin>.measureMetric(block: () -> Unit) {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
try {
metricMeasurementPlugins.forEach { plugin -> plugin.startTransaction() } // Start the transaction.
this.forEach { plugin -> plugin.startTransaction() } // Start the transaction.
block()
} catch (throwable: Throwable) {
metricMeasurementPlugins.forEach { plugin -> plugin.onError(throwable) } // Capture if there is any exception thrown.
this.forEach { plugin -> plugin.onError(throwable) } // Capture if there is any exception thrown.
throw throwable
} finally {
metricMeasurementPlugins.forEach { plugin -> plugin.finishTransaction() } // Finally, finish this transaction.
this.forEach { plugin -> plugin.finishTransaction() } // Finally, finish this transaction.
}
}
/**
* Executes the given [block] while measuring a span.
*
* @param operation Name of the new span.
* @param description Description of the new span.
* @param block Action/Task to be executed within this span.
*/
@OptIn(ExperimentalContracts::class)
inline fun List<SpannableMetricPlugin>.measureSpan(operation: String, description: String, block: () -> Unit) {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
try {
this.forEach { plugin -> plugin.startSpan(operation, description) } // Start the transaction.
block()
} catch (throwable: Throwable) {
this.forEach { plugin -> plugin.onError(throwable) } // Capture if there is any exception thrown.
throw throwable
} finally {
this.forEach { plugin -> plugin.finishSpan() } // Finally, finish this transaction.
}
}

View file

@ -0,0 +1,36 @@
/*
* Copyright (c) 2022 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.api.metrics
/**
* A plugin that tracks span along with transactions.
*/
interface SpannableMetricPlugin : MetricPlugin {
/**
* Starts the span for a sub-task.
*
* @param operation Name of the new span.
* @param description Description of the new span.
*/
fun startSpan(operation: String, description: String)
/**
* Finish the span when sub-task is completed.
*/
fun finishSpan()
}

View file

@ -0,0 +1,32 @@
/*
* Copyright 2022 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.api.metrics
import org.matrix.android.sdk.api.logger.LoggerTag
import timber.log.Timber
private val loggerTag = LoggerTag("SyncDurationMetricPlugin", LoggerTag.CRYPTO)
/**
* An spannable metric plugin for sync response handling task.
*/
interface SyncDurationMetricPlugin : SpannableMetricPlugin {
override fun logTransaction(message: String?) {
Timber.tag(loggerTag.value).v("## syncResponseHandler() : $message")
}
}

View file

@ -355,7 +355,7 @@ internal class DeviceListManager @Inject constructor(
val relevantPlugins = metricPlugins.filterIsInstance<DownloadDeviceKeysMetricsPlugin>()
val response: KeysQueryResponse
measureMetric(relevantPlugins) {
relevantPlugins.measureMetric {
response = try {
downloadKeysForUsersTask.execute(params)
} catch (throwable: Throwable) {

View file

@ -17,6 +17,11 @@
package org.matrix.android.sdk.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import org.matrix.android.sdk.api.MatrixConfiguration
import org.matrix.android.sdk.api.extensions.measureMetric
import org.matrix.android.sdk.api.extensions.measureSpan
import org.matrix.android.sdk.api.metrics.SyncDurationMetricPlugin
import org.matrix.android.sdk.api.session.pushrules.PushRuleService
import org.matrix.android.sdk.api.session.pushrules.RuleScope
import org.matrix.android.sdk.api.session.sync.InitialSyncStep
@ -52,9 +57,12 @@ internal class SyncResponseHandler @Inject constructor(
private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService,
private val presenceSyncHandler: PresenceSyncHandler
private val presenceSyncHandler: PresenceSyncHandler,
matrixConfiguration: MatrixConfiguration,
) {
private val relevantPlugins = matrixConfiguration.metricPlugins.filterIsInstance<SyncDurationMetricPlugin>()
suspend fun handleResponse(
syncResponse: SyncResponse,
fromToken: String?,
@ -63,39 +71,91 @@ internal class SyncResponseHandler @Inject constructor(
val isInitialSync = fromToken == null
Timber.v("Start handling sync, is InitialSync: $isInitialSync")
measureTimeMillis {
if (!cryptoService.isStarted()) {
Timber.v("Should start cryptoService")
cryptoService.start()
}
cryptoService.onSyncWillProcess(isInitialSync)
}.also {
Timber.v("Finish handling start cryptoService in $it ms")
}
relevantPlugins.measureMetric {
startCryptoService(isInitialSync)
// Handle the to device events before the room ones
// to ensure to decrypt them properly
measureTimeMillis {
Timber.v("Handle toDevice")
reportSubtask(reporter, InitialSyncStep.ImportingAccountCrypto, 100, 0.1f) {
if (syncResponse.toDevice != null) {
cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter)
// Handle the to device events before the room ones
// to ensure to decrypt them properly
handleToDevice(syncResponse, reporter)
val aggregator = SyncResponsePostTreatmentAggregator()
// Prerequisite for thread events handling in RoomSyncHandler
// Disabled due to the new fallback
// if (!lightweightSettingsStorage.areThreadMessagesEnabled()) {
// threadsAwarenessHandler.fetchRootThreadEventsIfNeeded(syncResponse)
// }
startMonarchyTransaction(syncResponse, isInitialSync, reporter, aggregator)
aggregateSyncResponse(aggregator)
postTreatmentSyncResponse(syncResponse, isInitialSync)
markCryptoSyncCompleted(syncResponse)
handlePostSync()
Timber.v("On sync completed")
}
}
private fun startCryptoService(isInitialSync: Boolean) {
relevantPlugins.measureSpan("task", "start_crypto_service") {
measureTimeMillis {
if (!cryptoService.isStarted()) {
Timber.v("Should start cryptoService")
cryptoService.start()
}
cryptoService.onSyncWillProcess(isInitialSync)
}.also {
Timber.v("Finish handling start cryptoService in $it ms")
}
}.also {
Timber.v("Finish handling toDevice in $it ms")
}
val aggregator = SyncResponsePostTreatmentAggregator()
}
// Prerequisite for thread events handling in RoomSyncHandler
// Disabled due to the new fallback
// if (!lightweightSettingsStorage.areThreadMessagesEnabled()) {
// threadsAwarenessHandler.fetchRootThreadEventsIfNeeded(syncResponse)
// }
private suspend fun handleToDevice(syncResponse: SyncResponse, reporter: ProgressReporter?) {
relevantPlugins.measureSpan("task", "handle_to_device") {
measureTimeMillis {
Timber.v("Handle toDevice")
reportSubtask(reporter, InitialSyncStep.ImportingAccountCrypto, 100, 0.1f) {
if (syncResponse.toDevice != null) {
cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter)
}
}
}.also {
Timber.v("Finish handling toDevice in $it ms")
}
}
}
private suspend fun startMonarchyTransaction(
syncResponse: SyncResponse,
isInitialSync: Boolean,
reporter: ProgressReporter?,
aggregator: SyncResponsePostTreatmentAggregator
) {
// Start one big transaction
monarchy.awaitTransaction { realm ->
// IMPORTANT nothing should be suspend here as we are accessing the realm instance (thread local)
relevantPlugins.measureSpan("task", "monarchy_transaction") {
monarchy.awaitTransaction { realm ->
// IMPORTANT nothing should be suspend here as we are accessing the realm instance (thread local)
handleRooms(reporter, syncResponse, realm, isInitialSync, aggregator)
handleAccountData(reporter, realm, syncResponse)
handlePresence(realm, syncResponse)
tokenStore.saveToken(realm, syncResponse.nextBatch)
}
}
}
private fun handleRooms(
reporter: ProgressReporter?,
syncResponse: SyncResponse,
realm: Realm,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator
) {
relevantPlugins.measureSpan("task", "handle_rooms") {
measureTimeMillis {
Timber.v("Handle rooms")
reportSubtask(reporter, InitialSyncStep.ImportingAccountRoom, 1, 0.8f) {
@ -106,7 +166,11 @@ internal class SyncResponseHandler @Inject constructor(
}.also {
Timber.v("Finish handling rooms in $it ms")
}
}
}
private fun handleAccountData(reporter: ProgressReporter?, realm: Realm, syncResponse: SyncResponse) {
relevantPlugins.measureSpan("task", "handle_account_data") {
measureTimeMillis {
reportSubtask(reporter, InitialSyncStep.ImportingAccountData, 1, 0.1f) {
Timber.v("Handle accountData")
@ -115,44 +179,59 @@ internal class SyncResponseHandler @Inject constructor(
}.also {
Timber.v("Finish handling accountData in $it ms")
}
}
}
private fun handlePresence(realm: Realm, syncResponse: SyncResponse) {
relevantPlugins.measureSpan("task", "handle_presence") {
measureTimeMillis {
Timber.v("Handle Presence")
presenceSyncHandler.handle(realm, syncResponse.presence)
}.also {
Timber.v("Finish handling Presence in $it ms")
}
tokenStore.saveToken(realm, syncResponse.nextBatch)
}
}
// Everything else we need to do outside the transaction
measureTimeMillis {
aggregatorHandler.handle(aggregator)
}.also {
Timber.v("Aggregator management took $it ms")
}
measureTimeMillis {
syncResponse.rooms?.let {
checkPushRules(it, isInitialSync)
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
dispatchInvitedRoom(it)
private suspend fun aggregateSyncResponse(aggregator: SyncResponsePostTreatmentAggregator) {
relevantPlugins.measureSpan("task", "aggregator_management") {
// Everything else we need to do outside the transaction
measureTimeMillis {
aggregatorHandler.handle(aggregator)
}.also {
Timber.v("Aggregator management took $it ms")
}
}.also {
Timber.v("SyncResponse.rooms post treatment took $it ms")
}
}
measureTimeMillis {
cryptoSyncHandler.onSyncCompleted(syncResponse)
}.also {
Timber.v("cryptoSyncHandler.onSyncCompleted took $it ms")
private suspend fun postTreatmentSyncResponse(syncResponse: SyncResponse, isInitialSync: Boolean) {
relevantPlugins.measureSpan("task", "sync_response_post_treatment") {
measureTimeMillis {
syncResponse.rooms?.let {
checkPushRules(it, isInitialSync)
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
dispatchInvitedRoom(it)
}
}.also {
Timber.v("SyncResponse.rooms post treatment took $it ms")
}
}
}
// post sync stuffs
private fun markCryptoSyncCompleted(syncResponse: SyncResponse) {
relevantPlugins.measureSpan("task", "crypto_sync_handler_onSyncCompleted") {
measureTimeMillis {
cryptoSyncHandler.onSyncCompleted(syncResponse)
}.also {
Timber.v("cryptoSyncHandler.onSyncCompleted took $it ms")
}
}
}
private fun handlePostSync() {
monarchy.writeAsync {
roomSyncHandler.postSyncSpaceHierarchyHandle(it)
}
Timber.v("On sync completed")
}
private fun dispatchInvitedRoom(roomsSyncResponse: RoomsSyncResponse) {

View file

@ -17,6 +17,7 @@
package im.vector.app.features.analytics.metrics
import im.vector.app.features.analytics.metrics.sentry.SentryDownloadDeviceKeysMetrics
import im.vector.app.features.analytics.metrics.sentry.SentrySyncDurationMetrics
import org.matrix.android.sdk.api.metrics.MetricPlugin
import javax.inject.Inject
import javax.inject.Singleton
@ -27,9 +28,10 @@ import javax.inject.Singleton
@Singleton
data class VectorPlugins @Inject constructor(
val sentryDownloadDeviceKeysMetrics: SentryDownloadDeviceKeysMetrics,
val sentrySyncDurationMetrics: SentrySyncDurationMetrics,
) {
/**
* Returns [List] of all [MetricPlugin] hold by this class.
*/
fun plugins(): List<MetricPlugin> = listOf(sentryDownloadDeviceKeysMetrics)
fun plugins(): List<MetricPlugin> = listOf(sentryDownloadDeviceKeysMetrics, sentrySyncDurationMetrics)
}

View file

@ -26,8 +26,10 @@ class SentryDownloadDeviceKeysMetrics @Inject constructor() : DownloadDeviceKeys
private var transaction: ITransaction? = null
override fun startTransaction() {
transaction = Sentry.startTransaction("download_device_keys", "task")
logTransaction("Sentry transaction started")
if (Sentry.isEnabled()) {
transaction = Sentry.startTransaction("download_device_keys", "task")
logTransaction("Sentry transaction started")
}
}
override fun finishTransaction() {

View file

@ -0,0 +1,89 @@
/*
* Copyright (c) 2022 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.app.features.analytics.metrics.sentry
import io.sentry.ISpan
import io.sentry.ITransaction
import io.sentry.Sentry
import io.sentry.SpanStatus
import org.matrix.android.sdk.api.metrics.SyncDurationMetricPlugin
import java.util.EmptyStackException
import java.util.Stack
import javax.inject.Inject
/**
* Sentry based implementation of SyncDurationMetricPlugin.
*/
class SentrySyncDurationMetrics @Inject constructor() : SyncDurationMetricPlugin {
private var transaction: ITransaction? = null
// Stacks to keep spans in LIFO order.
private var spans: Stack<ISpan> = Stack()
/**
* Starts the span for a sub-task.
*
* @param operation Name of the new span.
* @param description Description of the new span.
*
* @throws IllegalStateException if this is called without starting a transaction ie. `measureSpan` must be called within `measureMetric`.
*/
override fun startSpan(operation: String, description: String) {
if (Sentry.isEnabled()) {
val span = Sentry.getSpan() ?: throw IllegalStateException("measureSpan block must be called within measureMetric")
val innerSpan = span.startChild(operation, description)
spans.push(innerSpan)
logTransaction("Sentry span started: operation=[$operation], description=[$description]")
}
}
override fun finishSpan() {
try {
spans.pop()
} catch (e: EmptyStackException) {
null
}?.finish()
logTransaction("Sentry span finished")
}
override fun startTransaction() {
if (Sentry.isEnabled()) {
transaction = Sentry.startTransaction("sync_response_handler", "task", true)
logTransaction("Sentry transaction started")
}
}
override fun finishTransaction() {
transaction?.finish()
logTransaction("Sentry transaction finished")
}
override fun onError(throwable: Throwable) {
try {
spans.peek()
} catch (e: EmptyStackException) {
null
}?.apply {
this.throwable = throwable
this.status = SpanStatus.INTERNAL_ERROR
} ?: transaction?.apply {
this.throwable = throwable
this.status = SpanStatus.INTERNAL_ERROR
}
logTransaction("Sentry transaction encountered error ${throwable.message}")
}
}