bluez5: iso-io: track and apply corrections to tx latency

Use TX timestamps to get accurate reading of queue length and latency on
kernel + controller side.

This is new kernel BT feature, so requires kernel with the necessary
patches, available currently only in bluetooth-next/master branch.
Enabling Poll Errqueue kernel experimental Bluetooth feature is also
required for this.

Use the latency information to mitigate controller issues where ISO
streams are desynchronized due to tx problems or spontaneously when some
packets that should have been sent are left sitting in the queue, and
transmission is off by a multiple of the ISO interval.  This state is
visible in the latency information, so if we see streams in a group have
persistently different latencies, drop packets to resynchronize them.

Also make corrections if the kernel/controller queues get too long, so
that we don't have too big latency there.

Since BlueZ watches the same socket for errors, and TX timestamps arrive
via the socket error queue, we need to set BT_POLL_ERRQUEUE in addition
to SO_TIMESTAMPING so that BlueZ doesn't think TX timestamps are errors.

Link: https://github.com/bluez/bluez/issues/515
Link: https://lore.kernel.org/linux-bluetooth/cover.1710440392.git.pav@iki.fi/
Link: https://lore.kernel.org/linux-bluetooth/f57e065bb571d633f811610d273711c7047af335.1712499936.git.pav@iki.fi/
This commit is contained in:
Pauli Virtanen 2024-02-20 23:01:07 +02:00
parent 9165291c43
commit a6dcdfae0c
7 changed files with 298 additions and 23 deletions

View file

@ -0,0 +1,175 @@
/* Spa Bluez5 ISO I/O */
/* SPDX-FileCopyrightText: Copyright © 2024 Pauli Virtanen */
/* SPDX-License-Identifier: MIT */
#ifndef SPA_BLUEZ5_BT_LATENCY_H
#define SPA_BLUEZ5_BT_LATENCY_H
#include <sys/socket.h>
#include <linux/net_tstamp.h>
#include <linux/errqueue.h>
#include <linux/sockios.h>
#include <spa/utils/defs.h>
#include <spa/support/log.h>
#include "rate-control.h"
/* New kernel API */
#ifndef BT_SCM_ERROR
#define BT_SCM_ERROR 0x04
#endif
#ifndef BT_POLL_ERRQUEUE
#define BT_POLL_ERRQUEUE 21
#endif
/**
* Bluetooth latency tracking.
*/
struct spa_bt_latency
{
uint64_t value;
struct spa_bt_ptp ptp;
bool valid;
bool disabled;
struct {
int64_t send[64];
uint32_t pos;
int64_t prev_tx;
} impl;
};
static inline void spa_bt_latency_init(struct spa_bt_latency *lat, int fd,
uint32_t period, struct spa_log *log)
{
int so_timestamping = (SOF_TIMESTAMPING_TX_SOFTWARE | SOF_TIMESTAMPING_SOFTWARE |
SOF_TIMESTAMPING_OPT_ID | SOF_TIMESTAMPING_OPT_TSONLY);
uint32_t flag;
int res;
spa_zero(*lat);
flag = 0;
res = setsockopt(fd, SOL_BLUETOOTH, BT_POLL_ERRQUEUE, &flag, sizeof(flag));
if (res < 0) {
spa_log_warn(log, "setsockopt(BT_POLL_ERRQUEUE) failed (kernel feature not enabled?): %d (%m)", errno);
lat->disabled = true;
return;
}
res = setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &so_timestamping, sizeof(so_timestamping));
if (res < 0) {
spa_log_warn(log, "setsockopt(SO_TIMESTAMPING) failed (kernel feature not enabled?): %d (%m)", errno);
lat->disabled = true;
return;
}
/* Flush errqueue on start */
do {
res = recv(fd, NULL, 0, MSG_ERRQUEUE | MSG_DONTWAIT | MSG_TRUNC);
} while (res == 0);
spa_bt_ptp_init(&lat->ptp, period, period / 2);
}
static inline void spa_bt_latency_reset(struct spa_bt_latency *lat)
{
lat->value = 0;
lat->valid = false;
spa_bt_ptp_init(&lat->ptp, lat->ptp.period, lat->ptp.period / 2);
}
static inline void spa_bt_latency_sent(struct spa_bt_latency *lat, uint64_t now)
{
const unsigned int n = SPA_N_ELEMENTS(lat->impl.send);
if (lat->disabled)
return;
lat->impl.send[lat->impl.pos++] = now;
if (lat->impl.pos >= n)
lat->impl.pos = 0;
}
static inline int spa_bt_latency_recv_errqueue(struct spa_bt_latency *lat, int fd, struct spa_log *log)
{
struct {
struct cmsghdr cm;
char control[512];
} control;
if (lat->disabled)
return -EOPNOTSUPP;
do {
struct iovec data = {
.iov_base = NULL,
.iov_len = 0
};
struct msghdr msg = {
.msg_iov = &data,
.msg_iovlen = 1,
.msg_control = &control,
.msg_controllen = sizeof(control),
};
struct cmsghdr *cmsg;
struct scm_timestamping *tss = NULL;
struct sock_extended_err *serr = NULL;
int res;
res = recvmsg(fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
if (res < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
return -errno;
}
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMPING)
tss = (void *)CMSG_DATA(cmsg);
else if (cmsg->cmsg_level == SOL_BLUETOOTH && cmsg->cmsg_type == BT_SCM_ERROR)
serr = (void *)CMSG_DATA(cmsg);
else
continue;
}
if (!tss || !serr || serr->ee_errno != ENOMSG || serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING)
return -EINVAL;
if (serr->ee_info != SCM_TSTAMP_SND)
continue;
struct timespec *ts = &tss->ts[0];
int64_t tx_time = SPA_TIMESPEC_TO_NSEC(ts);
uint32_t tx_pos = serr->ee_data % SPA_N_ELEMENTS(lat->impl.send);
lat->value = tx_time - lat->impl.send[tx_pos];
if (lat->impl.prev_tx && tx_time > lat->impl.prev_tx)
spa_bt_ptp_update(&lat->ptp, lat->value, tx_time - lat->impl.prev_tx);
lat->impl.prev_tx = tx_time;
spa_log_trace(log, "fd:%d latency[%d] nsec:%"PRIu64" range:%d..%d ms",
fd, tx_pos, lat->value,
(int)(spa_bt_ptp_valid(&lat->ptp) ? lat->ptp.min / SPA_NSEC_PER_MSEC : -1),
(int)(spa_bt_ptp_valid(&lat->ptp) ? lat->ptp.max / SPA_NSEC_PER_MSEC : -1));
} while (true);
lat->valid = spa_bt_ptp_valid(&lat->ptp);
return 0;
}
static inline void spa_bt_latency_flush(struct spa_bt_latency *lat, int fd, struct spa_log *log)
{
int so_timestamping = 0;
/* Disable timestamping and flush errqueue */
setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &so_timestamping, sizeof(so_timestamping));
spa_bt_latency_recv_errqueue(lat, fd, log);
lat->disabled = true;
}
#endif

View file

@ -104,8 +104,8 @@ static int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct s
spa_bt_rate_control_init(&this->ctl, 0);
spa_bt_ptp_init(&this->spike, (uint64_t)this->rate * BUFFERING_LONG_MSEC / 1000);
spa_bt_ptp_init(&this->packet_size, (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000);
spa_bt_ptp_init(&this->spike, (uint64_t)this->rate * BUFFERING_LONG_MSEC / 1000, 0);
spa_bt_ptp_init(&this->packet_size, (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000, 0);
if ((this->buffer_decoded = malloc(this->buffer_size)) == NULL) {
this->buffer_size = 0;

View file

@ -7,7 +7,6 @@
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/socket.h>
#include <spa/support/loop.h>
#include <spa/support/log.h>
@ -26,9 +25,14 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.iso");
#undef SPA_LOG_TOPIC_DEFAULT
#define SPA_LOG_TOPIC_DEFAULT &log_topic
#include "bt-latency.h"
#define IDLE_TIME (500 * SPA_NSEC_PER_MSEC)
#define EMPTY_BUF_SIZE 65536
#define LATENCY_PERIOD (200 * SPA_NSEC_PER_MSEC)
#define MAX_PACKET_QUEUE 3
struct group {
struct spa_log *log;
struct spa_loop *data_loop;
@ -39,7 +43,6 @@ struct group {
uint8_t id;
uint64_t next;
uint64_t duration;
uint32_t paused;
bool started;
};
@ -55,6 +58,8 @@ struct stream {
const struct media_codec *codec;
uint32_t block_size;
struct spa_bt_latency tx_latency;
};
struct modify_info
@ -131,15 +136,20 @@ static int set_timeout(struct group *group, uint64_t time)
group->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
}
static int set_timers(struct group *group)
static uint64_t get_time_ns(struct spa_system *system, clockid_t clockid)
{
struct timespec now;
spa_system_clock_gettime(system, clockid, &now);
return SPA_TIMESPEC_TO_NSEC(&now);
}
static int set_timers(struct group *group)
{
if (group->duration == 0)
return -EINVAL;
spa_system_clock_gettime(group->data_system, CLOCK_MONOTONIC, &now);
group->next = SPA_ROUND_UP(SPA_TIMESPEC_TO_NSEC(&now) + group->duration,
group->next = SPA_ROUND_UP(get_time_ns(group->data_system, CLOCK_MONOTONIC) + group->duration,
group->duration);
return set_timeout(group, group->next);
@ -170,10 +180,6 @@ static void group_on_timeout(struct spa_source *source)
return;
}
/*
* If a stream failed, pause output of all streams for a while to avoid
* desynchronization.
*/
spa_list_for_each(stream, &group->streams, link) {
if (!stream->sink) {
if (!stream->pull) {
@ -183,6 +189,8 @@ static void group_on_timeout(struct spa_source *source)
continue;
}
spa_bt_latency_recv_errqueue(&stream->tx_latency, stream->fd, group->log);
if (stream->this.need_resync) {
resync = true;
stream->this.need_resync = false;
@ -192,18 +200,16 @@ static void group_on_timeout(struct spa_source *source)
group->started = true;
}
if (group->paused) {
--group->paused;
spa_log_debug(group->log, "%p: ISO group:%u paused:%u", group, group->id, group->paused);
}
/* Produce output */
spa_list_for_each(stream, &group->streams, link) {
int res;
uint64_t now;
int32_t min_latency = INT32_MAX, max_latency = INT32_MIN;
struct stream *other;
if (!stream->sink)
continue;
if (group->paused || !group->started) {
if (!group->started) {
stream->this.resync = true;
stream->this.size = 0;
continue;
@ -217,21 +223,50 @@ static void group_on_timeout(struct spa_source *source)
}
}
spa_list_for_each(other, &group->streams, link) {
if (!other->sink || stream == other || !other->tx_latency.valid)
continue;
min_latency = SPA_MIN(min_latency, other->tx_latency.ptp.min);
max_latency = SPA_MAX(max_latency, other->tx_latency.ptp.max);
}
if (stream->tx_latency.valid && min_latency <= max_latency &&
stream->tx_latency.ptp.min > min_latency + (int64_t)group->duration/2 &&
stream->tx_latency.ptp.max > max_latency + (int64_t)group->duration/2) {
spa_log_debug(group->log, "%p: ISO group:%u latency skip align fd:%d", group, group->id, stream->fd);
spa_bt_latency_reset(&stream->tx_latency);
goto stream_done;
}
/* TODO: this should use rate match */
if (stream->tx_latency.valid &&
stream->tx_latency.ptp.min > MAX_PACKET_QUEUE * (int64_t)group->duration) {
spa_log_debug(group->log, "%p: ISO group:%u latency skip fd:%d", group, group->id, stream->fd);
spa_bt_latency_reset(&stream->tx_latency);
goto stream_done;
}
now = get_time_ns(group->data_system, CLOCK_REALTIME);
res = send(stream->fd, stream->this.buf, stream->this.size, MSG_DONTWAIT | MSG_NOSIGNAL);
if (res < 0) {
res = -errno;
fail = true;
} else {
spa_bt_latency_sent(&stream->tx_latency, now);
}
spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d",
stream_done:
spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d latency:%d..%d us",
group, group->id, stream->fd, (unsigned)stream->this.size,
(unsigned)stream->this.timestamp, stream->idle, res);
(unsigned)stream->this.timestamp, stream->idle, res,
stream->tx_latency.valid ? stream->tx_latency.ptp.min/1000 : -1,
stream->tx_latency.valid ? stream->tx_latency.ptp.max/1000 : -1);
stream->this.size = 0;
}
if (fail)
group->paused = 1u + IDLE_TIME / group->duration;
spa_log_debug(group->log, "%p: ISO group:%d send failure", group, group->id);
/* Pull data for the next interval */
group->next += exp * group->duration;
@ -399,6 +434,8 @@ static struct stream *stream_create(struct spa_bt_transport *t, struct group *gr
stream->this.format = format;
stream->block_size = block_size;
spa_bt_latency_init(&stream->tx_latency, stream->fd, LATENCY_PERIOD, group->log);
if (sink)
stream_silence(stream);
@ -453,6 +490,8 @@ void spa_bt_iso_io_destroy(struct spa_bt_iso_io *this)
stream_unlink(stream);
spa_bt_latency_flush(&stream->tx_latency, stream->fd, stream->group->log);
if (spa_list_is_empty(&stream->group->streams))
group_destroy(stream->group);
@ -508,3 +547,12 @@ void spa_bt_iso_io_set_cb(struct spa_bt_iso_io *this, spa_bt_iso_io_pull_t pull,
return;
}
}
/** Must be called from data thread */
int spa_bt_iso_io_recv_errqueue(struct spa_bt_iso_io *this)
{
struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this);
struct group *group = stream->group;
return spa_bt_latency_recv_errqueue(&stream->tx_latency, stream->fd, group->log);
}

View file

@ -44,5 +44,6 @@ struct spa_bt_iso_io *spa_bt_iso_io_create(struct spa_bt_transport *t,
struct spa_bt_iso_io *spa_bt_iso_io_attach(struct spa_bt_iso_io *io, struct spa_bt_transport *t);
void spa_bt_iso_io_destroy(struct spa_bt_iso_io *io);
void spa_bt_iso_io_set_cb(struct spa_bt_iso_io *io, spa_bt_iso_io_pull_t pull, void *user_data);
int spa_bt_iso_io_recv_errqueue(struct spa_bt_iso_io *io);
#endif

View file

@ -45,6 +45,8 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.sink.media");
#undef SPA_LOG_TOPIC_DEFAULT
#define SPA_LOG_TOPIC_DEFAULT &log_topic
#include "bt-latency.h"
#define DEFAULT_CLOCK_NAME "clock.system.monotonic"
struct props {
@ -57,6 +59,7 @@ struct props {
#define MAX_BUFFERS 32
#define BUFFER_SIZE (8192*8)
#define RATE_CTL_DIFF_MAX 0.005
#define LATENCY_PERIOD (200 * SPA_NSEC_PER_MSEC)
/* Wait for two cycles before trying to sync ISO. On start/driver reassign,
* first cycle may have strange number of samples. */
@ -1086,9 +1089,18 @@ static void media_on_flush_error(struct spa_source *source)
{
struct impl *this = source->data;
if (source->rmask & SPA_IO_ERR) {
/* TX timestamp info? */
if (this->transport && this->transport->iso_io)
if (spa_bt_iso_io_recv_errqueue(this->transport->iso_io) == 0)
return;
/* Otherwise: actual error */
}
spa_log_trace(this->log, "%p: flush event", this);
if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) {
if (source->rmask & (SPA_IO_HUP | SPA_IO_ERR)) {
spa_log_warn(this->log, "%p: error %d", this, source->rmask);
if (this->flush_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_source);
@ -1372,7 +1384,6 @@ static int do_remove_transport_source(struct spa_loop *loop,
if (this->flush_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_source);
if (this->flush_timer_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_timer_source);
enable_flush_timer(this, false);

View file

@ -148,6 +148,8 @@ struct impl {
uint8_t buffer_read[4096];
struct timespec now;
uint64_t sample_count;
uint32_t errqueue_count;
};
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0)
@ -455,6 +457,24 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size,
return dst_size - avail;
}
static void handle_errqueue(struct impl *this)
{
int res;
/* iso-io/media-sink use these for TX latency.
* Someone else should be reading them, so drop
* only after yielding.
*/
if (this->errqueue_count < 4) {
this->errqueue_count++;
return;
}
this->errqueue_count = 0;
res = recv(this->fd, NULL, 0, MSG_ERRQUEUE | MSG_TRUNC);
spa_log_trace(this->log, "%p: ignoring errqueue data (%d)", this, res);
}
static void media_on_ready_read(struct spa_source *source)
{
struct impl *this = source->data;
@ -467,6 +487,11 @@ static void media_on_ready_read(struct spa_source *source)
/* make sure the source is an input */
if ((source->rmask & SPA_IO_IN) == 0) {
if (source->rmask & SPA_IO_ERR) {
handle_errqueue(this);
return;
}
spa_log_error(this->log, "source is not an input, rmask=%d", source->rmask);
goto stop;
}
@ -475,6 +500,8 @@ static void media_on_ready_read(struct spa_source *source)
goto stop;
}
this->errqueue_count = 0;
spa_log_trace(this->log, "socket poll");
/* read */
@ -688,6 +715,7 @@ static int transport_start(struct impl *this)
}
this->sample_count = 0;
this->errqueue_count = 0;
this->source.data = this;

View file

@ -19,10 +19,11 @@ struct spa_bt_ptp
int32_t maxs[4];
};
uint32_t pos;
uint32_t left;
uint32_t period;
};
static inline void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period)
static inline void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period, uint32_t min_duration)
{
size_t i;
@ -31,6 +32,7 @@ static inline void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period)
p->mins[i] = INT32_MAX;
p->maxs[i] = INT32_MIN;
}
p->left = min_duration;
p->period = period;
}
@ -54,6 +56,16 @@ static inline void spa_bt_ptp_update(struct spa_bt_ptp *p, int32_t value, uint32
p->mins[n-1] = INT32_MAX;
p->maxs[n-1] = INT32_MIN;
}
if (p->left < duration)
p->left = 0;
else
p->left -= duration;
}
static inline bool spa_bt_ptp_valid(struct spa_bt_ptp *p)
{
return p->left == 0;
}
/**