From becd76de58c3d1f687b81dc7c6fd1a8f5425b54f Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Fri, 10 Sep 2021 11:57:49 +0300 Subject: [PATCH] bluez5: sco-sink: Add follower mode --- spa/plugins/bluez5/sco-sink.c | 138 ++++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 58 deletions(-) diff --git a/spa/plugins/bluez5/sco-sink.c b/spa/plugins/bluez5/sco-sink.c index 54a01e291..ec81e97af 100644 --- a/spa/plugins/bluez5/sco-sink.c +++ b/spa/plugins/bluez5/sco-sink.c @@ -130,6 +130,7 @@ struct impl { /* Flags */ unsigned int started:1; + unsigned int following:1; /* Sources */ struct spa_source source; @@ -243,18 +244,20 @@ static int impl_node_enum_params(void *object, int seq, return 0; } -static void set_timeout(struct impl *this, uint64_t timeout) +static int set_timeout(struct impl *this, uint64_t time) { struct itimerspec ts; - - ts.it_value.tv_sec = timeout / SPA_NSEC_PER_SEC; - ts.it_value.tv_nsec = timeout % SPA_NSEC_PER_SEC; + ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC; ts.it_interval.tv_sec = 0; ts.it_interval.tv_nsec = 0; + return spa_system_timerfd_settime(this->data_system, + this->timerfd, 0, &ts, NULL); +} - spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL); - this->source.mask = SPA_IO_IN; - spa_loop_update_source(this->data_loop, &this->source); +static int set_timers(struct impl *this) +{ + return set_timeout(this, this->following ? 0 : 1); } static uint64_t get_next_timeout(struct impl *this, uint64_t now_time, uint64_t processed_samples) @@ -273,11 +276,29 @@ static uint64_t get_next_timeout(struct impl *this, uint64_t now_time, uint64_t return next_time; } +static int do_reassign_follower(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct impl *this = user_data; + set_timers(this); + return 0; +} + +static inline bool is_following(struct impl *this) +{ + return this->position && this->clock && this->position->clock.id != this->clock->id; +} + static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) { struct impl *this = object; + bool following; - spa_return_val_if_fail(object != NULL, -EINVAL); + spa_return_val_if_fail(this != NULL, -EINVAL); switch (id) { case SPA_IO_Clock: @@ -290,6 +311,12 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) return -ENOENT; } + following = is_following(this); + if (this->started && following != this->following) { + spa_log_debug(this->log, NAME " %p: reassign follower %d->%d", this, this->following, following); + this->following = following; + spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this); + } return 0; } @@ -343,61 +370,50 @@ static void flush_data(struct impl *this) struct port *port = &this->port; struct spa_data *datas; uint64_t next_timeout = 1; - uint32_t min_in_size; - uint8_t *packet; + const uint32_t min_in_size = + (this->transport->codec == HFP_AUDIO_CODEC_MSBC) ? + MSBC_DECODED_SIZE : this->transport->write_mtu; + uint8_t * const packet = + (this->transport->codec == HFP_AUDIO_CODEC_MSBC) ? + this->buffer_head : port->write_buffer; if (this->transport == NULL || this->transport->sco_io == NULL) return; - /* get buffer */ - if (!port->current_buffer) { - spa_return_if_fail(!spa_list_is_empty(&port->ready)); - port->current_buffer = spa_list_first(&port->ready, struct buffer, link); - port->ready_offset = 0; - } - datas = port->current_buffer->buf->datas; + if (!spa_list_is_empty(&port->ready)) { + /* get buffer */ + if (!port->current_buffer) { + spa_return_if_fail(!spa_list_is_empty(&port->ready)); + port->current_buffer = spa_list_first(&port->ready, struct buffer, link); + port->ready_offset = 0; + } + datas = port->current_buffer->buf->datas; - if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { - min_in_size = MSBC_DECODED_SIZE; - packet = this->buffer_head; - } else { - min_in_size = this->transport->write_mtu; - packet = port->write_buffer; - } + /* if buffer has data, copy it into the write buffer */ + if (datas[0].chunk->size - port->ready_offset > 0) { + const uint32_t avail = + SPA_MIN(min_in_size, datas[0].chunk->size - port->ready_offset); + const uint32_t size = + (avail + port->write_buffer_size) > min_in_size ? + min_in_size - port->write_buffer_size : avail; + memcpy(port->write_buffer + port->write_buffer_size, + (uint8_t *)datas[0].data + port->ready_offset, + size); + port->write_buffer_size += size; + port->ready_offset += size; + } else { + struct buffer *b; - /* if buffer has data, copy it into the write buffer */ - if (datas[0].chunk->size - port->ready_offset > 0) { - uint32_t avail = SPA_MIN(min_in_size, datas[0].chunk->size - port->ready_offset); - uint32_t size = (avail + port->write_buffer_size) > min_in_size ? min_in_size - port->write_buffer_size : avail; - memcpy(port->write_buffer + port->write_buffer_size, - (uint8_t *)datas[0].data + port->ready_offset, - size); - port->write_buffer_size += size; - port->ready_offset += size; - } + b = port->current_buffer; + port->current_buffer = NULL; - /* otherwise request a new buffer */ - else { - struct buffer *b; - - b = port->current_buffer; - port->current_buffer = NULL; - - /* reuse buffer */ - spa_list_remove(&b->link); - b->outstanding = true; - spa_log_trace(this->log, "sco-sink %p: reuse buffer %u", this, b->id); - port->io->buffer_id = b->id; - spa_node_call_reuse_buffer(&this->callbacks, 0, b->id); - - /* notify we need more data */ - port->io->status = SPA_STATUS_NEED_DATA; - spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); - - next_timeout = (this->transport->write_mtu / port->frame_size - * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate); - set_timeout(this, next_timeout); - return; + /* reuse buffer */ + spa_list_remove(&b->link); + b->outstanding = true; + spa_log_trace(this->log, "sco-sink %p: reuse buffer %u", this, b->id); + port->io->buffer_id = b->id; + spa_node_call_reuse_buffer(&this->callbacks, 0, b->id); + } } /* send the data if the write buffer is full */ @@ -476,6 +492,8 @@ static void flush_data(struct impl *this) } } + spa_log_trace(this->log, "write socket data %d", written); + next_timeout = get_next_timeout(this, now_time, processed / port->frame_size); if (this->clock) { @@ -510,7 +528,7 @@ static void sco_on_timeout(struct spa_source *source) spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); /* delay if no buffers available */ - if (spa_list_is_empty(&port->ready)) { + if (!this->following && spa_list_is_empty(&port->ready)) { set_timeout(this, this->transport->write_mtu / port->frame_size * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate); port->io->status = SPA_STATUS_NEED_DATA; spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); @@ -547,6 +565,10 @@ static int do_start(struct impl *this) /* Make sure the transport is valid */ spa_return_val_if_fail(this->transport != NULL, -EIO); + this->following = is_following(this); + + spa_log_debug(this->log, NAME " %p: start following:%d", this, this->following); + /* Do accept if Gateway; otherwise do connect for Head Unit */ do_accept = this->transport->profile & SPA_BT_PROFILE_HEADSET_AUDIO_GATEWAY; @@ -586,7 +608,7 @@ static int do_start(struct impl *this) spa_loop_add_source(this->data_loop, &this->source); /* start processing */ - set_timeout(this, 1); + set_timers(this); /* Set the started flag */ this->started = true;