bluez5: a2dp-source: separate clock from recv + handle buffering

a2dp-source as driver does not produce regularly spaced graph cycles,
because A2DP is not isochronous. This causes e.g. crackling for alsa
etc. that expect regular timings. It also does not rate match.

Change a2dp-source to trigger graph on regular intervals. Change recv to
only accumulate data to a buffer, and put data to buffers in process().

Rate match with DLL, keeping average buffer level constant.  Keep track
of jitter to determine a safe target value.
This commit is contained in:
Pauli Virtanen 2022-06-13 14:20:55 +03:00 committed by Wim Taymans
parent 459a1114f1
commit 51356ea3d0
2 changed files with 656 additions and 175 deletions

View file

@ -60,18 +60,16 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.source.a2dp
#undef SPA_LOG_TOPIC_DEFAULT
#define SPA_LOG_TOPIC_DEFAULT &log_topic
#include "decode-buffer.h"
#define DEFAULT_CLOCK_NAME "clock.system.monotonic"
struct props {
uint32_t min_latency;
uint32_t max_latency;
char clock_name[64];
};
#define FILL_FRAMES 2
#define MAX_BUFFERS 32
#define MIN_LATENCY 512
#define MAX_LATENCY 1024
struct buffer {
uint32_t id;
@ -89,6 +87,7 @@ struct port {
uint64_t info_all;
struct spa_port_info info;
struct spa_io_buffers *io;
struct spa_io_rate_match *rate_match;
struct spa_latency_info latency;
#define IDX_EnumFormat 0
#define IDX_Meta 1
@ -105,8 +104,7 @@ struct port {
struct spa_list free;
struct spa_list ready;
struct buffer *current_buffer;
uint32_t ready_offset;
struct spa_bt_decode_buffer buffer;
};
struct impl {
@ -120,6 +118,8 @@ struct impl {
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
uint32_t quantum_limit;
uint64_t info_all;
struct spa_node_info info;
#define IDX_PropInfo 0
@ -137,6 +137,8 @@ struct impl {
unsigned int started:1;
unsigned int transport_acquired:1;
unsigned int following:1;
unsigned int matching:1;
unsigned int resampling:1;
unsigned int is_input:1;
unsigned int is_duplex:1;
@ -144,9 +146,15 @@ struct impl {
int fd;
struct spa_source source;
struct spa_source timer_source;
int timerfd;
struct spa_io_clock *clock;
struct spa_io_position *position;
uint64_t current_time;
uint64_t next_time;
const struct a2dp_codec *codec;
bool codec_props_changed;
void *codec_props;
@ -154,10 +162,8 @@ struct impl {
struct spa_audio_info codec_format;
uint8_t buffer_read[4096];
uint8_t buffer_decoded[65536];
struct timespec now;
uint64_t sample_count;
uint64_t skip_count;
int duplex_timerfd;
uint64_t duplex_timeout;
@ -165,13 +171,8 @@ struct impl {
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0)
static const uint32_t default_min_latency = MIN_LATENCY;
static const uint32_t default_max_latency = MAX_LATENCY;
static void reset_props(struct props *props)
{
props->min_latency = default_min_latency;
props->max_latency = default_max_latency;
strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name));
}
@ -200,43 +201,19 @@ static int impl_node_enum_params(void *object, int seq,
switch (id) {
case SPA_PARAM_PropInfo:
{
struct props *p = &this->props;
switch (result.index) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency),
SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency),
SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX));
break;
default:
enum_codec = true;
index_offset = 2;
index_offset = 0;
}
break;
}
case SPA_PARAM_Props:
{
struct props *p = &this->props;
switch (result.index) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Props, id,
SPA_PROP_minLatency, SPA_POD_Int(p->min_latency),
SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency));
break;
default:
enum_codec = true;
index_offset = 1;
index_offset = 0;
}
break;
}
@ -267,13 +244,38 @@ static int impl_node_enum_params(void *object, int seq,
return 0;
}
static int do_reassing_follower(struct spa_loop *loop,
static int set_timeout(struct impl *this, uint64_t time)
{
struct itimerspec ts;
ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
return spa_system_timerfd_settime(this->data_system,
this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
}
static int set_timers(struct impl *this)
{
struct timespec now;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
this->next_time = SPA_TIMESPEC_TO_NSEC(&now);
return set_timeout(this, this->following ? 0 : this->next_time);
}
static int do_reassign_follower(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct impl *this = user_data;
struct port *port = &this->port;
spa_bt_decode_buffer_recover(&port->buffer);
return 0;
}
@ -309,7 +311,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
if (this->started && following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
spa_loop_invoke(this->data_loop, do_reassing_follower, 0, NULL, 0, true, this);
spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this);
}
return 0;
}
@ -324,10 +326,7 @@ static int apply_props(struct impl *this, const struct spa_pod *param)
if (param == NULL) {
reset_props(&new_props);
} else {
spa_pod_parse_object(param,
SPA_TYPE_OBJECT_Props, NULL,
SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency),
SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency));
/* noop */
}
changed = (memcmp(&new_props, &this->props, sizeof(struct props)) != 0);
@ -372,7 +371,6 @@ static void reset_buffers(struct port *port)
spa_list_init(&port->free);
spa_list_init(&port->ready);
port->current_buffer = NULL;
for (i = 0; i < port->n_buffers; i++) {
struct buffer *b = &port->buffers[i];
@ -449,30 +447,15 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size,
return dst_size - avail;
}
static void skip_ready_buffers(struct impl *this)
{
struct port *port = &this->port;
/* Move all buffers from ready to free */
while (!spa_list_is_empty(&port->ready)) {
struct buffer *b;
b = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&b->link);
spa_list_append(&port->free, &b->link);
spa_assert(!b->outstanding);
this->skip_count += b->buf->datas[0].chunk->size / port->frame_size;
}
}
static void a2dp_on_ready_read(struct spa_source *source)
{
struct impl *this = source->data;
struct port *port = &this->port;
struct spa_io_buffers *io = port->io;
int32_t size_read, decoded, avail;
struct spa_data *datas;
struct buffer *buffer;
uint32_t min_data;
struct timespec now;
void *buf;
int32_t size_read, decoded;
uint32_t avail;
uint64_t dt;
/* make sure the source is an input */
if ((source->rmask & SPA_IO_IN) == 0) {
@ -486,9 +469,6 @@ static void a2dp_on_ready_read(struct spa_source *source)
spa_log_trace(this->log, "socket poll");
/* update the current pts */
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
/* read */
size_read = read_data (this);
if (size_read == 0)
@ -497,7 +477,9 @@ static void a2dp_on_ready_read(struct spa_source *source)
spa_log_error(this->log, "failed to read data: %s", spa_strerror(size_read));
goto stop;
}
spa_log_trace(this->log, "read socket data %d", size_read);
/* update the current pts */
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
if (this->codec_props_changed && this->codec_props
&& this->codec->update_props) {
@ -505,111 +487,33 @@ static void a2dp_on_ready_read(struct spa_source *source)
this->codec_props_changed = false;
}
/* decode */
decoded = decode_data(this, this->buffer_read, size_read,
this->buffer_decoded, sizeof (this->buffer_decoded));
/* decode to buffer */
buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail);
spa_log_trace(this->log, "read socket data size:%d, avail:%d", size_read, avail);
decoded = decode_data(this, this->buffer_read, size_read, buf, avail);
if (decoded < 0) {
spa_log_debug(this->log, "failed to decode data: %d", decoded);
return;
}
if (decoded == 0)
if (decoded == 0) {
spa_log_trace(this->log, "no decoded socket data");
return;
spa_log_trace(this->log, "decoded socket data %d", decoded);
}
/* discard when not started */
if (!this->started)
return;
/* get buffer */
if (!port->current_buffer) {
if (spa_list_is_empty(&port->free)) {
/* xrun, skip ahead */
skip_ready_buffers(this);
this->skip_count += decoded / port->frame_size;
this->sample_count += decoded / port->frame_size;
return;
}
if (this->skip_count > 0) {
spa_log_info(this->log, "%p: xrun, skipped %"PRIu64" usec",
this, (uint64_t)(this->skip_count * SPA_USEC_PER_SEC / port->current_format.info.raw.rate));
this->skip_count = 0;
}
spa_bt_decode_buffer_write_packet(&port->buffer, decoded);
buffer = spa_list_first(&port->free, struct buffer, link);
spa_list_remove(&buffer->link);
dt = SPA_TIMESPEC_TO_NSEC(&this->now);
this->now = now;
dt = SPA_TIMESPEC_TO_NSEC(&this->now) - dt;
port->current_buffer = buffer;
port->ready_offset = 0;
spa_log_trace(this->log, "dequeue %d", buffer->id);
spa_log_trace(this->log, "decoded socket data size:%d frames:%d dt:%d dms",
(int)decoded, (int)decoded/port->frame_size,
(int)(dt / 100000));
if (buffer->h) {
buffer->h->seq = this->sample_count;
buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
buffer->h->dts_offset = 0;
}
} else {
buffer = port->current_buffer;
}
datas = buffer->buf->datas;
/* copy data into buffer */
avail = SPA_MIN(decoded, (int32_t)(datas[0].maxsize - port->ready_offset));
if (avail < decoded)
spa_log_warn(this->log, "buffer too small (%d > %d)", decoded, avail);
memcpy ((uint8_t *)datas[0].data + port->ready_offset, this->buffer_decoded, avail);
port->ready_offset += avail;
this->sample_count += decoded / port->frame_size;
/* send buffer if full */
min_data = SPA_MIN(this->props.min_latency * port->frame_size, datas[0].maxsize / 2);
if (port->ready_offset >= min_data) {
uint64_t sample_count;
datas[0].chunk->offset = 0;
datas[0].chunk->size = port->ready_offset;
datas[0].chunk->stride = port->frame_size;
sample_count = datas[0].chunk->size / port->frame_size;
spa_log_trace(this->log, "queue %d", buffer->id);
spa_list_append(&port->ready, &buffer->link);
port->current_buffer = NULL;
if (!this->following && this->clock) {
this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now);
this->clock->duration = sample_count * this->clock->rate.denom / port->current_format.info.raw.rate;
this->clock->position = this->sample_count * this->clock->rate.denom / port->current_format.info.raw.rate;
this->clock->delay = 0;
this->clock->rate_diff = 1.0f;
this->clock->next_nsec = this->clock->nsec + (uint64_t)sample_count * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate;
}
}
/* done if there are no buffers ready */
if (spa_list_is_empty(&port->ready))
return;
if (this->following)
return;
/* process the buffer if IO does not have any */
if (io != NULL && io->status != SPA_STATUS_HAVE_DATA) {
struct buffer *b;
if (io->buffer_id < port->n_buffers)
recycle_buffer(this, port, io->buffer_id);
b = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&b->link);
b->outstanding = true;
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_DATA;
}
/* notify ready */
spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA);
return;
stop:
@ -641,6 +545,75 @@ static void a2dp_on_duplex_timeout(struct spa_source *source)
a2dp_on_ready_read(source);
}
static int setup_matching(struct impl *this)
{
struct port *port = &this->port;
if (this->position && port->rate_match) {
port->rate_match->rate = 1 / port->buffer.corr;
this->matching = this->following;
this->resampling = this->matching ||
(port->current_format.info.raw.rate != this->position->clock.rate.denom);
} else {
this->matching = false;
this->resampling = false;
}
if (port->rate_match)
SPA_FLAG_UPDATE(port->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE, this->matching);
return 0;
}
static void a2dp_on_timeout(struct spa_source *source)
{
struct impl *this = source->data;
struct port *port = &this->port;
uint64_t exp, duration;
uint32_t rate;
struct spa_io_buffers *io = port->io;
uint64_t prev_time, now_time;
if (this->transport == NULL)
return;
if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
prev_time = this->current_time;
now_time = this->current_time = this->next_time;
spa_log_trace(this->log, "%p: timer %"PRIu64" %"PRIu64"", this,
now_time, now_time - prev_time);
if (SPA_LIKELY(this->position)) {
duration = this->position->clock.duration;
rate = this->position->clock.rate.denom;
} else {
duration = 1024;
rate = 48000;
}
setup_matching(this);
this->next_time = now_time + duration * SPA_NSEC_PER_SEC / port->buffer.corr / rate;
if (SPA_LIKELY(this->clock)) {
this->clock->nsec = now_time;
this->clock->position += duration;
this->clock->duration = duration;
this->clock->rate_diff = port->buffer.corr;
this->clock->next_nsec = this->next_time;
}
spa_log_trace(this->log, "%p: %d", this, io->status);
io->status = SPA_STATUS_HAVE_DATA;
spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA);
set_timeout(this, this->next_time);
}
static int transport_start(struct impl *this)
{
int res, val;
@ -683,7 +656,13 @@ static int transport_start(struct impl *this)
if (setsockopt(this->transport->fd, SOL_SOCKET, SO_PRIORITY, &val, sizeof(val)) < 0)
spa_log_warn(this->log, "SO_PRIORITY failed: %m");
reset_buffers(&this->port);
reset_buffers(port);
spa_bt_decode_buffer_clear(&port->buffer);
if ((res = spa_bt_decode_buffer_init(&port->buffer, this->log,
port->frame_size, port->current_format.info.raw.rate,
this->quantum_limit, this->quantum_limit)) < 0)
return res;
this->fd = this->transport->fd;
@ -716,8 +695,18 @@ static int transport_start(struct impl *this)
set_duplex_timeout(this, this->duplex_timeout);
}
this->timer_source.data = this;
this->timer_source.fd = this->timerfd;
this->timer_source.func = a2dp_on_timeout;
this->timer_source.mask = SPA_IO_IN;
this->timer_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->timer_source);
this->sample_count = 0;
this->skip_count = 0;
setup_matching(this);
set_timers(this);
return 0;
}
@ -753,6 +742,7 @@ static int do_remove_source(struct spa_loop *loop,
void *user_data)
{
struct impl *this = user_data;
struct itimerspec ts;
spa_log_debug(this->log, "%p: remove source", this);
@ -761,11 +751,20 @@ static int do_remove_source(struct spa_loop *loop,
if (this->source.loop)
spa_loop_remove_source(this->data_loop, &this->source);
if (this->timer_source.loop)
spa_loop_remove_source(this->data_loop, &this->timer_source);
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL);
return 0;
}
static int transport_stop(struct impl *this)
{
struct port *port = &this->port;
int res;
spa_log_debug(this->log, "%p: transport stop", this);
@ -783,6 +782,8 @@ static int transport_stop(struct impl *this)
this->codec->deinit(this->codec_data);
this->codec_data = NULL;
spa_bt_decode_buffer_clear(&port->buffer);
return res;
}
@ -836,24 +837,20 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
static void emit_node_info(struct impl *this, bool full)
{
char latency[64] = SPA_STRINGIFY(MIN_LATENCY)"/48000";
uint64_t old = full ? this->info.change_mask : 0;
struct spa_dict_item node_info_items[] = {
{ SPA_KEY_DEVICE_API, "bluez5" },
{ SPA_KEY_MEDIA_CLASS, this->is_input ? "Audio/Source" : "Stream/Output/Audio" },
{ SPA_KEY_NODE_LATENCY, latency },
{ SPA_KEY_NODE_LATENCY, this->is_input ? "" : "512/48000" },
{ "media.name", ((this->transport && this->transport->device->name) ?
this->transport->device->name : "A2DP") },
{ SPA_KEY_NODE_DRIVER, this->is_input ? "true" : "false" },
this->transport->device->name : "A2DP") },
{ SPA_KEY_NODE_DRIVER, this->is_input ? "true" : "false" },
};
if (full)
this->info.change_mask = this->info_all;
if (this->info.change_mask) {
if (this->transport && this->port.have_format)
snprintf(latency, sizeof(latency), "%d/%d", (int)this->props.min_latency,
(int)this->port.current_format.info.raw.rate);
this->info.props = &SPA_DICT_INIT_ARRAY(node_info_items);
spa_node_emit_info(&this->hooks, &this->info);
this->info.change_mask = old;
@ -991,11 +988,11 @@ impl_node_port_enum_params(void *object, int seq,
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS),
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
this->props.max_latency * port->frame_size,
this->props.min_latency * port->frame_size,
this->quantum_limit * port->frame_size,
16 * port->frame_size,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size));
break;
@ -1021,6 +1018,12 @@ impl_node_port_enum_params(void *object, int seq,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match)));
break;
default:
return 0;
}
@ -1059,7 +1062,6 @@ static int clear_buffers(struct impl *this, struct port *port)
spa_list_init(&port->ready);
port->n_buffers = 0;
}
port->current_buffer = NULL;
return 0;
}
@ -1217,6 +1219,9 @@ impl_node_port_set_io(void *object,
case SPA_IO_Buffers:
port->io = data;
break;
case SPA_IO_RateMatch:
port->rate_match = data;
break;
default:
return -ENOENT;
}
@ -1244,6 +1249,88 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t
return 0;
}
static uint32_t get_samples(struct impl *this, uint32_t *duration)
{
struct port *port = &this->port;
uint32_t samples;
if (SPA_LIKELY(port->rate_match) && this->resampling) {
samples = port->rate_match->size;
} else {
if (SPA_LIKELY(this->position))
samples = this->position->clock.duration * port->current_format.info.raw.rate
/ this->position->clock.rate.denom;
else
samples = 1024;
}
if (SPA_LIKELY(this->position))
*duration = this->position->clock.duration * port->current_format.info.raw.rate
/ this->position->clock.rate.denom;
else if (SPA_LIKELY(this->clock))
*duration = this->clock->duration * port->current_format.info.raw.rate
/ this->clock->rate.denom;
else
*duration = 1024 * port->current_format.info.raw.rate / 48000;
return samples;
}
static void process_buffering(struct impl *this)
{
struct port *port = &this->port;
uint32_t duration;
const uint32_t samples = get_samples(this, &duration);
uint32_t avail;
void *buf;
spa_bt_decode_buffer_process(&port->buffer, samples, duration);
setup_matching(this);
buf = spa_bt_decode_buffer_get_read(&port->buffer, &avail);
/* copy data to buffers */
if (!spa_list_is_empty(&port->free) && avail > 0) {
struct buffer *buffer;
struct spa_data *datas;
uint32_t data_size;
data_size = samples * port->frame_size;
avail = SPA_MIN(avail, data_size);
spa_bt_decode_buffer_read(&port->buffer, avail);
buffer = spa_list_first(&port->free, struct buffer, link);
spa_list_remove(&buffer->link);
spa_log_trace(this->log, "dequeue %d", buffer->id);
if (buffer->h) {
buffer->h->seq = this->sample_count;
buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
buffer->h->dts_offset = 0;
}
datas = buffer->buf->datas;
spa_assert(datas[0].maxsize >= data_size);
datas[0].chunk->offset = 0;
datas[0].chunk->size = avail;
datas[0].chunk->stride = port->frame_size;
memcpy(datas[0].data, buf, avail);
this->sample_count += avail / port->frame_size;
/* ready buffer if full */
spa_log_trace(this->log, "queue %d frames:%d", buffer->id, (int)avail / port->frame_size);
spa_list_append(&port->ready, &buffer->link);
}
}
static int impl_node_process(void *object)
{
struct impl *this = object;
@ -1269,6 +1356,9 @@ static int impl_node_process(void *object)
io->buffer_id = SPA_ID_INVALID;
}
/* Handle buffering delay */
process_buffering(this);
/* Return if there are no buffers ready to be processed */
if (spa_list_is_empty(&port->ready))
return SPA_STATUS_OK;
@ -1350,16 +1440,19 @@ static int impl_get_interface(struct spa_handle *handle, const char *type, void
static int impl_clear(struct spa_handle *handle)
{
struct impl *this = (struct impl *) handle;
struct port *port = &this->port;
if (this->codec_data)
this->codec->deinit(this->codec_data);
if (this->codec_props && this->codec->clear_props)
this->codec->clear_props(this->codec_props);
if (this->transport)
spa_hook_remove(&this->transport_listener);
spa_system_close(this->data_system, this->timerfd);
if (this->duplex_timerfd >= 0) {
spa_system_close(this->data_system, this->duplex_timerfd);
this->duplex_timerfd = -1;
}
spa_bt_decode_buffer_clear(&port->buffer);
return 0;
}
@ -1451,7 +1544,11 @@ impl_init(const struct spa_handle_factory *factory,
spa_list_init(&port->ready);
spa_list_init(&port->free);
this->quantum_limit = 8192;
if (info != NULL) {
if (info && (str = spa_dict_lookup(info, "clock.quantum-limit")))
spa_atou32(str, &this->quantum_limit, 0);
if ((str = spa_dict_lookup(info, SPA_KEY_API_BLUEZ5_TRANSPORT)) != NULL)
sscanf(str, "pointer:%p", &this->transport);
if ((str = spa_dict_lookup(info, "bluez5.a2dp-source-role")) != NULL)
@ -1486,6 +1583,9 @@ impl_init(const struct spa_handle_factory *factory,
spa_bt_transport_add_listener(this->transport,
&this->transport_listener, &transport_events, this);
this->timerfd = spa_system_timerfd_create(this->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
if (this->is_duplex) {
this->duplex_timerfd = spa_system_timerfd_create(this->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);

View file

@ -0,0 +1,381 @@
/* Spa Bluez5 decode buffer
*
* Copyright © 2022 Pauli Virtanen
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice (including the next
* paragraph) shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
/**
* \file decode-buffer.h Buffering for Bluetooth sources
*
* A linear buffer, which is compacted when it gets half full.
*
* Also contains buffering logic, which calculates a rate correction
* factor to maintain the buffer level at the target value.
*
* Consider typical packet intervals with nominal frame duration
* of 10ms:
*
* ... 5ms | 5ms | 20ms | 5ms | 5ms | 20ms ...
*
* ... 3ms | 3ms | 4ms | 30ms | 3ms | 3ms | 4ms | 30ms ...
*
* plus random jitter; 10ms nominal may occasionally have 20+ms interval.
* The regular timer cycle cannot be aligned with this, so process()
* may occur at any time.
*
* The buffer level is the difference between the number of samples in
* buffer immediately after receiving a packet, and the samples consumed
* before receiving the next packet.
*
* The buffer level indicates how much any packet can be delayed without
* underrun. If it is positive, there are no underruns.
*
* The rate correction aims to maintain the average level at a safety margin.
*/
#ifndef SPA_BLUEZ5_DECODE_BUFFER_H
#define SPA_BLUEZ5_DECODE_BUFFER_H
#include <stdlib.h>
#include <spa/utils/defs.h>
#include <spa/utils/dll.h>
#include <spa/support/log.h>
#define BUFFERING_LONG_MSEC 60000
#define BUFFERING_SHORT_MSEC 1000
#define BUFFERING_DLL_BW 0.03
#define BUFFERING_RATE_DIFF_MAX 0.005
/**
* Safety margin.
*
* The spike is the long-window maximum difference
* between minimum and average buffer level.
*/
#define BUFFERING_TARGET(spike,packet_size) \
SPA_CLAMP((spike)*3/2, (packet_size), 6*(packet_size))
/** Windowed min/max */
struct spa_bt_ptp
{
union {
int32_t min;
int32_t mins[4];
};
union {
int32_t max;
int32_t maxs[4];
};
uint32_t pos;
uint32_t period;
};
struct spa_bt_decode_buffer
{
struct spa_log *log;
uint32_t frame_size;
uint32_t rate;
uint8_t *buffer_decoded;
uint32_t buffer_size;
uint32_t buffer_reserve;
uint32_t write_index;
uint32_t read_index;
struct spa_bt_ptp spike; /**< spikes (long window) */
struct spa_bt_ptp packet_size; /**< packet size (short window) */
int32_t target;
int32_t level;
double level_avg;
struct spa_dll dll;
double corr;
uint32_t prev_consumed;
uint32_t prev_avail;
uint32_t prev_duration;
uint32_t underrun;
uint32_t pos;
uint8_t received:1;
uint8_t buffering:1;
};
static void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period)
{
size_t i;
spa_zero(*p);
for (i = 0; i < SPA_N_ELEMENTS(p->mins); ++i) {
p->mins[i] = INT32_MAX;
p->maxs[i] = INT32_MIN;
}
p->period = period;
}
static void spa_bt_ptp_update(struct spa_bt_ptp *p, int32_t value, uint32_t duration)
{
const size_t n = SPA_N_ELEMENTS(p->mins);
size_t i;
for (i = 0; i < n; ++i) {
p->mins[i] = SPA_MIN(p->mins[i], value);
p->maxs[i] = SPA_MAX(p->maxs[i], value);
}
p->pos += duration;
if (p->pos >= p->period / (n - 1)) {
p->pos = 0;
for (i = 1; i < SPA_N_ELEMENTS(p->mins); ++i) {
p->mins[i-1] = p->mins[i];
p->maxs[i-1] = p->maxs[i];
}
p->mins[n-1] = INT32_MAX;
p->maxs[n-1] = INT32_MIN;
}
}
static int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct spa_log *log,
uint32_t frame_size, uint32_t rate, uint32_t quantum_limit, uint32_t reserve)
{
spa_zero(*this);
this->frame_size = frame_size;
this->rate = rate;
this->log = log;
this->buffer_reserve = this->frame_size * reserve;
this->buffer_size = this->frame_size * quantum_limit * 2;
this->buffer_size += this->buffer_reserve;
this->corr = 1.0;
this->buffering = true;
spa_dll_init(&this->dll);
spa_bt_ptp_init(&this->spike, (uint64_t)this->rate * BUFFERING_LONG_MSEC / 1000);
spa_bt_ptp_init(&this->packet_size, (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000);
if ((this->buffer_decoded = malloc(this->buffer_size)) == NULL) {
this->buffer_size = 0;
return -ENOMEM;
}
return 0;
}
static void spa_bt_decode_buffer_clear(struct spa_bt_decode_buffer *this)
{
free(this->buffer_decoded);
spa_zero(*this);
}
static void spa_bt_decode_buffer_compact(struct spa_bt_decode_buffer *this)
{
uint32_t avail;
spa_assert(this->read_index <= this->write_index);
if (this->read_index == this->write_index) {
this->read_index = 0;
this->write_index = 0;
goto done;
}
if (this->write_index > this->read_index + this->buffer_size - this->buffer_reserve) {
/* Drop data to keep buffer reserve free */
spa_log_info(this->log, "%p buffer overrun: dropping data", this);
this->read_index = this->write_index + this->buffer_reserve - this->buffer_size;
}
if (this->write_index < (this->buffer_size - this->buffer_reserve) / 2
|| this->read_index == 0)
goto done;
avail = this->write_index - this->read_index;
spa_memmove(this->buffer_decoded,
SPA_PTROFF(this->buffer_decoded, this->read_index, void),
avail);
this->read_index = 0;
this->write_index = avail;
done:
spa_assert(this->buffer_size - this->write_index >= this->buffer_reserve);
}
static void *spa_bt_decode_buffer_get_write(struct spa_bt_decode_buffer *this, uint32_t *avail)
{
spa_bt_decode_buffer_compact(this);
spa_assert(this->buffer_size >= this->write_index);
*avail = this->buffer_size - this->write_index;
return SPA_PTROFF(this->buffer_decoded, this->write_index, void);
}
static void spa_bt_decode_buffer_write_packet(struct spa_bt_decode_buffer *this, uint32_t size)
{
spa_assert(size % this->frame_size == 0);
this->write_index += size;
this->received = true;
spa_bt_ptp_update(&this->packet_size, size / this->frame_size, size / this->frame_size);
}
static void *spa_bt_decode_buffer_get_read(struct spa_bt_decode_buffer *this, uint32_t *avail)
{
spa_assert(this->write_index >= this->read_index);
if (!this->buffering)
*avail = this->write_index - this->read_index;
else
*avail = 0;
return SPA_PTROFF(this->buffer_decoded, this->read_index, void);
}
static void spa_bt_decode_buffer_read(struct spa_bt_decode_buffer *this, uint32_t size)
{
spa_assert(size % this->frame_size == 0);
this->read_index += size;
}
static void spa_bt_decode_buffer_recover(struct spa_bt_decode_buffer *this)
{
int32_t size = (this->write_index - this->read_index) / this->frame_size;
this->prev_avail = size * this->frame_size;
this->prev_consumed = this->prev_duration;
this->level = (int32_t)this->prev_avail/this->frame_size
- (int32_t)this->prev_duration;
this->level_avg = this->level;
this->target = this->level;
this->corr = 1.0;
spa_dll_init(&this->dll);
}
static void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *this, uint32_t samples, uint32_t duration)
{
const uint32_t data_size = samples * this->frame_size;
const int32_t max_level = SPA_MAX(8 * this->packet_size.max, (int32_t)duration);
uint32_t avail;
if (SPA_UNLIKELY(duration != this->prev_duration)) {
this->prev_duration = duration;
spa_bt_decode_buffer_recover(this);
}
if (SPA_UNLIKELY(this->buffering)) {
int32_t size = (this->write_index - this->read_index) / this->frame_size;
this->corr = 1.0;
spa_log_trace(this->log, "%p buffering size:%d", this, (int)size);
if (this->received &&
this->packet_size.max > 0 &&
size >= SPA_MAX(3*this->packet_size.max, (int32_t)duration))
this->buffering = false;
else
return;
spa_bt_decode_buffer_recover(this);
}
if (SPA_UNLIKELY(this->dll.bw == 0.0)) {
spa_log_trace(this->log, "%p dll reset duration:%d rate:%d", this,
(int)duration, (int)this->rate);
spa_dll_set_bw(&this->dll, BUFFERING_DLL_BW, duration, (uint64_t)this->rate);
}
spa_bt_decode_buffer_get_read(this, &avail);
if (this->received) {
const uint32_t avg_period = (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000;
int32_t level, target;
/* Track buffer level */
level = (int32_t)(this->prev_avail/this->frame_size) - (int32_t)this->prev_consumed;
level = SPA_MAX(level, -max_level);
this->prev_consumed = SPA_MIN(this->prev_consumed, avg_period);
this->level_avg = ((double)this->prev_consumed*level
+ ((double)avg_period - this->prev_consumed)*this->level_avg) / avg_period;
spa_bt_ptp_update(&this->spike, this->level_avg - level, this->prev_consumed);
/* Update target level */
target = BUFFERING_TARGET(this->spike.max, this->packet_size.max);
if (level > SPA_MAX(4 * target, 2*(int32_t)duration) &&
avail > data_size) {
/* Lagging too much: drop data */
uint32_t size = SPA_MIN(avail - data_size,
(level - target*5/2) * this->frame_size);
spa_bt_decode_buffer_read(this, size);
spa_log_trace(this->log, "%p overrun samples:%d level:%d target:%d",
this, (int)size/this->frame_size,
(int)level, (int)target);
spa_bt_decode_buffer_recover(this);
}
this->pos += this->prev_consumed;
if (this->pos > this->rate) {
spa_log_debug(this->log,
"%p avg:%d target:%d level:%d buffer:%d spike:%d corr:%f",
this,
(int)this->level_avg,
(int)target,
(int)level,
(int)(avail / this->frame_size),
(int)this->spike.max,
(double)this->corr);
this->pos = 0;
}
spa_bt_decode_buffer_get_read(this, &avail);
this->prev_consumed = 0;
this->prev_avail = avail;
this->underrun = 0;
this->received = false;
this->level = level;
this->target = target;
}
this->corr = spa_dll_update(&this->dll, this->target - this->level);
if (SPA_ABS(this->corr - 1.0) > BUFFERING_RATE_DIFF_MAX) {
spa_log_trace(this->log, "%p too big rate difference: clamp + reset", this);
spa_dll_init(&this->dll);
this->corr = SPA_CLAMP(this->corr, 1.0 - BUFFERING_RATE_DIFF_MAX,
1.0 + BUFFERING_RATE_DIFF_MAX);
}
if (avail < data_size) {
spa_log_trace(this->log, "%p underrun samples:%d", this,
(data_size - avail) / this->frame_size);
this->underrun += samples;
if (this->underrun >= SPA_MIN((uint32_t)max_level, this->buffer_size / this->frame_size)) {
this->buffering = true;
spa_log_debug(this->log, "%p underrun too much: start buffering", this);
}
}
this->prev_consumed += samples;
}
#endif