param: add a new user seq field in the param-info

Add a new seq field in the param-info struct. Users can use this
field to keep track of pending param updates.

Store the latest seq number of the param update in the seq field. Remove
all params that don't match the sequence number because they are too
old. This avoids duplicate old params in pw-dump output.

Rework the pulseaudio manager with this same method.
This commit is contained in:
Wim Taymans 2022-10-27 20:10:33 +02:00
parent a989230cb5
commit e6fd5888ee
4 changed files with 85 additions and 53 deletions

View file

@ -72,7 +72,9 @@ struct spa_param_info {
uint32_t flags;
uint32_t user; /**< private user field. You can use this to keep
* state. */
uint32_t padding[5];
int32_t seq; /**< private seq field. You can use this to keep
* state of a pending update. */
uint32_t padding[4];
};
#define SPA_PARAM_INFO(id,flags) ((struct spa_param_info){ (id), (flags) })

View file

@ -33,8 +33,6 @@
#include "log.h"
#include "module-protocol-pulse/server.h"
#define MAX_PARAMS 32
#define manager_emit_sync(m) spa_hook_list_call(&(m)->hooks, struct pw_manager_events, sync, 0)
#define manager_emit_added(m,o) spa_hook_list_call(&(m)->hooks, struct pw_manager_events, added, 0, o)
#define manager_emit_updated(m,o) spa_hook_list_call(&(m)->hooks, struct pw_manager_events, updated, 0, o)
@ -85,8 +83,6 @@ struct object {
struct spa_hook proxy_listener;
struct spa_hook object_listener;
int param_seq[MAX_PARAMS];
struct spa_list data_list;
};
@ -113,7 +109,7 @@ static uint32_t clear_params(struct spa_list *param_list, uint32_t id)
}
static struct pw_manager_param *add_param(struct spa_list *params,
int seq, int *param_seq, uint32_t id, const struct spa_pod *param)
int seq, uint32_t id, const struct spa_pod *param)
{
struct pw_manager_param *p;
@ -125,24 +121,12 @@ static struct pw_manager_param *add_param(struct spa_list *params,
id = SPA_POD_OBJECT_ID(param);
}
if (id >= MAX_PARAMS) {
pw_log_error("too big param id %d", id);
errno = EINVAL;
return NULL;
}
if (seq != param_seq[id]) {
pw_log_debug("ignoring param %d, seq:%d != current_seq:%d",
id, seq, param_seq[id]);
errno = EBUSY;
return NULL;
}
p = malloc(sizeof(*p) + (param != NULL ? SPA_POD_SIZE(param) : 0));
if (p == NULL)
return NULL;
p->id = id;
p->seq = seq;
if (param != NULL) {
p->param = SPA_PTROFF(p, sizeof(*p), struct spa_pod);
memcpy(p->param, param, SPA_POD_SIZE(param));
@ -167,7 +151,6 @@ static bool has_param(struct spa_list *param_list, struct pw_manager_param *p)
return false;
}
static struct object *find_object_by_id(struct manager *m, uint32_t id)
{
struct object *o;
@ -180,7 +163,19 @@ static struct object *find_object_by_id(struct manager *m, uint32_t id)
static void object_update_params(struct object *o)
{
struct pw_manager_param *p;
struct pw_manager_param *p, *t;
uint32_t i;
for (i = 0; i < o->this.n_params; i++) {
spa_list_for_each_safe(p, t, &o->pending_list, link) {
if (p->id == o->this.params[i].id &&
p->seq != o->this.params[i].seq &&
p->param != NULL) {
spa_list_remove(&p->link);
free(p);
}
}
}
spa_list_consume(p, &o->pending_list, link) {
spa_list_remove(&p->link);
@ -321,6 +316,9 @@ static void device_event_info(void *data, const struct pw_device_info *info)
if (info == NULL)
return;
o->this.n_params = info->n_params;
o->this.params = info->params;
if (info->change_mask & PW_DEVICE_CHANGE_MASK_PROPS)
changed++;
@ -333,11 +331,6 @@ static void device_event_info(void *data, const struct pw_device_info *info)
continue;
info->params[i].user = 0;
if (id >= MAX_PARAMS) {
pw_log_error("too big param id %d", id);
continue;
}
switch (id) {
case SPA_PARAM_EnumProfile:
case SPA_PARAM_Profile:
@ -347,14 +340,14 @@ static void device_event_info(void *data, const struct pw_device_info *info)
case SPA_PARAM_Route:
break;
}
add_param(&o->pending_list, o->param_seq[id], o->param_seq, id, NULL);
add_param(&o->pending_list, info->params[i].seq, id, NULL);
if (!(info->params[i].flags & SPA_PARAM_INFO_READ))
continue;
res = pw_device_enum_params((struct pw_device*)o->this.proxy,
++o->param_seq[id], id, 0, -1, NULL);
++info->params[i].seq, id, 0, -1, NULL);
if (SPA_RESULT_IS_ASYNC(res))
o->param_seq[id] = res;
info->params[i].seq = res;
}
}
if (changed) {
@ -391,7 +384,7 @@ static void device_event_param(void *data, int seq,
struct manager *m = o->manager;
struct pw_manager_param *p;
p = add_param(&o->pending_list, seq, o->param_seq, id, param);
p = add_param(&o->pending_list, seq, id, param);
if (p == NULL)
return;
@ -443,6 +436,9 @@ static void node_event_info(void *data, const struct pw_node_info *info)
if (info == NULL)
return;
o->this.n_params = info->n_params;
o->this.params = info->params;
if (info->change_mask & PW_NODE_CHANGE_MASK_STATE)
changed++;
@ -458,20 +454,15 @@ static void node_event_info(void *data, const struct pw_node_info *info)
continue;
info->params[i].user = 0;
if (id >= MAX_PARAMS) {
pw_log_error("too big param id %d", id);
continue;
}
changed++;
add_param(&o->pending_list, o->param_seq[id], o->param_seq, id, NULL);
add_param(&o->pending_list, info->params[i].seq, id, NULL);
if (!(info->params[i].flags & SPA_PARAM_INFO_READ))
continue;
res = pw_node_enum_params((struct pw_node*)o->this.proxy,
++o->param_seq[id], id, 0, -1, NULL);
++info->params[i].seq, id, 0, -1, NULL);
if (SPA_RESULT_IS_ASYNC(res))
o->param_seq[id] = res;
info->params[i].seq = res;
}
}
if (changed) {
@ -485,7 +476,7 @@ static void node_event_param(void *data, int seq,
const struct spa_pod *param)
{
struct object *o = data;
add_param(&o->pending_list, seq, o->param_seq, id, param);
add_param(&o->pending_list, seq, id, param);
}
static const struct pw_node_events node_events = {

View file

@ -72,6 +72,7 @@ struct pw_manager {
struct pw_manager_param {
uint32_t id;
int32_t seq;
struct spa_list link; /**< link in manager_object param_list */
struct spa_pod *param;
};
@ -92,6 +93,9 @@ struct pw_manager_object {
int changed;
void *info;
struct spa_param_info *params;
uint32_t n_params;
struct spa_list param_list;
unsigned int creating:1;
unsigned int removing:1;

View file

@ -88,6 +88,7 @@ struct data {
struct param {
uint32_t id;
int32_t seq;
struct spa_list link;
struct spa_pod *param;
};
@ -116,6 +117,8 @@ struct object {
const struct class *class;
void *info;
struct spa_param_info *params;
uint32_t n_params;
int changed;
struct spa_list param_list;
@ -148,7 +151,8 @@ static uint32_t clear_params(struct spa_list *param_list, uint32_t id)
return count;
}
static struct param *add_param(struct spa_list *params, uint32_t id, const struct spa_pod *param)
static struct param *add_param(struct spa_list *params, int seq,
uint32_t id, const struct spa_pod *param)
{
struct param *p;
@ -165,6 +169,7 @@ static struct param *add_param(struct spa_list *params, uint32_t id, const struc
return NULL;
p->id = id;
p->seq = seq;
if (param != NULL) {
p->param = SPA_PTROFF(p, sizeof(*p), struct spa_pod);
memcpy(p->param, param, SPA_POD_SIZE(param));
@ -189,7 +194,19 @@ static struct object *find_object(struct data *d, uint32_t id)
static void object_update_params(struct object *o)
{
struct param *p;
struct param *p, *t;
uint32_t i;
for (i = 0; i < o->n_params; i++) {
spa_list_for_each_safe(p, t, &o->pending_list, link) {
if (p->id == o->params[i].id &&
p->seq != o->params[i].seq &&
p->param != NULL) {
spa_list_remove(&p->link);
free(p);
}
}
}
spa_list_consume(p, &o->pending_list, link) {
spa_list_remove(&p->link);
@ -771,6 +788,7 @@ static void device_event_info(void *data, const struct pw_device_info *info)
{
struct object *o = data;
uint32_t i, changed = 0;
int res;
pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->id, info->change_mask);
@ -778,6 +796,9 @@ static void device_event_info(void *data, const struct pw_device_info *info)
if (info == NULL)
return;
o->params = info->params;
o->n_params = info->n_params;
if (info->change_mask & PW_DEVICE_CHANGE_MASK_PROPS)
changed++;
@ -790,12 +811,14 @@ static void device_event_info(void *data, const struct pw_device_info *info)
info->params[i].user = 0;
changed++;
clear_params(&o->pending_list, id);
add_param(&o->pending_list, 0, id, NULL);
if (!(info->params[i].flags & SPA_PARAM_INFO_READ))
continue;
pw_device_enum_params((struct pw_device*)o->proxy,
0, id, 0, -1, NULL);
res = pw_device_enum_params((struct pw_device*)o->proxy,
++info->params[i].seq, id, 0, -1, NULL);
if (SPA_RESULT_IS_ASYNC(res))
info->params[i].seq = res;
}
}
if (changed) {
@ -809,7 +832,7 @@ static void device_event_param(void *data, int seq,
const struct spa_pod *param)
{
struct object *o = data;
add_param(&o->pending_list, id, param);
add_param(&o->pending_list, seq, id, param);
}
static const struct pw_device_events device_events = {
@ -867,6 +890,7 @@ static void node_event_info(void *data, const struct pw_node_info *info)
{
struct object *o = data;
uint32_t i, changed = 0;
int res;
pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->id, info->change_mask);
@ -874,6 +898,9 @@ static void node_event_info(void *data, const struct pw_node_info *info)
if (info == NULL)
return;
o->params = info->params;
o->n_params = info->n_params;
if (info->change_mask & PW_NODE_CHANGE_MASK_STATE)
changed++;
@ -889,12 +916,14 @@ static void node_event_info(void *data, const struct pw_node_info *info)
info->params[i].user = 0;
changed++;
add_param(&o->pending_list, id, NULL);
add_param(&o->pending_list, 0, id, NULL);
if (!(info->params[i].flags & SPA_PARAM_INFO_READ))
continue;
pw_node_enum_params((struct pw_node*)o->proxy,
0, id, 0, -1, NULL);
res = pw_node_enum_params((struct pw_node*)o->proxy,
++info->params[i].seq, id, 0, -1, NULL);
if (SPA_RESULT_IS_ASYNC(res))
info->params[i].seq = res;
}
}
if (changed) {
@ -908,7 +937,7 @@ static void node_event_param(void *data, int seq,
const struct spa_pod *param)
{
struct object *o = data;
add_param(&o->pending_list, id, param);
add_param(&o->pending_list, seq, id, param);
}
static const struct pw_node_events node_events = {
@ -958,6 +987,7 @@ static void port_event_info(void *data, const struct pw_port_info *info)
{
struct object *o = data;
uint32_t i, changed = 0;
int res;
pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->id, info->change_mask);
@ -965,6 +995,9 @@ static void port_event_info(void *data, const struct pw_port_info *info)
if (info == NULL)
return;
o->params = info->params;
o->n_params = info->n_params;
if (info->change_mask & PW_PORT_CHANGE_MASK_PROPS)
changed++;
@ -977,12 +1010,14 @@ static void port_event_info(void *data, const struct pw_port_info *info)
info->params[i].user = 0;
changed++;
add_param(&o->pending_list, id, NULL);
add_param(&o->pending_list, 0, id, NULL);
if (!(info->params[i].flags & SPA_PARAM_INFO_READ))
continue;
pw_port_enum_params((struct pw_port*)o->proxy,
0, id, 0, -1, NULL);
res = pw_port_enum_params((struct pw_port*)o->proxy,
++info->params[i].seq, id, 0, -1, NULL);
if (SPA_RESULT_IS_ASYNC(res))
info->params[i].seq = res;
}
}
if (changed) {
@ -996,7 +1031,7 @@ static void port_event_param(void *data, int seq,
const struct spa_pod *param)
{
struct object *o = data;
add_param(&o->pending_list, id, param);
add_param(&o->pending_list, seq, id, param);
}
static const struct pw_port_events port_events = {