bluez5: sco-sink: implement flushing the same way as in a2dp-sink

Use separate timers for driving graph and for flushing, since they don't
have the same period.

Flushing is done based on the time positions of the next sample to be
written, so it will stay in sync with the graph.  Because writing too
much data to SCO sockets generally only causes the device to skip ahead,
we don't need to handle the case where the writing has been lagging.

This fixes simultaneous playback to both ALSA and SCO sinks from the
same graph, with SCO as driver, which previously produces broken sound
(e.g. with pw-play --latency 512, linked to the two sinks) ALSA nodes
require regular driver intervals, which was not true previously.
This commit is contained in:
Pauli Virtanen 2022-07-10 18:09:53 +03:00 committed by Wim Taymans
parent 9cfa66baa2
commit 9c4aab7508

View file

@ -60,14 +60,10 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.sink.sco");
#define DEFAULT_CLOCK_NAME "clock.system.monotonic"
struct props {
uint32_t min_latency;
uint32_t max_latency;
char clock_name[64];
};
#define MAX_BUFFERS 32
#define MIN_LATENCY 512
#define MAX_LATENCY 1024
struct buffer {
uint32_t id;
@ -130,6 +126,8 @@ struct impl {
struct spa_param_info params[N_NODE_PARAMS];
struct props props;
uint32_t quantum_limit;
/* Transport */
struct spa_bt_transport *transport;
struct spa_hook transport_listener;
@ -140,38 +138,39 @@ struct impl {
/* Flags */
unsigned int started:1;
unsigned int following:1;
unsigned int flush_pending:1;
/* Sources */
struct spa_source source;
struct spa_source flush_timer_source;
/* Timer */
int timerfd;
struct timespec now;
int flush_timerfd;
struct spa_io_clock *clock;
struct spa_io_position *position;
uint64_t current_time;
uint64_t next_time;
uint64_t process_time;
uint64_t prev_flush_time;
uint64_t next_flush_time;
/* mSBC */
sbc_t msbc;
uint8_t *buffer;
uint8_t *buffer_head;
uint8_t *buffer_next;
int buffer_size;
/* Times */
uint64_t start_time;
uint64_t total_samples;
int msbc_seq;
};
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0)
static const uint32_t default_min_latency = MIN_LATENCY;
static const uint32_t default_max_latency = MAX_LATENCY;
static const char sntable[4] = { 0x08, 0x38, 0xC8, 0xF8 };
static void reset_props(struct props *props)
{
props->min_latency = default_min_latency;
props->max_latency = default_max_latency;
strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name));
}
@ -199,23 +198,7 @@ static int impl_node_enum_params(void *object, int seq,
switch (id) {
case SPA_PARAM_PropInfo:
{
struct props *p = &this->props;
switch (result.index) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency),
SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency),
SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX));
break;
default:
return 0;
}
@ -223,14 +206,10 @@ static int impl_node_enum_params(void *object, int seq,
}
case SPA_PARAM_Props:
{
struct props *p = &this->props;
switch (result.index) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Props, id,
SPA_PROP_minLatency, SPA_POD_Int(p->min_latency),
SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency));
SPA_TYPE_OBJECT_Props, id);
break;
default:
return 0;
@ -260,28 +239,17 @@ static int set_timeout(struct impl *this, uint64_t time)
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
return spa_system_timerfd_settime(this->data_system,
this->timerfd, 0, &ts, NULL);
this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
}
static int set_timers(struct impl *this)
{
return set_timeout(this, this->following ? 0 : 1);
}
struct timespec now;
static uint64_t get_next_timeout(struct impl *this, uint64_t now_time, uint64_t processed_samples)
{
struct port *port = &this->port;
uint64_t playback_time = 0, elapsed_time = 0, next_time = 0;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
this->next_time = SPA_TIMESPEC_TO_NSEC(&now);
this->total_samples += processed_samples;
playback_time = (this->total_samples * SPA_NSEC_PER_SEC) / port->current_format.info.raw.rate;
if (now_time > this->start_time)
elapsed_time = now_time - this->start_time;
if (elapsed_time < playback_time)
next_time = playback_time - elapsed_time;
return next_time;
return set_timeout(this, this->following ? 0 : this->next_time);
}
static int do_reassign_follower(struct spa_loop *loop,
@ -344,9 +312,7 @@ static int apply_props(struct impl *this, const struct spa_pod *param)
reset_props(&new_props);
} else {
spa_pod_parse_object(param,
SPA_TYPE_OBJECT_Props, NULL,
SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency),
SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency));
SPA_TYPE_OBJECT_Props, NULL);
}
changed = (memcmp(&new_props, &this->props, sizeof(struct props)) != 0);
@ -378,22 +344,64 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
return 0;
}
static void enable_flush_timer(struct impl *this, bool enabled)
{
struct itimerspec ts;
if (!enabled)
this->next_flush_time = 0;
ts.it_value.tv_sec = this->next_flush_time / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = this->next_flush_time % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system,
this->flush_timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
this->flush_pending = enabled;
}
static uint32_t get_queued_frames(struct impl *this)
{
struct port *port = &this->port;
uint32_t bytes = 0;
struct buffer *b;
spa_list_for_each(b, &port->ready, link) {
struct spa_data *d = b->buf->datas;
bytes += d[0].chunk->size;
}
if (bytes > port->ready_offset)
bytes -= port->ready_offset;
else
bytes = 0;
return bytes / port->frame_size;
}
static void flush_data(struct impl *this)
{
struct port *port = &this->port;
struct spa_data *datas;
const uint32_t min_in_size =
(this->transport->codec == HFP_AUDIO_CODEC_MSBC) ?
MSBC_DECODED_SIZE : this->transport->write_mtu;
uint8_t * const packet =
(this->transport->codec == HFP_AUDIO_CODEC_MSBC) ?
this->buffer_head : port->write_buffer;
const uint32_t packet_samples = min_in_size / port->frame_size;
const uint64_t packet_time = (uint64_t)packet_samples * SPA_NSEC_PER_SEC
/ port->current_format.info.raw.rate;
int processed = 0;
int written;
if (this->transport == NULL || this->transport->sco_io == NULL)
return;
again:
while (!spa_list_is_empty(&port->ready) && port->write_buffer_size < min_in_size) {
struct spa_data *datas;
/* get buffer */
if (!port->current_buffer) {
spa_return_if_fail(!spa_list_is_empty(&port->ready));
@ -429,147 +437,212 @@ again:
}
}
/* send the data if the write buffer is full */
if (port->write_buffer_size >= min_in_size) {
uint64_t now_time;
static int sn = 0;
int processed = 0;
ssize_t out_encoded;
int written;
uint64_t next_timeout;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
now_time = SPA_TIMESPEC_TO_NSEC(&this->now);
if (this->start_time == 0)
this->start_time = now_time;
if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) {
/* Encode */
if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) {
/* Buffer overrun; shouldn't usually happen. Drop data and reset. */
this->buffer_head = this->buffer_next = this->buffer;
spa_log_warn(this->log, "sco-sink: mSBC buffer overrun, dropping data");
}
this->buffer_next[0] = 0x01;
this->buffer_next[1] = sntable[sn];
this->buffer_next[59] = 0x00;
sn = (sn + 1) % 4;
processed = sbc_encode(&this->msbc, port->write_buffer, port->write_buffer_size,
this->buffer_next + 2, MSBC_ENCODED_SIZE - 3, &out_encoded);
if (processed < 0) {
spa_log_warn(this->log, "sbc_encode failed: %d", processed);
return;
}
this->buffer_next += out_encoded + 3;
port->write_buffer_size = 0;
/* Write */
written = spa_bt_sco_io_write(this->transport->sco_io, packet,
this->buffer_next - this->buffer_head);
if (written < 0) {
spa_log_warn(this->log, "failed to write data: %d (%s)",
written, spa_strerror(written));
goto stop;
}
this->buffer_head += written;
if (this->buffer_head == this->buffer_next)
this->buffer_head = this->buffer_next = this->buffer;
else if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) {
/* Written bytes is not necessarily commensurate
* with MSBC_ENCODED_SIZE. If this occurs, copy data.
*/
int size = this->buffer_next - this->buffer_head;
spa_memmove(this->buffer, this->buffer_head, size);
this->buffer_next = this->buffer + size;
this->buffer_head = this->buffer;
}
} else {
written = spa_bt_sco_io_write(this->transport->sco_io, packet,
port->write_buffer_size);
if (written < 0) {
spa_log_warn(this->log, "sco-sink: write failure: %d (%s)",
written, spa_strerror(written));
goto stop;
} else if (written == 0) {
/* EAGAIN or similar, just skip ahead */
written = SPA_MIN(port->write_buffer_size, (uint32_t)48);
}
processed = written;
port->write_buffer_size -= written;
if (port->write_buffer_size > 0 && written > 0) {
spa_memmove(port->write_buffer, port->write_buffer + written, port->write_buffer_size);
}
}
spa_log_trace(this->log, "write socket data %d", written);
next_timeout = get_next_timeout(this, now_time, processed / port->frame_size);
if (!this->following && this->clock) {
this->clock->nsec = now_time;
this->clock->position = this->total_samples;
this->clock->delay = processed / port->frame_size;
this->clock->rate_diff = 1.0f;
this->clock->next_nsec = now_time + next_timeout;
}
if (next_timeout == 0)
goto again;
spa_log_trace(this->log, "timeout %"PRIu64" ns", next_timeout);
set_timeout(this, next_timeout);
} else {
/* As follower, driver will wake us up when there is data */
if (this->following)
return;
/* As driver, run timeout now to schedule data */
spa_log_trace(this->log, "timeout 1 ns (driver: schedule now)");
set_timeout(this, 1);
if (this->flush_pending) {
spa_log_trace(this->log, "%p: wait for flush timer", this);
return;
}
if (port->write_buffer_size < min_in_size) {
/* wait for more data */
spa_log_trace(this->log, "%p: skip flush", this);
enable_flush_timer(this, false);
return;
}
if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) {
ssize_t out_encoded;
/* Encode */
if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) {
/* Buffer overrun; shouldn't usually happen. Drop data and reset. */
this->buffer_head = this->buffer_next = this->buffer;
spa_log_warn(this->log, "sco-sink: mSBC buffer overrun, dropping data");
}
this->buffer_next[0] = 0x01;
this->buffer_next[1] = sntable[this->msbc_seq % 4];
this->buffer_next[59] = 0x00;
this->msbc_seq = (this->msbc_seq + 1) % 4;
processed = sbc_encode(&this->msbc, port->write_buffer, port->write_buffer_size,
this->buffer_next + 2, MSBC_ENCODED_SIZE - 3, &out_encoded);
if (processed < 0) {
spa_log_warn(this->log, "sbc_encode failed: %d", processed);
return;
}
this->buffer_next += out_encoded + 3;
port->write_buffer_size = 0;
/* Write */
written = spa_bt_sco_io_write(this->transport->sco_io, packet,
this->buffer_next - this->buffer_head);
if (written < 0) {
spa_log_warn(this->log, "failed to write data: %d (%s)",
written, spa_strerror(written));
goto stop;
}
this->buffer_head += written;
if (this->buffer_head == this->buffer_next)
this->buffer_head = this->buffer_next = this->buffer;
else if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) {
/* Written bytes is not necessarily commensurate
* with MSBC_ENCODED_SIZE. If this occurs, copy data.
*/
int size = this->buffer_next - this->buffer_head;
spa_memmove(this->buffer, this->buffer_head, size);
this->buffer_next = this->buffer + size;
this->buffer_head = this->buffer;
}
} else {
written = spa_bt_sco_io_write(this->transport->sco_io, packet,
port->write_buffer_size);
if (written < 0) {
spa_log_warn(this->log, "sco-sink: write failure: %d (%s)",
written, spa_strerror(written));
goto stop;
} else if (written == 0) {
/* EAGAIN or similar, just skip ahead */
written = SPA_MIN(port->write_buffer_size, (uint32_t)48);
}
processed = written;
port->write_buffer_size -= written;
if (port->write_buffer_size > 0 && written > 0) {
spa_memmove(port->write_buffer, port->write_buffer + written, port->write_buffer_size);
}
}
if (SPA_UNLIKELY(spa_log_level_topic_enabled(this->log, SPA_LOG_TOPIC_DEFAULT, SPA_LOG_LEVEL_TRACE))) {
struct timespec ts;
uint64_t now;
uint64_t dt;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_NSEC(&ts);
dt = now - this->prev_flush_time;
this->prev_flush_time = now;
spa_log_trace(this->log,
"%p: send wrote:%d dt:%"PRIu64,
this, written, dt);
}
spa_log_trace(this->log, "write socket data %d", written);
if (SPA_LIKELY(this->position)) {
uint32_t frames = get_queued_frames(this);
uint64_t duration_ns;
/*
* Flush at the time position of the next buffered sample.
*/
duration_ns = ((uint64_t)this->position->clock.duration * SPA_NSEC_PER_SEC
/ this->position->clock.rate.denom);
this->next_flush_time = this->process_time + duration_ns
- ((uint64_t)frames * SPA_NSEC_PER_SEC
/ port->current_format.info.raw.rate);
/*
* We could delay the output by one packet to avoid waiting
* for the next buffer and so make send intervals more regular.
* However, this appears not needed in practice, and it's better
* to not add latency if not needed.
*/
#if 0
this->next_flush_time += SPA_MIN(packet_time,
duration_ns * (port->n_buffers - 1));
#endif
} else {
if (this->next_flush_time == 0)
this->next_flush_time = this->process_time;
this->next_flush_time += packet_time;
}
enable_flush_timer(this, true);
return;
stop:
if (this->source.loop)
spa_loop_remove_source(this->data_loop, &this->source);
enable_flush_timer(this, false);
}
static void sco_on_flush_timeout(struct spa_source *source)
{
struct impl *this = source->data;
uint64_t exp;
spa_log_trace(this->log, "%p: flush on timeout", this);
if (spa_system_timerfd_read(this->data_system, this->flush_timerfd, &exp) < 0)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
if (this->transport == NULL) {
enable_flush_timer(this, false);
return;
}
while (exp-- > 0) {
this->flush_pending = false;
flush_data(this);
}
}
static void sco_on_timeout(struct spa_source *source)
{
struct impl *this = source->data;
struct port *port = &this->port;
uint64_t exp;
uint64_t exp, duration;
uint32_t rate;
struct spa_io_buffers *io = port->io;
uint64_t prev_time, now_time;
if (this->transport == NULL)
return;
/* Read the timerfd */
if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
/* delay if no buffers available */
if (!this->following && spa_list_is_empty(&port->ready)) {
set_timeout(this, this->transport->write_mtu / port->frame_size * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate);
port->io->status = SPA_STATUS_NEED_DATA;
spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA);
return;
prev_time = this->current_time;
now_time = this->current_time = this->next_time;
spa_log_debug(this->log, "%p: timer %"PRIu64" %"PRIu64"", this,
now_time, now_time - prev_time);
if (SPA_LIKELY(this->position)) {
duration = this->position->clock.duration;
rate = this->position->clock.rate.denom;
} else {
duration = 1024;
rate = 48000;
}
/* Flush data */
flush_data(this);
this->next_time = now_time + duration * SPA_NSEC_PER_SEC / rate;
if (SPA_LIKELY(this->clock)) {
this->clock->nsec = now_time;
this->clock->position += duration;
this->clock->duration = duration;
this->clock->rate_diff = 1.0f;
this->clock->next_nsec = this->next_time;
this->clock->delay = 0;
}
spa_log_trace(this->log, "%p: %d", this, io->status);
io->status = SPA_STATUS_NEED_DATA;
spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA);
set_timeout(this, this->next_time);
}
/* greater common divider */
static int gcd(int a, int b) {
while(b) {
int c = b;
b = a % b;
a = c;
int c = b;
b = a % b;
a = c;
}
return a;
}
@ -616,6 +689,10 @@ static int do_start(struct impl *this)
this->buffer_size = lcm(24, lcm(60, lcm(this->transport->write_mtu, 2 * MSBC_ENCODED_SIZE)));
this->buffer = calloc(this->buffer_size, sizeof(uint8_t));
this->buffer_head = this->buffer_next = this->buffer;
if (this->buffer == NULL) {
res = -errno;
goto fail;
}
}
spa_return_val_if_fail(this->transport->write_mtu <= sizeof(this->port.write_buffer), -EINVAL);
@ -632,7 +709,15 @@ static int do_start(struct impl *this)
this->source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->source);
this->flush_timer_source.data = this;
this->flush_timer_source.fd = this->flush_timerfd;
this->flush_timer_source.func = sco_on_flush_timeout;
this->flush_timer_source.mask = SPA_IO_IN;
this->flush_timer_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_timer_source);
/* start processing */
this->flush_pending = false;
set_timers(this);
/* Set the started flag */
@ -675,13 +760,20 @@ static int do_remove_source(struct spa_loop *loop,
void *user_data)
{
struct impl *this = user_data;
struct itimerspec ts;
this->start_time = 0;
this->total_samples = 0;
set_timeout(this, 0);
if (this->source.loop)
spa_loop_remove_source(this->data_loop, &this->source);
if (this->flush_timer_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_timer_source);
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, this->flush_timerfd, 0, &ts, NULL);
/* Drop buffered data in the ready queue. Ideally there shouldn't be any. */
drop_port_output(this);
@ -758,7 +850,7 @@ static void emit_node_info(struct impl *this, bool full)
{ SPA_KEY_DEVICE_API, "bluez5" },
{ SPA_KEY_MEDIA_CLASS, "Stream/Input/Audio" },
{ "media.name", ((this->transport && this->transport->device->name) ?
this->transport->device->name : "HSP/HFP") },
this->transport->device->name : "HSP/HFP") },
{ SPA_KEY_MEDIA_ROLE, "Communication" },
};
bool is_ag = this->transport &&
@ -917,9 +1009,9 @@ impl_node_port_enum_params(void *object, int seq,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
this->props.max_latency * port->frame_size,
this->props.min_latency * port->frame_size,
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
this->quantum_limit * port->frame_size,
16 * port->frame_size,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size));
break;
@ -1150,6 +1242,11 @@ static int impl_node_process(void *object)
if ((io = port->io) == NULL)
return -EIO;
if (this->position && this->position->clock.flags & SPA_IO_CLOCK_FLAG_FREEWHEEL) {
io->status = SPA_STATUS_NEED_DATA;
return SPA_STATUS_HAVE_DATA;
}
if (io->status == SPA_STATUS_HAVE_DATA && io->buffer_id < port->n_buffers) {
struct buffer *b = &port->buffers[io->buffer_id];
@ -1167,8 +1264,22 @@ static int impl_node_process(void *object)
io->status = SPA_STATUS_OK;
}
if (!spa_list_is_empty(&port->ready))
if (this->following) {
if (this->position) {
this->current_time = this->position->clock.nsec;
} else {
struct timespec now;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
this->current_time = SPA_TIMESPEC_TO_NSEC(&now);
}
}
this->process_time = this->current_time;
if (!spa_list_is_empty(&port->ready)) {
spa_log_trace(this->log, "%p: flush on process", this);
flush_data(this);
}
return SPA_STATUS_HAVE_DATA;
}
@ -1239,6 +1350,7 @@ static int impl_clear(struct spa_handle *handle)
if (this->transport)
spa_hook_remove(&this->transport_listener);
spa_system_close(this->data_system, this->timerfd);
spa_system_close(this->data_system, this->flush_timerfd);
return 0;
}
@ -1323,6 +1435,11 @@ impl_init(const struct spa_handle_factory *factory,
spa_list_init(&port->ready);
this->quantum_limit = 8192;
if (info && (str = spa_dict_lookup(info, "clock.quantum-limit")))
spa_atou32(str, &this->quantum_limit, 0);
if (info && (str = spa_dict_lookup(info, SPA_KEY_API_BLUEZ5_TRANSPORT)))
sscanf(str, "pointer:%p", &this->transport);
@ -1336,6 +1453,9 @@ impl_init(const struct spa_handle_factory *factory,
this->timerfd = spa_system_timerfd_create(this->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
this->flush_timerfd = spa_system_timerfd_create(this->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
return 0;
}