wine/dlls/winegstreamer/wg_muxer.c
2023-11-27 22:39:52 +01:00

561 lines
18 KiB
C

/*
* GStreamer muxer backend
*
* Copyright 2023 Ziqing Hui for CodeWeavers
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
/*
* wg_muxer will autoplug gstreamer muxer and parser elements.
* It creates a pipeline like this:
*
* ------------------- -------
* [my_src 1] ==> |parser 1 (optional)| ==> | |
* ------------------- | |
* | |
* ------------------- | |
* [my_src 2] ==> |parser 2 (optional)| ==> | |
* ------------------- | |
* | muxer | ==> [my_sink]
* | |
* [ ...... ] | |
* | |
* | |
* ------------------- | |
* [my_src n] ==> |parser n (optional)| ==> | |
* ------------------- -------
*/
#if 0
#pragma makedep unix
#endif
#include <stdio.h>
#include "ntstatus.h"
#define WIN32_NO_STATUS
#include "winternl.h"
#include "unix_private.h"
#include "wine/list.h"
struct wg_muxer
{
GstElement *container, *muxer;
GstPad *my_sink;
GstCaps *my_sink_caps;
GstAtomicQueue *output_queue;
GstBuffer *buffer;
pthread_mutex_t mutex;
pthread_cond_t cond;
bool eos;
guint64 offset; /* Write offset of the output buffer generated by muxer. */
struct list streams;
};
struct wg_muxer_stream
{
struct wg_muxer *muxer;
struct wg_format format;
uint32_t id;
GstPad *my_src;
GstCaps *my_src_caps, *parser_src_caps;
GstElement *parser;
GstSegment segment;
struct list entry;
};
static struct wg_muxer *get_muxer(wg_muxer_t muxer)
{
return (struct wg_muxer *)(ULONG_PTR)muxer;
}
static struct wg_muxer_stream *muxer_get_stream_by_id(struct wg_muxer *muxer, DWORD id)
{
struct wg_muxer_stream *stream;
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
{
if (stream->id == id)
return stream;
}
return NULL;
}
static bool muxer_try_muxer_factory(struct wg_muxer *muxer, GstElementFactory *muxer_factory)
{
struct wg_muxer_stream *stream;
GST_INFO("Trying %"GST_PTR_FORMAT".", muxer_factory);
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
{
GstCaps *caps = stream->parser ? stream->parser_src_caps : stream->my_src_caps;
if (!gst_element_factory_can_sink_any_caps(muxer_factory, caps))
{
GST_INFO("%"GST_PTR_FORMAT" cannot sink stream %u %p, caps %"GST_PTR_FORMAT,
muxer_factory, stream->id, stream, caps);
return false;
}
}
return true;
}
static GstElement *muxer_find_muxer(struct wg_muxer *muxer)
{
/* Some muxers are formatter, eg. id3mux. */
GstElementFactoryListType muxer_type = GST_ELEMENT_FACTORY_TYPE_MUXER | GST_ELEMENT_FACTORY_TYPE_FORMATTER;
GstElement *element = NULL;
GList *muxers, *tmp;
GST_DEBUG("muxer %p.", muxer);
muxers = find_element_factories(muxer_type, GST_RANK_NONE, NULL, muxer->my_sink_caps);
for (tmp = muxers; tmp && !element; tmp = tmp->next)
{
GstElementFactory *factory = GST_ELEMENT_FACTORY(tmp->data);
if (muxer_try_muxer_factory(muxer, factory))
element = factory_create_element(factory);
}
gst_plugin_feature_list_free(muxers);
if (!element)
GST_WARNING("Failed to find any compatible muxer element.");
return element;
}
static gboolean muxer_sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
{
struct wg_muxer *muxer = gst_pad_get_element_private(pad);
GST_DEBUG("pad %p, parent %p, query %p, muxer %p, type \"%s\".",
pad, parent, query, muxer, gst_query_type_get_name(query->type));
switch (query->type)
{
case GST_QUERY_SEEKING:
gst_query_set_seeking(query, GST_FORMAT_BYTES, true, 0, -1);
return true;
default:
GST_WARNING("Ignoring \"%s\" query.", gst_query_type_get_name(query->type));
return gst_pad_query_default(pad, parent, query);
}
}
static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
{
struct wg_muxer *muxer = gst_pad_get_element_private(pad);
const GstSegment *segment;
GST_DEBUG("pad %p, parent %p, event %p, muxer %p, type \"%s\".",
pad, parent, event, muxer, GST_EVENT_TYPE_NAME(event));
switch (event->type)
{
case GST_EVENT_EOS:
pthread_mutex_lock(&muxer->mutex);
muxer->eos = true;
pthread_mutex_unlock(&muxer->mutex);
pthread_cond_signal(&muxer->cond);
break;
case GST_EVENT_SEGMENT:
pthread_mutex_lock(&muxer->mutex);
gst_event_parse_segment(event, &segment);
if (segment->format != GST_FORMAT_BYTES)
{
pthread_mutex_unlock(&muxer->mutex);
GST_FIXME("Unhandled segment format \"%s\".", gst_format_get_name(segment->format));
break;
}
muxer->offset = segment->start;
pthread_mutex_unlock(&muxer->mutex);
break;
default:
GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
break;
}
gst_event_unref(event);
return TRUE;
}
static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
{
GstBuffer *buffer_writable= gst_buffer_make_writable(buffer);
struct wg_muxer *muxer = gst_pad_get_element_private(pad);
GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT", parent %"GST_PTR_FORMAT", buffer <%"GST_PTR_FORMAT">.",
muxer, pad, parent, buffer);
pthread_mutex_lock(&muxer->mutex);
GST_BUFFER_OFFSET(buffer_writable) = GST_BUFFER_OFFSET_NONE;
if (muxer->offset != GST_BUFFER_OFFSET_NONE)
{
GST_BUFFER_OFFSET(buffer_writable) = muxer->offset;
muxer->offset = GST_BUFFER_OFFSET_NONE;
}
gst_atomic_queue_push(muxer->output_queue, buffer_writable);
GST_DEBUG("Pushed writable buffer <%"GST_PTR_FORMAT"> to output queue %p, %u buffers in queue now.",
buffer_writable, muxer->output_queue, gst_atomic_queue_length(muxer->output_queue));
pthread_mutex_unlock(&muxer->mutex);
return GST_FLOW_OK;
}
static void stream_free(struct wg_muxer_stream *stream)
{
if (stream->parser_src_caps)
gst_caps_unref(stream->parser_src_caps);
gst_object_unref(stream->my_src);
gst_caps_unref(stream->my_src_caps);
free(stream);
}
NTSTATUS wg_muxer_create(void *args)
{
struct wg_muxer_create_params *params = args;
NTSTATUS status = STATUS_UNSUCCESSFUL;
GstPadTemplate *template = NULL;
struct wg_muxer *muxer;
/* Create wg_muxer object. */
if (!(muxer = calloc(1, sizeof(*muxer))))
return STATUS_NO_MEMORY;
list_init(&muxer->streams);
muxer->offset = GST_BUFFER_OFFSET_NONE;
pthread_mutex_init(&muxer->mutex, NULL);
pthread_cond_init(&muxer->cond, NULL);
if (!(muxer->container = gst_bin_new("wg_muxer")))
goto out;
if (!(muxer->output_queue = gst_atomic_queue_new(8)))
goto out;
/* Create sink pad. */
if (!(muxer->my_sink_caps = gst_caps_from_string(params->format)))
{
GST_ERROR("Failed to get caps from format string: \"%s\".", params->format);
goto out;
}
if (!(template = gst_pad_template_new("sink", GST_PAD_SINK, GST_PAD_ALWAYS, muxer->my_sink_caps)))
goto out;
muxer->my_sink = gst_pad_new_from_template(template, "wg_muxer_sink");
if (!muxer->my_sink)
goto out;
gst_pad_set_element_private(muxer->my_sink, muxer);
gst_pad_set_query_function(muxer->my_sink, muxer_sink_query_cb);
gst_pad_set_event_function(muxer->my_sink, muxer_sink_event_cb);
gst_pad_set_chain_function(muxer->my_sink, muxer_sink_chain_cb);
gst_object_unref(template);
GST_INFO("Created winegstreamer muxer %p.", muxer);
params->muxer = (wg_transform_t)(ULONG_PTR)muxer;
return STATUS_SUCCESS;
out:
if (muxer->my_sink)
gst_object_unref(muxer->my_sink);
if (template)
gst_object_unref(template);
if (muxer->my_sink_caps)
gst_caps_unref(muxer->my_sink_caps);
if (muxer->output_queue)
gst_atomic_queue_unref(muxer->output_queue);
if (muxer->container)
gst_object_unref(muxer->container);
pthread_cond_destroy(&muxer->cond);
pthread_mutex_destroy(&muxer->mutex);
free(muxer);
return status;
}
NTSTATUS wg_muxer_destroy(void *args)
{
struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
struct wg_muxer_stream *stream, *next;
GstBuffer *buffer;
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry)
{
list_remove(&stream->entry);
stream_free(stream);
}
if (muxer->buffer)
gst_buffer_unref(muxer->buffer);
while ((buffer = gst_atomic_queue_pop(muxer->output_queue)))
gst_buffer_unref(buffer);
gst_atomic_queue_unref(muxer->output_queue);
gst_object_unref(muxer->my_sink);
gst_caps_unref(muxer->my_sink_caps);
gst_element_set_state(muxer->container, GST_STATE_NULL);
gst_object_unref(muxer->container);
pthread_cond_destroy(&muxer->cond);
pthread_mutex_destroy(&muxer->mutex);
free(muxer);
return S_OK;
}
NTSTATUS wg_muxer_add_stream(void *args)
{
struct wg_muxer_add_stream_params *params = args;
struct wg_muxer *muxer = get_muxer(params->muxer);
NTSTATUS status = STATUS_UNSUCCESSFUL;
GstPadTemplate *template = NULL;
struct wg_muxer_stream *stream;
char src_pad_name[64];
GST_DEBUG("muxer %p, stream %u, format %p.", muxer, params->stream_id, params->format);
/* Create stream object. */
if (!(stream = calloc(1, sizeof(*stream))))
return STATUS_NO_MEMORY;
stream->muxer = muxer;
stream->format = *params->format;
stream->id = params->stream_id;
/* Create stream my_src pad. */
if (!(stream->my_src_caps = wg_format_to_caps(params->format)))
goto out;
if (!(template = gst_pad_template_new("src", GST_PAD_SRC, GST_PAD_ALWAYS, stream->my_src_caps)))
goto out;
sprintf(src_pad_name, "wg_muxer_stream_src_%u", stream->id);
if (!(stream->my_src = gst_pad_new_from_template(template, src_pad_name)))
goto out;
gst_pad_set_element_private(stream->my_src, stream);
/* Create parser. */
if ((stream->parser = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, stream->my_src_caps, NULL)))
{
GstPad *parser_src;
if (!gst_bin_add(GST_BIN(muxer->container), stream->parser)
|| !link_src_to_element(stream->my_src, stream->parser))
goto out;
parser_src = gst_element_get_static_pad(stream->parser, "src");
stream->parser_src_caps = gst_pad_query_caps(parser_src, NULL);
GST_INFO("Created parser %"GST_PTR_FORMAT" for stream %u %p.",
stream->parser, stream->id, stream);
gst_object_unref(parser_src);
}
/* Add to muxer stream list. */
list_add_tail(&muxer->streams, &stream->entry);
gst_object_unref(template);
GST_INFO("Created winegstreamer muxer stream %p.", stream);
return STATUS_SUCCESS;
out:
if (stream->parser)
gst_object_unref(stream->parser);
if (stream->my_src)
gst_object_unref(stream->my_src);
if (template)
gst_object_unref(template);
if (stream->my_src_caps)
gst_caps_unref(stream->my_src_caps);
free(stream);
return status;
}
NTSTATUS wg_muxer_start(void *args)
{
struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
NTSTATUS status = STATUS_UNSUCCESSFUL;
struct wg_muxer_stream *stream;
GST_DEBUG("muxer %p.", muxer);
/* Create muxer element. */
if (!(muxer->muxer = muxer_find_muxer(muxer))
|| !gst_bin_add(GST_BIN(muxer->container), muxer->muxer))
return status;
/* Link muxer element to my_sink */
if (!link_element_to_sink(muxer->muxer, muxer->my_sink)
|| !gst_pad_set_active(muxer->my_sink, 1))
return status;
/* Link each stream to muxer element. */
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
{
bool link_ok = stream->parser ?
gst_element_link(stream->parser, muxer->muxer) :
link_src_to_element(stream->my_src, muxer->muxer);
if (!link_ok)
return status;
}
/* Set to pause state. */
if (gst_element_set_state(muxer->container, GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE
|| gst_element_get_state(muxer->container, NULL, NULL, -1) == GST_STATE_CHANGE_FAILURE)
return status;
/* Active stream my_src pad and push events to prepare for streaming. */
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
{
char buffer[64];
sprintf(buffer, "wg_muxer_stream_src_%u", stream->id);
gst_segment_init(&stream->segment, GST_FORMAT_BYTES);
if (!gst_pad_set_active(stream->my_src, 1))
return status;
if (!push_event(stream->my_src, gst_event_new_stream_start(buffer))
|| !push_event(stream->my_src, gst_event_new_caps(stream->my_src_caps))
|| !push_event(stream->my_src, gst_event_new_segment(&stream->segment)))
return status;
}
GST_DEBUG("Started muxer %p.", muxer);
return STATUS_SUCCESS;
}
NTSTATUS wg_muxer_push_sample(void *args)
{
struct wg_muxer_push_sample_params *params = args;
struct wg_muxer *muxer = get_muxer(params->muxer);
struct wg_sample *sample = params->sample;
struct wg_muxer_stream *stream;
GstFlowReturn ret;
GstBuffer *buffer;
if (!(stream = muxer_get_stream_by_id(muxer, params->stream_id)))
return STATUS_NOT_FOUND;
/* Create sample data buffer. */
if (!(buffer = gst_buffer_new_and_alloc(sample->size))
|| !gst_buffer_fill(buffer, 0, wg_sample_data(sample), sample->size))
{
GST_ERROR("Failed to allocate input buffer.");
return STATUS_NO_MEMORY;
}
GST_INFO("Copied %u bytes from sample %p to buffer %p.", sample->size, sample, buffer);
/* Set sample properties. */
if (sample->flags & WG_SAMPLE_FLAG_HAS_PTS)
GST_BUFFER_PTS(buffer) = sample->pts * 100;
if (sample->flags & WG_SAMPLE_FLAG_HAS_DURATION)
GST_BUFFER_DURATION(buffer) = sample->duration * 100;
if (!(sample->flags & WG_SAMPLE_FLAG_SYNC_POINT))
GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
if (sample->flags & WG_SAMPLE_FLAG_DISCONTINUITY)
GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DISCONT);
/* Push sample data buffer to stream src pad. */
if ((ret = gst_pad_push(stream->my_src, buffer)) < 0)
{
GST_ERROR("Failed to push buffer %p to pad %s, reason %s.",
buffer, gst_pad_get_name(stream->my_src), gst_flow_get_name(ret));
return STATUS_UNSUCCESSFUL;
}
return STATUS_SUCCESS;
}
NTSTATUS wg_muxer_read_data(void *args)
{
struct wg_muxer_read_data_params *params = args;
struct wg_muxer *muxer = get_muxer(params->muxer);
gsize size, copied;
/* Pop buffer from output queue. */
if (!muxer->buffer)
{
if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue)))
return STATUS_NO_MEMORY;
/* We may continuously read data from a same buffer multiple times.
* But we only need to set the offset at the first reading. */
if (GST_BUFFER_OFFSET_IS_VALID(muxer->buffer))
params->offset = GST_BUFFER_OFFSET(muxer->buffer);
}
/* Copy data. */
size = min(gst_buffer_get_size(muxer->buffer), params->size);
copied = gst_buffer_extract(muxer->buffer, 0, params->buffer, size);
params->size = copied;
GST_INFO("Copied %"G_GSIZE_FORMAT" bytes from buffer <%"GST_PTR_FORMAT">", copied, muxer->buffer);
/* Unref buffer if all data is read. */
gst_buffer_resize(muxer->buffer, (gssize)copied, -1);
if (!gst_buffer_get_size(muxer->buffer))
{
gst_buffer_unref(muxer->buffer);
muxer->buffer = NULL;
}
return STATUS_SUCCESS;
}
NTSTATUS wg_muxer_finalize(void *args)
{
struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
struct wg_muxer_stream *stream;
/* Notify each stream of EOS. */
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
{
if (!push_event(stream->my_src, gst_event_new_segment_done(GST_FORMAT_BYTES, -1))
|| !push_event(stream->my_src, gst_event_new_eos()))
return STATUS_UNSUCCESSFUL;
}
/* Wait for muxer EOS. */
pthread_mutex_lock(&muxer->mutex);
while (!muxer->eos)
pthread_cond_wait(&muxer->cond, &muxer->mutex);
pthread_mutex_unlock(&muxer->mutex);
return STATUS_SUCCESS;
}