stream: use the pw_impl_node implementation details

We can use the position/clock and driving variables from the
pw_impl_node instead of keeping another copy around.
This commit is contained in:
Wim Taymans 2024-04-10 17:33:30 +02:00
parent 06905cd53b
commit d672e8fbf2
2 changed files with 16 additions and 87 deletions

View file

@ -114,12 +114,6 @@ struct filter {
struct spa_node impl_node;
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
struct spa_io_clock *clock;
struct spa_io_position *position;
struct {
struct spa_io_position *position;
} rt;
struct spa_list port_list;
struct pw_map ports[2];
@ -151,7 +145,6 @@ struct filter {
unsigned int allow_mlock:1;
unsigned int warn_mlock:1;
unsigned int process_rt:1;
unsigned int driving:1;
unsigned int trigger:1;
int in_emit_param_changed;
};
@ -491,38 +484,12 @@ static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struc
return 0;
}
static int
do_set_position(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct filter *impl = user_data;
impl->rt.position = impl->position;
return 0;
}
static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
{
struct filter *impl = object;
pw_log_debug("%p: io %d %p/%zd", impl, id, data, size);
switch(id) {
case SPA_IO_Clock:
if (data && size >= sizeof(struct spa_io_clock))
impl->clock = data;
else
impl->clock = NULL;
break;
case SPA_IO_Position:
if (data && size >= sizeof(struct spa_io_position))
impl->position = data;
else
impl->position = NULL;
pw_loop_invoke(impl->data_loop,
do_set_position, 1, NULL, 0, true, impl);
break;
}
impl->driving = impl->clock && impl->position && impl->position->clock.id == impl->clock->id;
pw_filter_emit_io_changed(&impl->this, NULL, id, data, size);
return 0;
@ -1010,7 +977,7 @@ do_call_process(struct spa_loop *loop,
struct filter *impl = user_data;
struct pw_filter *filter = &impl->this;
pw_log_trace("%p: do process", filter);
pw_filter_emit_process(filter, impl->position);
pw_filter_emit_process(filter, filter->node->rt.position);
return 0;
}
@ -1020,7 +987,7 @@ static void call_process(struct filter *impl)
if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_RT_PROCESS)) {
if (impl->rt_callbacks.funcs)
spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events,
process, 0, impl->rt.position);
process, 0, impl->this.node->rt.position);
} else {
pw_loop_invoke(impl->main_loop,
do_call_process, 1, NULL, 0, false, impl);
@ -1646,7 +1613,6 @@ pw_filter_connect(struct pw_filter *filter,
impl->disconnecting = false;
impl->draining = false;
impl->driving = false;
filter_set_state(filter, PW_FILTER_STATE_CONNECTING, 0, NULL);
if (flags & PW_FILTER_FLAG_DRIVER)
@ -1978,7 +1944,7 @@ SPA_EXPORT
int pw_filter_get_time(struct pw_filter *filter, struct pw_time *time)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
struct spa_io_position *p = impl->position;
struct spa_io_position *p = filter->node->rt.position;
if (SPA_LIKELY(p != NULL)) {
impl->time.now = p->clock.nsec;
@ -2095,8 +2061,7 @@ int pw_filter_flush(struct pw_filter *filter, bool drain)
SPA_EXPORT
bool pw_filter_is_driving(struct pw_filter *filter)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
return impl->driving;
return filter->node->driving;
}
static int
@ -2128,11 +2093,11 @@ int pw_filter_trigger_process(struct pw_filter *filter)
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
int res = 0;
pw_log_trace_fp("%p: driving:%d", impl, impl->driving);
pw_log_trace_fp("%p: driving:%d", impl, filter->node->driving);
if (impl->trigger) {
pw_impl_node_trigger(filter->node);
} else if (impl->driving) {
} else if (filter->node->driving) {
res = pw_loop_invoke(impl->data_loop,
do_trigger_process, 1, NULL, 0, false, impl);
} else {

View file

@ -93,15 +93,10 @@ struct stream {
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
struct spa_io_clock *clock;
struct spa_io_position *position;
struct spa_io_buffers *io;
struct spa_io_rate_match *rate_match;
uint32_t rate_queued;
uint64_t rate_size;
struct {
struct spa_io_position *position;
} rt;
uint64_t port_change_mask_all;
struct spa_port_info port_info;
@ -153,7 +148,6 @@ struct stream {
unsigned int allow_mlock:1;
unsigned int warn_mlock:1;
unsigned int process_rt:1;
unsigned int driving:1;
unsigned int using_trigger:1;
unsigned int trigger:1;
unsigned int early_process:1;
@ -506,15 +500,6 @@ static void call_trigger_done(struct stream *impl)
}
}
static int
do_set_position(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct stream *impl = user_data;
impl->rt.position = impl->position;
return 0;
}
static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
{
struct stream *impl = object;
@ -523,26 +508,6 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
pw_log_debug("%p: set io id %d (%s) %p %zd", impl, id,
spa_debug_type_find_name(spa_type_io, id), data, size);
switch(id) {
case SPA_IO_Clock:
if (data && size >= sizeof(struct spa_io_clock))
impl->clock = data;
else
impl->clock = NULL;
break;
case SPA_IO_Position:
if (data && size >= sizeof(struct spa_io_position))
impl->position = data;
else
impl->position = NULL;
pw_loop_invoke(impl->data_loop,
do_set_position, 1, NULL, 0, true, impl);
break;
default:
break;
}
impl->driving = impl->clock && impl->position && impl->position->clock.id == impl->clock->id;
pw_stream_emit_io_changed(stream, id, data, size);
return 0;
@ -670,7 +635,7 @@ static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struc
static inline void copy_position(struct stream *impl, int64_t queued)
{
struct spa_io_position *p = impl->rt.position;
struct spa_io_position *p = impl->this.node->rt.position;
SPA_SEQ_WRITE(impl->seq);
if (SPA_LIKELY(p != NULL)) {
@ -726,7 +691,7 @@ static int impl_send_command(void *object, const struct spa_command *command)
}
else {
copy_position(impl, impl->queued.incount);
if (!impl->process_rt && !impl->driving)
if (!impl->process_rt && !stream->node->driving)
call_process(impl);
}
stream_set_state(stream, PW_STREAM_STATE_STREAMING, 0, NULL);
@ -1089,7 +1054,7 @@ static int impl_node_process_input(void *object)
}
io->status = SPA_STATUS_NEED_DATA;
}
if (impl->driving && impl->using_trigger)
if (stream->node->driving && impl->using_trigger)
call_trigger_done(impl);
return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;
@ -1156,7 +1121,7 @@ again:
copy_position(impl, impl->queued.outcount);
if (!impl->draining && !impl->driving) {
if (!impl->draining && !stream->node->driving) {
/* we're not draining, not a driver check if we need to get
* more buffers */
if (ask_more) {
@ -1171,7 +1136,7 @@ again:
pw_log_trace_fp("%p: res %d", stream, res);
if (impl->driving && impl->using_trigger && res != SPA_STATUS_HAVE_DATA)
if (stream->node->driving && impl->using_trigger && res != SPA_STATUS_HAVE_DATA)
call_trigger_done(impl);
return res;
@ -2017,9 +1982,9 @@ pw_stream_connect(struct pw_stream *stream,
impl->disconnecting = false;
impl->drained = false;
impl->draining = false;
impl->driving = false;
impl->trigger = false;
impl->using_trigger = false;
stream_set_state(stream, PW_STREAM_STATE_CONNECTING, 0, NULL);
if (target_id != PW_ID_ANY)
@ -2481,7 +2446,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
return res;
if (impl->direction == SPA_DIRECTION_OUTPUT &&
impl->driving && !impl->using_trigger) {
stream->node->driving && !impl->using_trigger) {
pw_log_debug("deprecated: use pw_stream_trigger_process() to drive the stream.");
res = pw_loop_invoke(impl->data_loop,
do_trigger_deprecated, 1, NULL, 0, false, impl);
@ -2549,8 +2514,7 @@ int pw_stream_flush(struct pw_stream *stream, bool drain)
SPA_EXPORT
bool pw_stream_is_driving(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
return impl->driving;
return stream->node->driving;
}
static int
@ -2589,14 +2553,14 @@ int pw_stream_trigger_process(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int res = 0;
pw_log_trace_fp("%p: trigger:%d driving:%d", impl, impl->trigger, impl->driving);
pw_log_trace_fp("%p: trigger:%d driving:%d", impl, impl->trigger, stream->node->driving);
/* flag to check for old or new behaviour */
impl->using_trigger = true;
if (impl->trigger) {
pw_impl_node_trigger(stream->node);
} else if (impl->driving) {
} else if (stream->node->driving) {
if (!impl->process_rt)
call_process(impl);
res = pw_loop_invoke(impl->data_loop,