diff --git a/spa/plugins/bluez5/sco-sink.c b/spa/plugins/bluez5/sco-sink.c index d92bada2a..a80abfca3 100644 --- a/spa/plugins/bluez5/sco-sink.c +++ b/spa/plugins/bluez5/sco-sink.c @@ -60,14 +60,10 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.sink.sco"); #define DEFAULT_CLOCK_NAME "clock.system.monotonic" struct props { - uint32_t min_latency; - uint32_t max_latency; char clock_name[64]; }; #define MAX_BUFFERS 32 -#define MIN_LATENCY 512 -#define MAX_LATENCY 1024 struct buffer { uint32_t id; @@ -130,6 +126,8 @@ struct impl { struct spa_param_info params[N_NODE_PARAMS]; struct props props; + uint32_t quantum_limit; + /* Transport */ struct spa_bt_transport *transport; struct spa_hook transport_listener; @@ -140,38 +138,39 @@ struct impl { /* Flags */ unsigned int started:1; unsigned int following:1; + unsigned int flush_pending:1; /* Sources */ struct spa_source source; + struct spa_source flush_timer_source; /* Timer */ int timerfd; - struct timespec now; + int flush_timerfd; struct spa_io_clock *clock; struct spa_io_position *position; + uint64_t current_time; + uint64_t next_time; + uint64_t process_time; + uint64_t prev_flush_time; + uint64_t next_flush_time; + /* mSBC */ sbc_t msbc; uint8_t *buffer; uint8_t *buffer_head; uint8_t *buffer_next; int buffer_size; - - /* Times */ - uint64_t start_time; - uint64_t total_samples; + int msbc_seq; }; #define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0) -static const uint32_t default_min_latency = MIN_LATENCY; -static const uint32_t default_max_latency = MAX_LATENCY; static const char sntable[4] = { 0x08, 0x38, 0xC8, 0xF8 }; static void reset_props(struct props *props) { - props->min_latency = default_min_latency; - props->max_latency = default_max_latency; strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name)); } @@ -199,23 +198,7 @@ static int impl_node_enum_params(void *object, int seq, switch (id) { case SPA_PARAM_PropInfo: { - struct props *p = &this->props; - switch (result.index) { - case 0: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency), - SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX)); - break; - case 1: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency), - SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX)); - break; default: return 0; } @@ -223,14 +206,10 @@ static int impl_node_enum_params(void *object, int seq, } case SPA_PARAM_Props: { - struct props *p = &this->props; - switch (result.index) { case 0: param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_Props, id, - SPA_PROP_minLatency, SPA_POD_Int(p->min_latency), - SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency)); + SPA_TYPE_OBJECT_Props, id); break; default: return 0; @@ -260,28 +239,17 @@ static int set_timeout(struct impl *this, uint64_t time) ts.it_interval.tv_sec = 0; ts.it_interval.tv_nsec = 0; return spa_system_timerfd_settime(this->data_system, - this->timerfd, 0, &ts, NULL); + this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); } static int set_timers(struct impl *this) { - return set_timeout(this, this->following ? 0 : 1); -} + struct timespec now; -static uint64_t get_next_timeout(struct impl *this, uint64_t now_time, uint64_t processed_samples) -{ - struct port *port = &this->port; - uint64_t playback_time = 0, elapsed_time = 0, next_time = 0; + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); + this->next_time = SPA_TIMESPEC_TO_NSEC(&now); - this->total_samples += processed_samples; - - playback_time = (this->total_samples * SPA_NSEC_PER_SEC) / port->current_format.info.raw.rate; - if (now_time > this->start_time) - elapsed_time = now_time - this->start_time; - if (elapsed_time < playback_time) - next_time = playback_time - elapsed_time; - - return next_time; + return set_timeout(this, this->following ? 0 : this->next_time); } static int do_reassign_follower(struct spa_loop *loop, @@ -344,9 +312,7 @@ static int apply_props(struct impl *this, const struct spa_pod *param) reset_props(&new_props); } else { spa_pod_parse_object(param, - SPA_TYPE_OBJECT_Props, NULL, - SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency), - SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency)); + SPA_TYPE_OBJECT_Props, NULL); } changed = (memcmp(&new_props, &this->props, sizeof(struct props)) != 0); @@ -378,22 +344,64 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, return 0; } +static void enable_flush_timer(struct impl *this, bool enabled) +{ + struct itimerspec ts; + + if (!enabled) + this->next_flush_time = 0; + + ts.it_value.tv_sec = this->next_flush_time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = this->next_flush_time % SPA_NSEC_PER_SEC; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + spa_system_timerfd_settime(this->data_system, + this->flush_timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); + + this->flush_pending = enabled; +} + +static uint32_t get_queued_frames(struct impl *this) +{ + struct port *port = &this->port; + uint32_t bytes = 0; + struct buffer *b; + + spa_list_for_each(b, &port->ready, link) { + struct spa_data *d = b->buf->datas; + + bytes += d[0].chunk->size; + } + + if (bytes > port->ready_offset) + bytes -= port->ready_offset; + else + bytes = 0; + + return bytes / port->frame_size; +} + static void flush_data(struct impl *this) { struct port *port = &this->port; - struct spa_data *datas; const uint32_t min_in_size = (this->transport->codec == HFP_AUDIO_CODEC_MSBC) ? MSBC_DECODED_SIZE : this->transport->write_mtu; uint8_t * const packet = (this->transport->codec == HFP_AUDIO_CODEC_MSBC) ? this->buffer_head : port->write_buffer; + const uint32_t packet_samples = min_in_size / port->frame_size; + const uint64_t packet_time = (uint64_t)packet_samples * SPA_NSEC_PER_SEC + / port->current_format.info.raw.rate; + int processed = 0; + int written; if (this->transport == NULL || this->transport->sco_io == NULL) return; -again: while (!spa_list_is_empty(&port->ready) && port->write_buffer_size < min_in_size) { + struct spa_data *datas; + /* get buffer */ if (!port->current_buffer) { spa_return_if_fail(!spa_list_is_empty(&port->ready)); @@ -429,147 +437,212 @@ again: } } - /* send the data if the write buffer is full */ - if (port->write_buffer_size >= min_in_size) { - uint64_t now_time; - static int sn = 0; - int processed = 0; - ssize_t out_encoded; - int written; - uint64_t next_timeout; - - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); - now_time = SPA_TIMESPEC_TO_NSEC(&this->now); - if (this->start_time == 0) - this->start_time = now_time; - - if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { - /* Encode */ - if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) { - /* Buffer overrun; shouldn't usually happen. Drop data and reset. */ - this->buffer_head = this->buffer_next = this->buffer; - spa_log_warn(this->log, "sco-sink: mSBC buffer overrun, dropping data"); - } - this->buffer_next[0] = 0x01; - this->buffer_next[1] = sntable[sn]; - this->buffer_next[59] = 0x00; - sn = (sn + 1) % 4; - processed = sbc_encode(&this->msbc, port->write_buffer, port->write_buffer_size, - this->buffer_next + 2, MSBC_ENCODED_SIZE - 3, &out_encoded); - if (processed < 0) { - spa_log_warn(this->log, "sbc_encode failed: %d", processed); - return; - } - this->buffer_next += out_encoded + 3; - port->write_buffer_size = 0; - - /* Write */ - written = spa_bt_sco_io_write(this->transport->sco_io, packet, - this->buffer_next - this->buffer_head); - if (written < 0) { - spa_log_warn(this->log, "failed to write data: %d (%s)", - written, spa_strerror(written)); - goto stop; - } - - this->buffer_head += written; - - if (this->buffer_head == this->buffer_next) - this->buffer_head = this->buffer_next = this->buffer; - else if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) { - /* Written bytes is not necessarily commensurate - * with MSBC_ENCODED_SIZE. If this occurs, copy data. - */ - int size = this->buffer_next - this->buffer_head; - spa_memmove(this->buffer, this->buffer_head, size); - this->buffer_next = this->buffer + size; - this->buffer_head = this->buffer; - } - } else { - written = spa_bt_sco_io_write(this->transport->sco_io, packet, - port->write_buffer_size); - if (written < 0) { - spa_log_warn(this->log, "sco-sink: write failure: %d (%s)", - written, spa_strerror(written)); - goto stop; - } else if (written == 0) { - /* EAGAIN or similar, just skip ahead */ - written = SPA_MIN(port->write_buffer_size, (uint32_t)48); - } - - processed = written; - port->write_buffer_size -= written; - - if (port->write_buffer_size > 0 && written > 0) { - spa_memmove(port->write_buffer, port->write_buffer + written, port->write_buffer_size); - } - } - - spa_log_trace(this->log, "write socket data %d", written); - - next_timeout = get_next_timeout(this, now_time, processed / port->frame_size); - - if (!this->following && this->clock) { - this->clock->nsec = now_time; - this->clock->position = this->total_samples; - this->clock->delay = processed / port->frame_size; - this->clock->rate_diff = 1.0f; - this->clock->next_nsec = now_time + next_timeout; - } - - if (next_timeout == 0) - goto again; - - spa_log_trace(this->log, "timeout %"PRIu64" ns", next_timeout); - set_timeout(this, next_timeout); - } else { - /* As follower, driver will wake us up when there is data */ - if (this->following) - return; - - /* As driver, run timeout now to schedule data */ - spa_log_trace(this->log, "timeout 1 ns (driver: schedule now)"); - set_timeout(this, 1); + if (this->flush_pending) { + spa_log_trace(this->log, "%p: wait for flush timer", this); + return; } + if (port->write_buffer_size < min_in_size) { + /* wait for more data */ + spa_log_trace(this->log, "%p: skip flush", this); + enable_flush_timer(this, false); + return; + } + + if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { + ssize_t out_encoded; + + /* Encode */ + if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) { + /* Buffer overrun; shouldn't usually happen. Drop data and reset. */ + this->buffer_head = this->buffer_next = this->buffer; + spa_log_warn(this->log, "sco-sink: mSBC buffer overrun, dropping data"); + } + this->buffer_next[0] = 0x01; + this->buffer_next[1] = sntable[this->msbc_seq % 4]; + this->buffer_next[59] = 0x00; + this->msbc_seq = (this->msbc_seq + 1) % 4; + processed = sbc_encode(&this->msbc, port->write_buffer, port->write_buffer_size, + this->buffer_next + 2, MSBC_ENCODED_SIZE - 3, &out_encoded); + if (processed < 0) { + spa_log_warn(this->log, "sbc_encode failed: %d", processed); + return; + } + this->buffer_next += out_encoded + 3; + port->write_buffer_size = 0; + + /* Write */ + written = spa_bt_sco_io_write(this->transport->sco_io, packet, + this->buffer_next - this->buffer_head); + if (written < 0) { + spa_log_warn(this->log, "failed to write data: %d (%s)", + written, spa_strerror(written)); + goto stop; + } + + this->buffer_head += written; + + if (this->buffer_head == this->buffer_next) + this->buffer_head = this->buffer_next = this->buffer; + else if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) { + /* Written bytes is not necessarily commensurate + * with MSBC_ENCODED_SIZE. If this occurs, copy data. + */ + int size = this->buffer_next - this->buffer_head; + spa_memmove(this->buffer, this->buffer_head, size); + this->buffer_next = this->buffer + size; + this->buffer_head = this->buffer; + } + } else { + written = spa_bt_sco_io_write(this->transport->sco_io, packet, + port->write_buffer_size); + if (written < 0) { + spa_log_warn(this->log, "sco-sink: write failure: %d (%s)", + written, spa_strerror(written)); + goto stop; + } else if (written == 0) { + /* EAGAIN or similar, just skip ahead */ + written = SPA_MIN(port->write_buffer_size, (uint32_t)48); + } + + processed = written; + port->write_buffer_size -= written; + + if (port->write_buffer_size > 0 && written > 0) { + spa_memmove(port->write_buffer, port->write_buffer + written, port->write_buffer_size); + } + } + + if (SPA_UNLIKELY(spa_log_level_topic_enabled(this->log, SPA_LOG_TOPIC_DEFAULT, SPA_LOG_LEVEL_TRACE))) { + struct timespec ts; + uint64_t now; + uint64_t dt; + + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &ts); + now = SPA_TIMESPEC_TO_NSEC(&ts); + dt = now - this->prev_flush_time; + this->prev_flush_time = now; + + spa_log_trace(this->log, + "%p: send wrote:%d dt:%"PRIu64, + this, written, dt); + } + + spa_log_trace(this->log, "write socket data %d", written); + + if (SPA_LIKELY(this->position)) { + uint32_t frames = get_queued_frames(this); + uint64_t duration_ns; + + /* + * Flush at the time position of the next buffered sample. + */ + duration_ns = ((uint64_t)this->position->clock.duration * SPA_NSEC_PER_SEC + / this->position->clock.rate.denom); + this->next_flush_time = this->process_time + duration_ns + - ((uint64_t)frames * SPA_NSEC_PER_SEC + / port->current_format.info.raw.rate); + + /* + * We could delay the output by one packet to avoid waiting + * for the next buffer and so make send intervals more regular. + * However, this appears not needed in practice, and it's better + * to not add latency if not needed. + */ +#if 0 + this->next_flush_time += SPA_MIN(packet_time, + duration_ns * (port->n_buffers - 1)); +#endif + } else { + if (this->next_flush_time == 0) + this->next_flush_time = this->process_time; + this->next_flush_time += packet_time; + } + + enable_flush_timer(this, true); return; stop: if (this->source.loop) spa_loop_remove_source(this->data_loop, &this->source); + enable_flush_timer(this, false); +} + + +static void sco_on_flush_timeout(struct spa_source *source) +{ + struct impl *this = source->data; + uint64_t exp; + + spa_log_trace(this->log, "%p: flush on timeout", this); + + if (spa_system_timerfd_read(this->data_system, this->flush_timerfd, &exp) < 0) + spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); + + if (this->transport == NULL) { + enable_flush_timer(this, false); + return; + } + + while (exp-- > 0) { + this->flush_pending = false; + flush_data(this); + } } static void sco_on_timeout(struct spa_source *source) { struct impl *this = source->data; struct port *port = &this->port; - uint64_t exp; + uint64_t exp, duration; + uint32_t rate; + struct spa_io_buffers *io = port->io; + uint64_t prev_time, now_time; if (this->transport == NULL) return; - /* Read the timerfd */ if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0) spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); - /* delay if no buffers available */ - if (!this->following && spa_list_is_empty(&port->ready)) { - set_timeout(this, this->transport->write_mtu / port->frame_size * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate); - port->io->status = SPA_STATUS_NEED_DATA; - spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); - return; + prev_time = this->current_time; + now_time = this->current_time = this->next_time; + + spa_log_debug(this->log, "%p: timer %"PRIu64" %"PRIu64"", this, + now_time, now_time - prev_time); + + if (SPA_LIKELY(this->position)) { + duration = this->position->clock.duration; + rate = this->position->clock.rate.denom; + } else { + duration = 1024; + rate = 48000; } - /* Flush data */ - flush_data(this); + this->next_time = now_time + duration * SPA_NSEC_PER_SEC / rate; + + if (SPA_LIKELY(this->clock)) { + this->clock->nsec = now_time; + this->clock->position += duration; + this->clock->duration = duration; + this->clock->rate_diff = 1.0f; + this->clock->next_nsec = this->next_time; + this->clock->delay = 0; + } + + spa_log_trace(this->log, "%p: %d", this, io->status); + io->status = SPA_STATUS_NEED_DATA; + spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); + + set_timeout(this, this->next_time); } /* greater common divider */ static int gcd(int a, int b) { while(b) { - int c = b; - b = a % b; - a = c; + int c = b; + b = a % b; + a = c; } return a; } @@ -616,6 +689,10 @@ static int do_start(struct impl *this) this->buffer_size = lcm(24, lcm(60, lcm(this->transport->write_mtu, 2 * MSBC_ENCODED_SIZE))); this->buffer = calloc(this->buffer_size, sizeof(uint8_t)); this->buffer_head = this->buffer_next = this->buffer; + if (this->buffer == NULL) { + res = -errno; + goto fail; + } } spa_return_val_if_fail(this->transport->write_mtu <= sizeof(this->port.write_buffer), -EINVAL); @@ -632,7 +709,15 @@ static int do_start(struct impl *this) this->source.rmask = 0; spa_loop_add_source(this->data_loop, &this->source); + this->flush_timer_source.data = this; + this->flush_timer_source.fd = this->flush_timerfd; + this->flush_timer_source.func = sco_on_flush_timeout; + this->flush_timer_source.mask = SPA_IO_IN; + this->flush_timer_source.rmask = 0; + spa_loop_add_source(this->data_loop, &this->flush_timer_source); + /* start processing */ + this->flush_pending = false; set_timers(this); /* Set the started flag */ @@ -675,13 +760,20 @@ static int do_remove_source(struct spa_loop *loop, void *user_data) { struct impl *this = user_data; + struct itimerspec ts; - this->start_time = 0; - this->total_samples = 0; set_timeout(this, 0); if (this->source.loop) spa_loop_remove_source(this->data_loop, &this->source); + if (this->flush_timer_source.loop) + spa_loop_remove_source(this->data_loop, &this->flush_timer_source); + ts.it_value.tv_sec = 0; + ts.it_value.tv_nsec = 0; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + spa_system_timerfd_settime(this->data_system, this->flush_timerfd, 0, &ts, NULL); + /* Drop buffered data in the ready queue. Ideally there shouldn't be any. */ drop_port_output(this); @@ -758,7 +850,7 @@ static void emit_node_info(struct impl *this, bool full) { SPA_KEY_DEVICE_API, "bluez5" }, { SPA_KEY_MEDIA_CLASS, "Stream/Input/Audio" }, { "media.name", ((this->transport && this->transport->device->name) ? - this->transport->device->name : "HSP/HFP") }, + this->transport->device->name : "HSP/HFP") }, { SPA_KEY_MEDIA_ROLE, "Communication" }, }; bool is_ag = this->transport && @@ -917,9 +1009,9 @@ impl_node_port_enum_params(void *object, int seq, SPA_TYPE_OBJECT_ParamBuffers, id, SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), - SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( - this->props.max_latency * port->frame_size, - this->props.min_latency * port->frame_size, + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( + this->quantum_limit * port->frame_size, + 16 * port->frame_size, INT32_MAX), SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size)); break; @@ -1150,6 +1242,11 @@ static int impl_node_process(void *object) if ((io = port->io) == NULL) return -EIO; + if (this->position && this->position->clock.flags & SPA_IO_CLOCK_FLAG_FREEWHEEL) { + io->status = SPA_STATUS_NEED_DATA; + return SPA_STATUS_HAVE_DATA; + } + if (io->status == SPA_STATUS_HAVE_DATA && io->buffer_id < port->n_buffers) { struct buffer *b = &port->buffers[io->buffer_id]; @@ -1167,8 +1264,22 @@ static int impl_node_process(void *object) io->status = SPA_STATUS_OK; } - if (!spa_list_is_empty(&port->ready)) + if (this->following) { + if (this->position) { + this->current_time = this->position->clock.nsec; + } else { + struct timespec now; + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); + this->current_time = SPA_TIMESPEC_TO_NSEC(&now); + } + } + + this->process_time = this->current_time; + + if (!spa_list_is_empty(&port->ready)) { + spa_log_trace(this->log, "%p: flush on process", this); flush_data(this); + } return SPA_STATUS_HAVE_DATA; } @@ -1239,6 +1350,7 @@ static int impl_clear(struct spa_handle *handle) if (this->transport) spa_hook_remove(&this->transport_listener); spa_system_close(this->data_system, this->timerfd); + spa_system_close(this->data_system, this->flush_timerfd); return 0; } @@ -1323,6 +1435,11 @@ impl_init(const struct spa_handle_factory *factory, spa_list_init(&port->ready); + this->quantum_limit = 8192; + + if (info && (str = spa_dict_lookup(info, "clock.quantum-limit"))) + spa_atou32(str, &this->quantum_limit, 0); + if (info && (str = spa_dict_lookup(info, SPA_KEY_API_BLUEZ5_TRANSPORT))) sscanf(str, "pointer:%p", &this->transport); @@ -1336,6 +1453,9 @@ impl_init(const struct spa_handle_factory *factory, this->timerfd = spa_system_timerfd_create(this->data_system, CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + this->flush_timerfd = spa_system_timerfd_create(this->data_system, + CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + return 0; }