mirror of
git://source.winehq.org/git/wine.git
synced 2024-11-05 18:01:34 +00:00
560 lines
18 KiB
C
560 lines
18 KiB
C
/*
|
|
* GStreamer muxer backend
|
|
*
|
|
* Copyright 2023 Ziqing Hui for CodeWeavers
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2.1 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
* License along with this library; if not, write to the Free Software
|
|
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
|
|
*/
|
|
|
|
/*
 * wg_muxer will autoplug gstreamer muxer and parser elements.
 * It creates a pipeline like this:
 *
 *                     -------------------        -------
 * [my_src 1]   ==>   |parser 1 (optional)|  ==> |       |
 *                     -------------------       |       |
 *                                               |       |
 *                     -------------------       |       |
 * [my_src 2]   ==>   |parser 2 (optional)|  ==> |       |
 *                     -------------------       |       |
 *                                               | muxer | ==> [my_sink]
 *                                               |       |
 * [ ...... ]                                    |       |
 *                                               |       |
 *                                               |       |
 *                     -------------------       |       |
 * [my_src n]   ==>   |parser n (optional)|  ==> |       |
 *                     -------------------        -------
 */
|
|
|
|
#if 0
|
|
#pragma makedep unix
|
|
#endif
|
|
|
|
#include <stdio.h>
|
|
|
|
#include "ntstatus.h"
|
|
#define WIN32_NO_STATUS
|
|
#include "winternl.h"
|
|
|
|
#include "unix_private.h"
|
|
|
|
#include "wine/list.h"
|
|
|
|
struct wg_muxer
|
|
{
|
|
GstElement *container, *muxer;
|
|
GstPad *my_sink;
|
|
GstCaps *my_sink_caps;
|
|
|
|
GstAtomicQueue *output_queue;
|
|
GstBuffer *buffer;
|
|
|
|
pthread_mutex_t mutex;
|
|
pthread_cond_t cond;
|
|
|
|
bool eos;
|
|
guint64 offset; /* Write offset of the output buffer generated by muxer. */
|
|
|
|
struct list streams;
|
|
};
|
|
|
|
struct wg_muxer_stream
|
|
{
|
|
struct wg_muxer *muxer;
|
|
struct wg_format format;
|
|
uint32_t id;
|
|
|
|
GstPad *my_src;
|
|
GstCaps *my_src_caps, *parser_src_caps;
|
|
GstElement *parser;
|
|
GstSegment segment;
|
|
|
|
struct list entry;
|
|
};
|
|
|
|
/* Convert an opaque wg_muxer_t handle back into the object pointer it encodes. */
static struct wg_muxer *get_muxer(wg_muxer_t muxer)
{
    ULONG_PTR address = (ULONG_PTR)muxer;

    return (struct wg_muxer *)address;
}
|
|
|
|
/* Look up a stream of the muxer by its id.
 * Returns the first stream whose id matches, or NULL when there is none. */
static struct wg_muxer_stream *muxer_get_stream_by_id(struct wg_muxer *muxer, DWORD id)
{
    struct wg_muxer_stream *found = NULL, *cur;

    LIST_FOR_EACH_ENTRY(cur, &muxer->streams, struct wg_muxer_stream, entry)
    {
        if (cur->id != id)
            continue;

        found = cur;
        break;
    }

    return found;
}
|
|
|
|
/* Check whether a muxer factory can accept every stream of the muxer.
 * A stream is checked with its parser output caps when it has a parser,
 * and with its own source caps otherwise.
 * Returns true only if the factory can sink the caps of all streams. */
static bool muxer_try_muxer_factory(struct wg_muxer *muxer, GstElementFactory *muxer_factory)
{
    struct wg_muxer_stream *cur;

    GST_INFO("Trying %"GST_PTR_FORMAT".", muxer_factory);

    LIST_FOR_EACH_ENTRY(cur, &muxer->streams, struct wg_muxer_stream, entry)
    {
        GstCaps *stream_caps;

        if (cur->parser)
            stream_caps = cur->parser_src_caps;
        else
            stream_caps = cur->my_src_caps;

        if (gst_element_factory_can_sink_any_caps(muxer_factory, stream_caps))
            continue;

        /* One incompatible stream is enough to reject this factory. */
        GST_INFO("%"GST_PTR_FORMAT" cannot sink stream %u %p, caps %"GST_PTR_FORMAT,
                muxer_factory, cur->id, cur, stream_caps);
        return false;
    }

    return true;
}
|
|
|
|
static GstElement *muxer_find_muxer(struct wg_muxer *muxer)
|
|
{
|
|
/* Some muxers are formatter, eg. id3mux. */
|
|
GstElementFactoryListType muxer_type = GST_ELEMENT_FACTORY_TYPE_MUXER | GST_ELEMENT_FACTORY_TYPE_FORMATTER;
|
|
GstElement *element = NULL;
|
|
GList *muxers, *tmp;
|
|
|
|
GST_DEBUG("muxer %p.", muxer);
|
|
|
|
muxers = find_element_factories(muxer_type, GST_RANK_NONE, NULL, muxer->my_sink_caps);
|
|
|
|
for (tmp = muxers; tmp && !element; tmp = tmp->next)
|
|
{
|
|
GstElementFactory *factory = GST_ELEMENT_FACTORY(tmp->data);
|
|
if (muxer_try_muxer_factory(muxer, factory))
|
|
element = factory_create_element(factory);
|
|
}
|
|
|
|
gst_plugin_feature_list_free(muxers);
|
|
|
|
if (!element)
|
|
GST_WARNING("Failed to find any compatible muxer element.");
|
|
|
|
return element;
|
|
}
|
|
|
|
/* Query handler of the my_sink pad.
 * Seeking queries are answered directly (seekable, in bytes format);
 * every other query is logged and passed to the default pad query handler. */
static gboolean muxer_sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
{
    struct wg_muxer *muxer = gst_pad_get_element_private(pad);

    GST_DEBUG("pad %p, parent %p, query %p, muxer %p, type \"%s\".",
            pad, parent, query, muxer, gst_query_type_get_name(query->type));

    if (query->type == GST_QUERY_SEEKING)
    {
        gst_query_set_seeking(query, GST_FORMAT_BYTES, true, 0, -1);
        return true;
    }

    GST_WARNING("Ignoring \"%s\" query.", gst_query_type_get_name(query->type));
    return gst_pad_query_default(pad, parent, query);
}
|
|
|
|
/* Event handler of the my_sink pad.
 *  - EOS: sets the eos flag under the mutex and signals the condition
 *    variable that wg_muxer_finalize() waits on.
 *  - SEGMENT: for a bytes-format segment, records its start as the write
 *    offset for the next output buffer; other formats are only logged.
 *  - Anything else is logged and dropped.
 * The event is always released and TRUE is always returned. */
static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
{
    struct wg_muxer *muxer = gst_pad_get_element_private(pad);

    GST_DEBUG("pad %p, parent %p, event %p, muxer %p, type \"%s\".",
            pad, parent, event, muxer, GST_EVENT_TYPE_NAME(event));

    if (event->type == GST_EVENT_EOS)
    {
        pthread_mutex_lock(&muxer->mutex);
        muxer->eos = true;
        pthread_mutex_unlock(&muxer->mutex);
        pthread_cond_signal(&muxer->cond);
    }
    else if (event->type == GST_EVENT_SEGMENT)
    {
        const GstSegment *new_segment;

        pthread_mutex_lock(&muxer->mutex);

        gst_event_parse_segment(event, &new_segment);
        if (new_segment->format == GST_FORMAT_BYTES)
        {
            muxer->offset = new_segment->start;
            pthread_mutex_unlock(&muxer->mutex);
        }
        else
        {
            pthread_mutex_unlock(&muxer->mutex);
            GST_FIXME("Unhandled segment format \"%s\".", gst_format_get_name(new_segment->format));
        }
    }
    else
    {
        GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
    }

    gst_event_unref(event);
    return TRUE;
}
|
|
|
|
/* Chain function of the my_sink pad: receives one output buffer from the
 * muxer element and queues it for wg_muxer_read_data().
 *
 * If a bytes segment event set a pending write offset, it is stamped on this
 * buffer and then cleared, so only the first buffer after the segment
 * carries it; all other buffers get GST_BUFFER_OFFSET_NONE.
 *
 * Always returns GST_FLOW_OK. */
static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
{
    struct wg_muxer *muxer = gst_pad_get_element_private(pad);
    GstBuffer *buffer_writable;

    /* Log the incoming buffer while we still own our reference to it. */
    GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT", parent %"GST_PTR_FORMAT", buffer <%"GST_PTR_FORMAT">.",
            muxer, pad, parent, buffer);

    /* gst_buffer_make_writable() takes over our reference; "buffer" must not
     * be touched after this call, only "buffer_writable". */
    buffer_writable = gst_buffer_make_writable(buffer);

    pthread_mutex_lock(&muxer->mutex);

    GST_BUFFER_OFFSET(buffer_writable) = GST_BUFFER_OFFSET_NONE;
    if (muxer->offset != GST_BUFFER_OFFSET_NONE)
    {
        GST_BUFFER_OFFSET(buffer_writable) = muxer->offset;
        muxer->offset = GST_BUFFER_OFFSET_NONE;
    }

    gst_atomic_queue_push(muxer->output_queue, buffer_writable);

    GST_DEBUG("Pushed writable buffer <%"GST_PTR_FORMAT"> to output queue %p, %u buffers in queue now.",
            buffer_writable, muxer->output_queue, gst_atomic_queue_length(muxer->output_queue));

    pthread_mutex_unlock(&muxer->mutex);

    return GST_FLOW_OK;
}
|
|
|
|
static void stream_free(struct wg_muxer_stream *stream)
|
|
{
|
|
if (stream->parser_src_caps)
|
|
gst_caps_unref(stream->parser_src_caps);
|
|
gst_object_unref(stream->my_src);
|
|
gst_caps_unref(stream->my_src_caps);
|
|
free(stream);
|
|
}
|
|
|
|
NTSTATUS wg_muxer_create(void *args)
|
|
{
|
|
struct wg_muxer_create_params *params = args;
|
|
NTSTATUS status = STATUS_UNSUCCESSFUL;
|
|
GstPadTemplate *template = NULL;
|
|
struct wg_muxer *muxer;
|
|
|
|
/* Create wg_muxer object. */
|
|
if (!(muxer = calloc(1, sizeof(*muxer))))
|
|
return STATUS_NO_MEMORY;
|
|
list_init(&muxer->streams);
|
|
muxer->offset = GST_BUFFER_OFFSET_NONE;
|
|
pthread_mutex_init(&muxer->mutex, NULL);
|
|
pthread_cond_init(&muxer->cond, NULL);
|
|
if (!(muxer->container = gst_bin_new("wg_muxer")))
|
|
goto out;
|
|
if (!(muxer->output_queue = gst_atomic_queue_new(8)))
|
|
goto out;
|
|
|
|
/* Create sink pad. */
|
|
if (!(muxer->my_sink_caps = gst_caps_from_string(params->format)))
|
|
{
|
|
GST_ERROR("Failed to get caps from format string: \"%s\".", params->format);
|
|
goto out;
|
|
}
|
|
if (!(template = gst_pad_template_new("sink", GST_PAD_SINK, GST_PAD_ALWAYS, muxer->my_sink_caps)))
|
|
goto out;
|
|
muxer->my_sink = gst_pad_new_from_template(template, "wg_muxer_sink");
|
|
if (!muxer->my_sink)
|
|
goto out;
|
|
gst_pad_set_element_private(muxer->my_sink, muxer);
|
|
gst_pad_set_query_function(muxer->my_sink, muxer_sink_query_cb);
|
|
gst_pad_set_event_function(muxer->my_sink, muxer_sink_event_cb);
|
|
gst_pad_set_chain_function(muxer->my_sink, muxer_sink_chain_cb);
|
|
|
|
gst_object_unref(template);
|
|
|
|
GST_INFO("Created winegstreamer muxer %p.", muxer);
|
|
params->muxer = (wg_transform_t)(ULONG_PTR)muxer;
|
|
|
|
return STATUS_SUCCESS;
|
|
|
|
out:
|
|
if (muxer->my_sink)
|
|
gst_object_unref(muxer->my_sink);
|
|
if (template)
|
|
gst_object_unref(template);
|
|
if (muxer->my_sink_caps)
|
|
gst_caps_unref(muxer->my_sink_caps);
|
|
if (muxer->output_queue)
|
|
gst_atomic_queue_unref(muxer->output_queue);
|
|
if (muxer->container)
|
|
gst_object_unref(muxer->container);
|
|
pthread_cond_destroy(&muxer->cond);
|
|
pthread_mutex_destroy(&muxer->mutex);
|
|
free(muxer);
|
|
|
|
return status;
|
|
}
|
|
|
|
NTSTATUS wg_muxer_destroy(void *args)
|
|
{
|
|
struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
|
|
struct wg_muxer_stream *stream, *next;
|
|
GstBuffer *buffer;
|
|
|
|
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry)
|
|
{
|
|
list_remove(&stream->entry);
|
|
stream_free(stream);
|
|
}
|
|
|
|
if (muxer->buffer)
|
|
gst_buffer_unref(muxer->buffer);
|
|
|
|
while ((buffer = gst_atomic_queue_pop(muxer->output_queue)))
|
|
gst_buffer_unref(buffer);
|
|
gst_atomic_queue_unref(muxer->output_queue);
|
|
|
|
gst_object_unref(muxer->my_sink);
|
|
gst_caps_unref(muxer->my_sink_caps);
|
|
gst_element_set_state(muxer->container, GST_STATE_NULL);
|
|
gst_object_unref(muxer->container);
|
|
|
|
pthread_cond_destroy(&muxer->cond);
|
|
pthread_mutex_destroy(&muxer->mutex);
|
|
|
|
free(muxer);
|
|
|
|
return S_OK;
|
|
}
|
|
|
|
NTSTATUS wg_muxer_add_stream(void *args)
|
|
{
|
|
struct wg_muxer_add_stream_params *params = args;
|
|
struct wg_muxer *muxer = get_muxer(params->muxer);
|
|
NTSTATUS status = STATUS_UNSUCCESSFUL;
|
|
GstPadTemplate *template = NULL;
|
|
struct wg_muxer_stream *stream;
|
|
char src_pad_name[64];
|
|
|
|
GST_DEBUG("muxer %p, stream %u, format %p.", muxer, params->stream_id, params->format);
|
|
|
|
/* Create stream object. */
|
|
if (!(stream = calloc(1, sizeof(*stream))))
|
|
return STATUS_NO_MEMORY;
|
|
stream->muxer = muxer;
|
|
stream->format = *params->format;
|
|
stream->id = params->stream_id;
|
|
|
|
/* Create stream my_src pad. */
|
|
if (!(stream->my_src_caps = wg_format_to_caps(params->format)))
|
|
goto out;
|
|
if (!(template = gst_pad_template_new("src", GST_PAD_SRC, GST_PAD_ALWAYS, stream->my_src_caps)))
|
|
goto out;
|
|
sprintf(src_pad_name, "wg_muxer_stream_src_%u", stream->id);
|
|
if (!(stream->my_src = gst_pad_new_from_template(template, src_pad_name)))
|
|
goto out;
|
|
gst_pad_set_element_private(stream->my_src, stream);
|
|
|
|
/* Create parser. */
|
|
if ((stream->parser = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, stream->my_src_caps, NULL)))
|
|
{
|
|
GstPad *parser_src;
|
|
|
|
if (!gst_bin_add(GST_BIN(muxer->container), stream->parser)
|
|
|| !link_src_to_element(stream->my_src, stream->parser))
|
|
goto out;
|
|
|
|
parser_src = gst_element_get_static_pad(stream->parser, "src");
|
|
stream->parser_src_caps = gst_pad_query_caps(parser_src, NULL);
|
|
GST_INFO("Created parser %"GST_PTR_FORMAT" for stream %u %p.",
|
|
stream->parser, stream->id, stream);
|
|
gst_object_unref(parser_src);
|
|
}
|
|
|
|
/* Add to muxer stream list. */
|
|
list_add_tail(&muxer->streams, &stream->entry);
|
|
|
|
gst_object_unref(template);
|
|
|
|
GST_INFO("Created winegstreamer muxer stream %p.", stream);
|
|
|
|
return STATUS_SUCCESS;
|
|
|
|
out:
|
|
if (stream->parser)
|
|
gst_object_unref(stream->parser);
|
|
if (stream->my_src)
|
|
gst_object_unref(stream->my_src);
|
|
if (template)
|
|
gst_object_unref(template);
|
|
if (stream->my_src_caps)
|
|
gst_caps_unref(stream->my_src_caps);
|
|
free(stream);
|
|
|
|
return status;
|
|
}
|
|
|
|
NTSTATUS wg_muxer_start(void *args)
|
|
{
|
|
struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
|
|
NTSTATUS status = STATUS_UNSUCCESSFUL;
|
|
struct wg_muxer_stream *stream;
|
|
|
|
GST_DEBUG("muxer %p.", muxer);
|
|
|
|
/* Create muxer element. */
|
|
if (!(muxer->muxer = muxer_find_muxer(muxer))
|
|
|| !gst_bin_add(GST_BIN(muxer->container), muxer->muxer))
|
|
return status;
|
|
|
|
/* Link muxer element to my_sink */
|
|
if (!link_element_to_sink(muxer->muxer, muxer->my_sink)
|
|
|| !gst_pad_set_active(muxer->my_sink, 1))
|
|
return status;
|
|
|
|
/* Link each stream to muxer element. */
|
|
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
|
|
{
|
|
bool link_ok = stream->parser ?
|
|
gst_element_link(stream->parser, muxer->muxer) :
|
|
link_src_to_element(stream->my_src, muxer->muxer);
|
|
|
|
if (!link_ok)
|
|
return status;
|
|
}
|
|
|
|
/* Set to pause state. */
|
|
if (gst_element_set_state(muxer->container, GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE
|
|
|| gst_element_get_state(muxer->container, NULL, NULL, -1) == GST_STATE_CHANGE_FAILURE)
|
|
return status;
|
|
|
|
/* Active stream my_src pad and push events to prepare for streaming. */
|
|
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
|
|
{
|
|
char buffer[64];
|
|
|
|
sprintf(buffer, "wg_muxer_stream_src_%u", stream->id);
|
|
gst_segment_init(&stream->segment, GST_FORMAT_BYTES);
|
|
if (!gst_pad_set_active(stream->my_src, 1))
|
|
return status;
|
|
if (!push_event(stream->my_src, gst_event_new_stream_start(buffer))
|
|
|| !push_event(stream->my_src, gst_event_new_caps(stream->my_src_caps))
|
|
|| !push_event(stream->my_src, gst_event_new_segment(&stream->segment)))
|
|
return status;
|
|
}
|
|
|
|
GST_DEBUG("Started muxer %p.", muxer);
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
NTSTATUS wg_muxer_push_sample(void *args)
|
|
{
|
|
struct wg_muxer_push_sample_params *params = args;
|
|
struct wg_muxer *muxer = get_muxer(params->muxer);
|
|
struct wg_sample *sample = params->sample;
|
|
struct wg_muxer_stream *stream;
|
|
GstFlowReturn ret;
|
|
GstBuffer *buffer;
|
|
|
|
if (!(stream = muxer_get_stream_by_id(muxer, params->stream_id)))
|
|
return STATUS_NOT_FOUND;
|
|
|
|
/* Create sample data buffer. */
|
|
if (!(buffer = gst_buffer_new_and_alloc(sample->size))
|
|
|| !gst_buffer_fill(buffer, 0, wg_sample_data(sample), sample->size))
|
|
{
|
|
GST_ERROR("Failed to allocate input buffer.");
|
|
return STATUS_NO_MEMORY;
|
|
}
|
|
|
|
GST_INFO("Copied %u bytes from sample %p to buffer %p.", sample->size, sample, buffer);
|
|
|
|
/* Set sample properties. */
|
|
if (sample->flags & WG_SAMPLE_FLAG_HAS_PTS)
|
|
GST_BUFFER_PTS(buffer) = sample->pts * 100;
|
|
if (sample->flags & WG_SAMPLE_FLAG_HAS_DURATION)
|
|
GST_BUFFER_DURATION(buffer) = sample->duration * 100;
|
|
if (!(sample->flags & WG_SAMPLE_FLAG_SYNC_POINT))
|
|
GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
|
|
if (sample->flags & WG_SAMPLE_FLAG_DISCONTINUITY)
|
|
GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DISCONT);
|
|
|
|
/* Push sample data buffer to stream src pad. */
|
|
if ((ret = gst_pad_push(stream->my_src, buffer)) < 0)
|
|
{
|
|
GST_ERROR("Failed to push buffer %p to pad %s, reason %s.",
|
|
buffer, gst_pad_get_name(stream->my_src), gst_flow_get_name(ret));
|
|
return STATUS_UNSUCCESSFUL;
|
|
}
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
NTSTATUS wg_muxer_read_data(void *args)
|
|
{
|
|
struct wg_muxer_read_data_params *params = args;
|
|
struct wg_muxer *muxer = get_muxer(params->muxer);
|
|
gsize size, copied;
|
|
|
|
/* Pop buffer from output queue. */
|
|
if (!muxer->buffer)
|
|
{
|
|
if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue)))
|
|
return STATUS_NO_MEMORY;
|
|
|
|
/* We may continuously read data from a same buffer multiple times.
|
|
* But we only need to set the offset at the first reading. */
|
|
if (GST_BUFFER_OFFSET_IS_VALID(muxer->buffer))
|
|
params->offset = GST_BUFFER_OFFSET(muxer->buffer);
|
|
}
|
|
|
|
/* Copy data. */
|
|
size = min(gst_buffer_get_size(muxer->buffer), params->size);
|
|
copied = gst_buffer_extract(muxer->buffer, 0, params->buffer, size);
|
|
params->size = copied;
|
|
|
|
GST_INFO("Copied %"G_GSIZE_FORMAT" bytes from buffer <%"GST_PTR_FORMAT">", copied, muxer->buffer);
|
|
|
|
/* Unref buffer if all data is read. */
|
|
gst_buffer_resize(muxer->buffer, (gssize)copied, -1);
|
|
if (!gst_buffer_get_size(muxer->buffer))
|
|
{
|
|
gst_buffer_unref(muxer->buffer);
|
|
muxer->buffer = NULL;
|
|
}
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
NTSTATUS wg_muxer_finalize(void *args)
|
|
{
|
|
struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
|
|
struct wg_muxer_stream *stream;
|
|
|
|
/* Notify each stream of EOS. */
|
|
LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
|
|
{
|
|
if (!push_event(stream->my_src, gst_event_new_segment_done(GST_FORMAT_BYTES, -1))
|
|
|| !push_event(stream->my_src, gst_event_new_eos()))
|
|
return STATUS_UNSUCCESSFUL;
|
|
}
|
|
|
|
/* Wait for muxer EOS. */
|
|
pthread_mutex_lock(&muxer->mutex);
|
|
while (!muxer->eos)
|
|
pthread_cond_wait(&muxer->cond, &muxer->mutex);
|
|
pthread_mutex_unlock(&muxer->mutex);
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|