1 | /* GStreamer |
---|
2 | * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> |
---|
3 | * 2000 Wim Taymans <wtay@chello.be> |
---|
4 | * 2003 Colin Walters <cwalters@gnome.org> |
---|
5 | * |
---|
6 | * gstqueue.c: |
---|
7 | * |
---|
8 | * This library is free software; you can redistribute it and/or |
---|
9 | * modify it under the terms of the GNU Library General Public |
---|
10 | * License as published by the Free Software Foundation; either |
---|
11 | * version 2 of the License, or (at your option) any later version. |
---|
12 | * |
---|
13 | * This library is distributed in the hope that it will be useful, |
---|
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
---|
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
---|
16 | * Library General Public License for more details. |
---|
17 | * |
---|
18 | * You should have received a copy of the GNU Library General Public |
---|
19 | * License along with this library; if not, write to the |
---|
20 | * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
---|
21 | * Boston, MA 02111-1307, USA. |
---|
22 | */ |
---|
23 | |
---|
24 | |
---|
25 | #include "gst_private.h" |
---|
26 | |
---|
27 | #include "gstqueue.h" |
---|
28 | #include "gstscheduler.h" |
---|
29 | #include "gstevent.h" |
---|
30 | #include "gstinfo.h" |
---|
31 | #include "gsterror.h" |
---|
32 | |
---|
33 | static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", |
---|
34 | GST_PAD_SINK, |
---|
35 | GST_PAD_ALWAYS, |
---|
36 | GST_STATIC_CAPS_ANY); |
---|
37 | |
---|
38 | static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", |
---|
39 | GST_PAD_SRC, |
---|
40 | GST_PAD_ALWAYS, |
---|
41 | GST_STATIC_CAPS_ANY); |
---|
42 | |
---|
/* Debug category used for all dataflow logging in this file; it is also
 * installed as the default category for the plain GST_DEBUG* macros. */
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
#define GST_CAT_DEFAULT (queue_dataflow)

/*
 * STATUS:
 * @queue: the queue whose levels are logged
 * @msg: string literal that is spliced into the log format
 *
 * Logs the current level, minimum threshold and maximum size of @queue for
 * buffers, bytes and time, plus the number of elements in the GQueue.
 *
 * NOTE: the expansion references a variable named `pad` that is not a macro
 * parameter, so this macro can only be used where `pad` is in scope.
 */
#define STATUS(queue, msg) \
  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
      "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
      "-%" G_GUINT64_FORMAT " ns, %u elements", \
      GST_DEBUG_PAD_NAME (pad), \
      queue->cur_level.buffers, queue->min_threshold.buffers, \
      queue->max_size.buffers, \
      queue->cur_level.bytes, queue->min_threshold.bytes, \
      queue->max_size.bytes, \
      queue->cur_level.time, queue->min_threshold.time, \
      queue->max_size.time, \
      queue->queue->length)
---|
62 | |
---|
63 | static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", |
---|
64 | "Generic", |
---|
65 | "Simple data queue", |
---|
66 | "Erik Walthinsen <omega@cse.ogi.edu>"); |
---|
67 | |
---|
68 | |
---|
/* Queue signals: indices into gst_queue_signals[]. LAST_SIGNAL is the
 * number of signals and must stay last. */
enum
{
  SIGNAL_UNDERRUN = 0,
  SIGNAL_RUNNING,
  SIGNAL_OVERRUN,
  LAST_SIGNAL
};
---|
77 | |
---|
/* Queue args: property identifiers used by gst_queue_class_init() when
 * installing the GObject properties. ARG_0 is a placeholder for id 0. */
enum
{
  ARG_0 = 0,
  /* FIXME: don't we have another way of doing this
   * "Gstreamer format" (frame/byte/time) queries? */
  ARG_CUR_LEVEL_BUFFERS,
  ARG_CUR_LEVEL_BYTES,
  ARG_CUR_LEVEL_TIME,
  ARG_MAX_SIZE_BUFFERS,
  ARG_MAX_SIZE_BYTES,
  ARG_MAX_SIZE_TIME,
  ARG_MIN_THRESHOLD_BUFFERS,
  ARG_MIN_THRESHOLD_BYTES,
  ARG_MIN_THRESHOLD_TIME,
  ARG_LEAKY,
  ARG_MAY_DEADLOCK,
  ARG_BLOCK_TIMEOUT
      /* FILL ME */
};
---|
97 | |
---|
/*
 * GST_QUEUE_MUTEX_LOCK:
 *
 * Acquires queue->qlock, logging the calling thread before and after the
 * lock is taken. The expansion references a variable named `queue`, so the
 * macro can only be used where `queue` is in scope.
 */
#define GST_QUEUE_MUTEX_LOCK G_STMT_START { \
  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
      "locking qlock from thread %p", g_thread_self ()); \
  g_mutex_lock (queue->qlock); \
  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
      "locked qlock from thread %p", g_thread_self ()); \
} G_STMT_END

/*
 * GST_QUEUE_MUTEX_UNLOCK:
 *
 * Logs the calling thread and then releases queue->qlock. Like the lock
 * macro, it relies on a variable named `queue` being in scope.
 */
#define GST_QUEUE_MUTEX_UNLOCK G_STMT_START { \
  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
      "unlocking qlock from thread %p", g_thread_self ()); \
  g_mutex_unlock (queue->qlock); \
} G_STMT_END
---|
114 | |
---|
115 | |
---|
116 | typedef struct _GstQueueEventResponse |
---|
117 | { |
---|
118 | GstEvent *event; |
---|
119 | gboolean ret, handled; |
---|
120 | } |
---|
121 | GstQueueEventResponse; |
---|
122 | |
---|
123 | static void gst_queue_base_init (GstQueueClass * klass); |
---|
124 | static void gst_queue_class_init (GstQueueClass * klass); |
---|
125 | static void gst_queue_init (GstQueue * queue); |
---|
126 | static void gst_queue_finalize (GObject * object); |
---|
127 | |
---|
128 | static void gst_queue_set_property (GObject * object, |
---|
129 | guint prop_id, const GValue * value, GParamSpec * pspec); |
---|
130 | static void gst_queue_get_property (GObject * object, |
---|
131 | guint prop_id, GValue * value, GParamSpec * pspec); |
---|
132 | |
---|
133 | static void gst_queue_chain (GstPad * pad, GstData * data); |
---|
134 | static GstData *gst_queue_get (GstPad * pad); |
---|
135 | |
---|
136 | static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event); |
---|
137 | static gboolean gst_queue_handle_src_query (GstPad * pad, |
---|
138 | GstQueryType type, GstFormat * fmt, gint64 * value); |
---|
139 | |
---|
140 | static GstCaps *gst_queue_getcaps (GstPad * pad); |
---|
141 | static GstPadLinkReturn |
---|
142 | gst_queue_link_sink (GstPad * pad, const GstCaps * caps); |
---|
143 | static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps); |
---|
144 | static void gst_queue_locked_flush (GstQueue * queue); |
---|
145 | |
---|
146 | static GstElementStateReturn gst_queue_change_state (GstElement * element); |
---|
147 | static gboolean gst_queue_release_locks (GstElement * element); |
---|
148 | |
---|
149 | |
---|
150 | #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ()) |
---|
151 | |
---|
152 | static GType |
---|
153 | queue_leaky_get_type (void) |
---|
154 | { |
---|
155 | static GType queue_leaky_type = 0; |
---|
156 | static GEnumValue queue_leaky[] = { |
---|
157 | {GST_QUEUE_NO_LEAK, "0", "Not Leaky"}, |
---|
158 | {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"}, |
---|
159 | {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"}, |
---|
160 | {0, NULL, NULL}, |
---|
161 | }; |
---|
162 | |
---|
163 | if (!queue_leaky_type) { |
---|
164 | queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky); |
---|
165 | } |
---|
166 | return queue_leaky_type; |
---|
167 | } |
---|
168 | |
---|
169 | static GstElementClass *parent_class = NULL; |
---|
170 | static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; |
---|
171 | |
---|
172 | GType |
---|
173 | gst_queue_get_type (void) |
---|
174 | { |
---|
175 | static GType queue_type = 0; |
---|
176 | |
---|
177 | if (!queue_type) { |
---|
178 | static const GTypeInfo queue_info = { |
---|
179 | sizeof (GstQueueClass), |
---|
180 | (GBaseInitFunc) gst_queue_base_init, |
---|
181 | NULL, |
---|
182 | (GClassInitFunc) gst_queue_class_init, |
---|
183 | NULL, |
---|
184 | NULL, |
---|
185 | sizeof (GstQueue), |
---|
186 | 0, |
---|
187 | (GInstanceInitFunc) gst_queue_init, |
---|
188 | NULL |
---|
189 | }; |
---|
190 | |
---|
191 | queue_type = g_type_register_static (GST_TYPE_ELEMENT, |
---|
192 | "GstQueue", &queue_info, 0); |
---|
193 | GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0, |
---|
194 | "dataflow inside the queue element"); |
---|
195 | } |
---|
196 | |
---|
197 | return queue_type; |
---|
198 | } |
---|
199 | |
---|
200 | static void |
---|
201 | gst_queue_base_init (GstQueueClass * klass) |
---|
202 | { |
---|
203 | GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); |
---|
204 | |
---|
205 | gst_element_class_add_pad_template (gstelement_class, |
---|
206 | gst_static_pad_template_get (&srctemplate)); |
---|
207 | gst_element_class_add_pad_template (gstelement_class, |
---|
208 | gst_static_pad_template_get (&sinktemplate)); |
---|
209 | gst_element_class_set_details (gstelement_class, &gst_queue_details); |
---|
210 | } |
---|
211 | |
---|
212 | static void |
---|
213 | gst_queue_class_init (GstQueueClass * klass) |
---|
214 | { |
---|
215 | GObjectClass *gobject_class = G_OBJECT_CLASS (klass); |
---|
216 | GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); |
---|
217 | |
---|
218 | parent_class = g_type_class_peek_parent (klass); |
---|
219 | |
---|
220 | /* signals */ |
---|
221 | gst_queue_signals[SIGNAL_UNDERRUN] = |
---|
222 | g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
---|
223 | G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL, |
---|
224 | g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
---|
225 | gst_queue_signals[SIGNAL_RUNNING] = |
---|
226 | g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
---|
227 | G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL, |
---|
228 | g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
---|
229 | gst_queue_signals[SIGNAL_OVERRUN] = |
---|
230 | g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
---|
231 | G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL, |
---|
232 | g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
---|
233 | |
---|
234 | /* properties */ |
---|
235 | g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, |
---|
236 | g_param_spec_uint ("current-level-bytes", "Current level (kB)", |
---|
237 | "Current amount of data in the queue (bytes)", |
---|
238 | 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
---|
239 | g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS, |
---|
240 | g_param_spec_uint ("current-level-buffers", "Current level (buffers)", |
---|
241 | "Current number of buffers in the queue", |
---|
242 | 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
---|
243 | g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, |
---|
244 | g_param_spec_uint64 ("current-level-time", "Current level (ns)", |
---|
245 | "Current amount of data in the queue (in ns)", |
---|
246 | 0, G_MAXUINT64, 0, G_PARAM_READABLE)); |
---|
247 | |
---|
248 | g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, |
---|
249 | g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
---|
250 | "Max. amount of data in the queue (bytes, 0=disable)", |
---|
251 | 0, G_MAXUINT, 0, G_PARAM_READWRITE)); |
---|
252 | g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, |
---|
253 | g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
---|
254 | "Max. number of buffers in the queue (0=disable)", |
---|
255 | 0, G_MAXUINT, 0, G_PARAM_READWRITE)); |
---|
256 | g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, |
---|
257 | g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
---|
258 | "Max. amount of data in the queue (in ns, 0=disable)", |
---|
259 | 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); |
---|
260 | |
---|
261 | g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES, |
---|
262 | g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)", |
---|
263 | "Min. amount of data in the queue to allow reading (bytes, 0=disable)", |
---|
264 | 0, G_MAXUINT, 0, G_PARAM_READWRITE)); |
---|
265 | g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS, |
---|
266 | g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)", |
---|
267 | "Min. number of buffers in the queue to allow reading (0=disable)", |
---|
268 | 0, G_MAXUINT, 0, G_PARAM_READWRITE)); |
---|
269 | g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME, |
---|
270 | g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)", |
---|
271 | "Min. amount of data in the queue to allow reading (in ns, 0=disable)", |
---|
272 | 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); |
---|
273 | |
---|
274 | g_object_class_install_property (gobject_class, ARG_LEAKY, |
---|
275 | g_param_spec_enum ("leaky", "Leaky", |
---|
276 | "Where the queue leaks, if at all", |
---|
277 | GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); |
---|
278 | g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK, |
---|
279 | g_param_spec_boolean ("may_deadlock", "May Deadlock", |
---|
280 | "The queue may deadlock if it's full and not PLAYING", |
---|
281 | TRUE, G_PARAM_READWRITE)); |
---|
282 | g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT, |
---|
283 | g_param_spec_uint64 ("block_timeout", "Timeout for Block", |
---|
284 | "Nanoseconds until blocked queue times out and returns filler event. " |
---|
285 | "Value of -1 disables timeout", |
---|
286 | 0, G_MAXUINT64, -1, G_PARAM_READWRITE)); |
---|
287 | |
---|
288 | /* set several parent class virtual functions */ |
---|
289 | gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize); |
---|
290 | gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); |
---|
291 | gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); |
---|
292 | |
---|
293 | gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state); |
---|
294 | gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks); |
---|
295 | } |
---|
296 | |
---|
297 | static void |
---|
298 | gst_queue_init (GstQueue * queue) |
---|
299 | { |
---|
300 | /* scheduling on this kind of element is, well, interesting */ |
---|
301 | GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED); |
---|
302 | GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE); |
---|
303 | |
---|
304 | queue->sinkpad = |
---|
305 | gst_pad_new_from_template (gst_static_pad_template_get (&sinktemplate), |
---|
306 | "sink"); |
---|
307 | gst_pad_set_chain_function (queue->sinkpad, |
---|
308 | GST_DEBUG_FUNCPTR (gst_queue_chain)); |
---|
309 | gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); |
---|
310 | gst_pad_set_link_function (queue->sinkpad, |
---|
311 | GST_DEBUG_FUNCPTR (gst_queue_link_sink)); |
---|
312 | gst_pad_set_getcaps_function (queue->sinkpad, |
---|
313 | GST_DEBUG_FUNCPTR (gst_queue_getcaps)); |
---|
314 | gst_pad_set_active (queue->sinkpad, TRUE); |
---|
315 | |
---|
316 | queue->srcpad = |
---|
317 | gst_pad_new_from_template (gst_static_pad_template_get (&srctemplate), |
---|
318 | "src"); |
---|
319 | gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get)); |
---|
320 | gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); |
---|
321 | gst_pad_set_link_function (queue->srcpad, |
---|
322 | GST_DEBUG_FUNCPTR (gst_queue_link_src)); |
---|
323 | gst_pad_set_getcaps_function (queue->srcpad, |
---|
324 | GST_DEBUG_FUNCPTR (gst_queue_getcaps)); |
---|
325 | gst_pad_set_event_function (queue->srcpad, |
---|
326 | GST_DEBUG_FUNCPTR (gst_queue_handle_src_event)); |
---|
327 | gst_pad_set_query_function (queue->srcpad, |
---|
328 | GST_DEBUG_FUNCPTR (gst_queue_handle_src_query)); |
---|
329 | gst_pad_set_active (queue->srcpad, TRUE); |
---|
330 | |
---|
331 | queue->cur_level.buffers = 0; /* no content */ |
---|
332 | queue->cur_level.bytes = 0; /* no content */ |
---|
333 | queue->cur_level.time = 0; /* no content */ |
---|
334 | queue->max_size.buffers = 100; /* 100 buffers */ |
---|
335 | queue->max_size.bytes = 10 * 1024 * 1024; /* 10 MB */ |
---|
336 | queue->max_size.time = GST_SECOND; /* 1 s. */ |
---|
337 | queue->min_threshold.buffers = 0; /* no threshold */ |
---|
338 | queue->min_threshold.bytes = 0; /* no threshold */ |
---|
339 | queue->min_threshold.time = 0; /* no threshold */ |
---|
340 | |
---|
341 | queue->leaky = GST_QUEUE_NO_LEAK; |
---|
342 | queue->may_deadlock = TRUE; |
---|
343 | queue->block_timeout = GST_CLOCK_TIME_NONE; |
---|
344 | queue->interrupt = FALSE; |
---|
345 | queue->flush = FALSE; |
---|
346 | |
---|
347 | queue->qlock = g_mutex_new (); |
---|
348 | queue->item_add = g_cond_new (); |
---|
349 | queue->item_del = g_cond_new (); |
---|
350 | queue->event_done = g_cond_new (); |
---|
351 | queue->events = g_queue_new (); |
---|
352 | queue->event_lock = g_mutex_new (); |
---|
353 | queue->queue = g_queue_new (); |
---|
354 | |
---|
355 | GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, |
---|
356 | "initialized queue's not_empty & not_full conditions"); |
---|
357 | } |
---|
358 | |
---|
359 | /* called only once, as opposed to dispose */ |
---|
360 | static void |
---|
361 | gst_queue_finalize (GObject * object) |
---|
362 | { |
---|
363 | GstQueue *queue = GST_QUEUE (object); |
---|
364 | |
---|
365 | GST_DEBUG_OBJECT (queue, "finalizing queue"); |
---|
366 | |
---|
367 | while (!g_queue_is_empty (queue->queue)) { |
---|
368 | GstData *data = g_queue_pop_head (queue->queue); |
---|
369 | |
---|
370 | gst_data_unref (data); |
---|
371 | } |
---|
372 | g_queue_free (queue->queue); |
---|
373 | g_mutex_free (queue->qlock); |
---|
374 | g_cond_free (queue->item_add); |
---|
375 | g_cond_free (queue->item_del); |
---|
376 | g_cond_free (queue->event_done); |
---|
377 | g_mutex_lock (queue->event_lock); |
---|
378 | while (!g_queue_is_empty (queue->events)) { |
---|
379 | GstQueueEventResponse *er = g_queue_pop_head (queue->events); |
---|
380 | |
---|
381 | gst_event_unref (er->event); |
---|
382 | } |
---|
383 | g_mutex_unlock (queue->event_lock); |
---|
384 | g_mutex_free (queue->event_lock); |
---|
385 | g_queue_free (queue->events); |
---|
386 | |
---|
387 | if (G_OBJECT_CLASS (parent_class)->finalize) |
---|
388 | G_OBJECT_CLASS (parent_class)->finalize (object); |
---|
389 | } |
---|
390 | |
---|
391 | static GstCaps * |
---|
392 | gst_queue_getcaps (GstPad * pad) |
---|
393 | { |
---|
394 | GstQueue *queue; |
---|
395 | |
---|
396 | queue = GST_QUEUE (gst_pad_get_parent (pad)); |
---|
397 | |
---|
398 | if (pad == queue->srcpad && queue->cur_level.bytes > 0) { |
---|
399 | return gst_caps_copy (queue->negotiated_caps); |
---|
400 | } |
---|
401 | |
---|
402 | return gst_pad_proxy_getcaps (pad); |
---|
403 | } |
---|
404 | |
---|
405 | static GstPadLinkReturn |
---|
406 | gst_queue_link_sink (GstPad * pad, const GstCaps * caps) |
---|
407 | { |
---|
408 | GstQueue *queue; |
---|
409 | GstPadLinkReturn link_ret; |
---|
410 | |
---|
411 | queue = GST_QUEUE (gst_pad_get_parent (pad)); |
---|
412 | |
---|
413 | if (queue->cur_level.bytes > 0) { |
---|
414 | if (gst_caps_is_equal (caps, queue->negotiated_caps)) { |
---|
415 | return GST_PAD_LINK_OK; |
---|
416 | } else if (GST_STATE (queue) != GST_STATE_PLAYING) { |
---|
417 | return GST_PAD_LINK_DELAYED; |
---|
418 | } |
---|
419 | |
---|
420 | /* Wait until the queue is empty before attempting the pad |
---|
421 | negotiation. */ |
---|
422 | GST_QUEUE_MUTEX_LOCK; |
---|
423 | |
---|
424 | STATUS (queue, "waiting for queue to get empty"); |
---|
425 | while (queue->cur_level.bytes > 0) { |
---|
426 | g_cond_wait (queue->item_del, queue->qlock); |
---|
427 | if (queue->interrupt) { |
---|
428 | GST_QUEUE_MUTEX_UNLOCK; |
---|
429 | return GST_PAD_LINK_DELAYED; |
---|
430 | } |
---|
431 | } |
---|
432 | STATUS (queue, "queue is now empty"); |
---|
433 | |
---|
434 | GST_QUEUE_MUTEX_UNLOCK; |
---|
435 | } |
---|
436 | |
---|
437 | link_ret = gst_pad_proxy_pad_link (pad, caps); |
---|
438 | |
---|
439 | if (GST_PAD_LINK_SUCCESSFUL (link_ret)) { |
---|
440 | /* we store an extra copy of the negotiated caps, just in case |
---|
441 | * the pads become unnegotiated while we have buffers */ |
---|
442 | gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps)); |
---|
443 | } |
---|
444 | |
---|
445 | return link_ret; |
---|
446 | } |
---|
447 | |
---|
448 | static GstPadLinkReturn |
---|
449 | gst_queue_link_src (GstPad * pad, const GstCaps * caps) |
---|
450 | { |
---|
451 | GstQueue *queue; |
---|
452 | GstPadLinkReturn link_ret; |
---|
453 | |
---|
454 | queue = GST_QUEUE (gst_pad_get_parent (pad)); |
---|
455 | |
---|
456 | if (queue->cur_level.bytes > 0) { |
---|
457 | if (gst_caps_is_equal (caps, queue->negotiated_caps)) { |
---|
458 | return GST_PAD_LINK_OK; |
---|
459 | } |
---|
460 | return GST_PAD_LINK_REFUSED; |
---|
461 | } |
---|
462 | |
---|
463 | link_ret = gst_pad_proxy_pad_link (pad, caps); |
---|
464 | |
---|
465 | if (GST_PAD_LINK_SUCCESSFUL (link_ret)) { |
---|
466 | /* we store an extra copy of the negotiated caps, just in case |
---|
467 | * the pads become unnegotiated while we have buffers */ |
---|
468 | gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps)); |
---|
469 | } |
---|
470 | |
---|
471 | return link_ret; |
---|
472 | } |
---|
473 | |
---|
474 | static void |
---|
475 | gst_queue_locked_flush (GstQueue * queue) |
---|
476 | { |
---|
477 | while (!g_queue_is_empty (queue->queue)) { |
---|
478 | GstData *data = g_queue_pop_head (queue->queue); |
---|
479 | |
---|
480 | /* First loose the reference we added when putting that data in the queue */ |
---|
481 | gst_data_unref (data); |
---|
482 | /* Then loose another reference because we are supposed to destroy that |
---|
483 | data when flushing */ |
---|
484 | gst_data_unref (data); |
---|
485 | } |
---|
486 | queue->timeval = NULL; |
---|
487 | queue->cur_level.buffers = 0; |
---|
488 | queue->cur_level.bytes = 0; |
---|
489 | queue->cur_level.time = 0; |
---|
490 | |
---|
491 | /* make sure any pending buffers to be added are flushed too */ |
---|
492 | queue->flush = TRUE; |
---|
493 | |
---|
494 | /* we deleted something... */ |
---|
495 | g_cond_signal (queue->item_del); |
---|
496 | } |
---|
497 | |
---|
498 | static void |
---|
499 | gst_queue_handle_pending_events (GstQueue * queue) |
---|
500 | { |
---|
501 | /* check for events to send upstream */ |
---|
502 | /* g_queue_get_length is glib 2.4, so don't depend on it yet, use ->length */ |
---|
503 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
504 | "handling pending events, events queue of size %d", |
---|
505 | queue->events->length); |
---|
506 | g_mutex_lock (queue->event_lock); |
---|
507 | while (!g_queue_is_empty (queue->events)) { |
---|
508 | GstQueueEventResponse *er; |
---|
509 | |
---|
510 | er = g_queue_pop_head (queue->events); |
---|
511 | |
---|
512 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
513 | "sending event %p (%d) from event response %p upstream", |
---|
514 | er->event, GST_EVENT_TYPE (er->event), er); |
---|
515 | if (er->handled) { |
---|
516 | /* change this to an assert when this file gets reviewed properly. */ |
---|
517 | GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL), |
---|
518 | ("already handled event %p (%d) from event response %p upstream", |
---|
519 | er->event, GST_EVENT_TYPE (er->event), er)); |
---|
520 | break; |
---|
521 | } |
---|
522 | g_mutex_unlock (queue->event_lock); |
---|
523 | er->ret = gst_pad_event_default (queue->srcpad, er->event); |
---|
524 | er->handled = TRUE; |
---|
525 | g_cond_signal (queue->event_done); |
---|
526 | g_mutex_lock (queue->event_lock); |
---|
527 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); |
---|
528 | } |
---|
529 | g_mutex_unlock (queue->event_lock); |
---|
530 | } |
---|
531 | |
---|
532 | static void |
---|
533 | gst_queue_chain (GstPad * pad, GstData * data) |
---|
534 | { |
---|
535 | GstQueue *queue; |
---|
536 | |
---|
537 | g_return_if_fail (pad != NULL); |
---|
538 | g_return_if_fail (GST_IS_PAD (pad)); |
---|
539 | g_return_if_fail (data != NULL); |
---|
540 | |
---|
541 | queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); |
---|
542 | |
---|
543 | restart: |
---|
544 | /* we have to lock the queue since we span threads */ |
---|
545 | GST_QUEUE_MUTEX_LOCK; |
---|
546 | |
---|
547 | gst_queue_handle_pending_events (queue); |
---|
548 | |
---|
549 | /* assume don't need to flush this buffer when the queue is filled */ |
---|
550 | queue->flush = FALSE; |
---|
551 | |
---|
552 | if (GST_IS_EVENT (data)) { |
---|
553 | switch (GST_EVENT_TYPE (data)) { |
---|
554 | case GST_EVENT_FLUSH: |
---|
555 | STATUS (queue, "received flush event"); |
---|
556 | gst_queue_locked_flush (queue); |
---|
557 | STATUS (queue, "after flush"); |
---|
558 | break; |
---|
559 | case GST_EVENT_EOS: |
---|
560 | STATUS (queue, "received EOS"); |
---|
561 | break; |
---|
562 | default: |
---|
563 | /* we put the event in the queue, we don't have to act ourselves */ |
---|
564 | GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
---|
565 | "adding event %p of type %d", data, GST_EVENT_TYPE (data)); |
---|
566 | break; |
---|
567 | } |
---|
568 | } |
---|
569 | |
---|
570 | if (GST_IS_BUFFER (data)) |
---|
571 | GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
---|
572 | "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data)); |
---|
573 | |
---|
574 | /* We make space available if we're "full" according to whatever |
---|
575 | * the user defined as "full". Note that this only applies to buffers. |
---|
576 | * We always handle events and they don't count in our statistics. */ |
---|
577 | if (GST_IS_BUFFER (data) && |
---|
578 | ((queue->max_size.buffers > 0 && |
---|
579 | queue->cur_level.buffers >= queue->max_size.buffers) || |
---|
580 | (queue->max_size.bytes > 0 && |
---|
581 | queue->cur_level.bytes >= queue->max_size.bytes) || |
---|
582 | (queue->max_size.time > 0 && |
---|
583 | queue->cur_level.time >= queue->max_size.time))) { |
---|
584 | GST_QUEUE_MUTEX_UNLOCK; |
---|
585 | g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); |
---|
586 | GST_QUEUE_MUTEX_LOCK; |
---|
587 | |
---|
588 | /* how are we going to make space for this buffer? */ |
---|
589 | switch (queue->leaky) { |
---|
590 | /* leak current buffer */ |
---|
591 | case GST_QUEUE_LEAK_UPSTREAM: |
---|
592 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
593 | "queue is full, leaking buffer on upstream end"); |
---|
594 | /* now we can clean up and exit right away */ |
---|
595 | GST_QUEUE_MUTEX_UNLOCK; |
---|
596 | goto out_unref; |
---|
597 | |
---|
598 | /* leak first buffer in the queue */ |
---|
599 | case GST_QUEUE_LEAK_DOWNSTREAM:{ |
---|
600 | /* this is a bit hacky. We'll manually iterate the list |
---|
601 | * and find the first buffer from the head on. We'll |
---|
602 | * unref that and "fix up" the GQueue object... */ |
---|
603 | GList *item; |
---|
604 | GstData *leak = NULL; |
---|
605 | |
---|
606 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
607 | "queue is full, leaking buffer on downstream end"); |
---|
608 | |
---|
609 | for (item = queue->queue->head; item != NULL; item = item->next) { |
---|
610 | if (GST_IS_BUFFER (item->data)) { |
---|
611 | leak = item->data; |
---|
612 | break; |
---|
613 | } |
---|
614 | } |
---|
615 | |
---|
616 | /* if we didn't find anything, it means we have no buffers |
---|
617 | * in here. That cannot happen, since we had >= 1 bufs */ |
---|
618 | g_assert (leak); |
---|
619 | |
---|
620 | /* Now remove it from the list, fixing up the GQueue |
---|
621 | * CHECKME: is a queue->head the first or the last item? */ |
---|
622 | item = g_list_delete_link (queue->queue->head, item); |
---|
623 | queue->queue->head = g_list_first (item); |
---|
624 | queue->queue->tail = g_list_last (item); |
---|
625 | queue->queue->length--; |
---|
626 | |
---|
627 | /* and unref the data at the end. Twice, because we keep a ref |
---|
628 | * to make things read-only. Also keep our list uptodate. */ |
---|
629 | queue->cur_level.bytes -= GST_BUFFER_SIZE (data); |
---|
630 | queue->cur_level.buffers--; |
---|
631 | if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) |
---|
632 | queue->cur_level.time -= GST_BUFFER_DURATION (data); |
---|
633 | |
---|
634 | gst_data_unref (data); |
---|
635 | gst_data_unref (data); |
---|
636 | break; |
---|
637 | } |
---|
638 | |
---|
639 | default: |
---|
640 | g_warning ("Unknown leaky type, using default"); |
---|
641 | /* fall-through */ |
---|
642 | |
---|
643 | /* don't leak. Instead, wait for space to be available */ |
---|
644 | case GST_QUEUE_NO_LEAK: |
---|
645 | STATUS (queue, "pre-full wait"); |
---|
646 | |
---|
647 | while ((queue->max_size.buffers > 0 && |
---|
648 | queue->cur_level.buffers >= queue->max_size.buffers) || |
---|
649 | (queue->max_size.bytes > 0 && |
---|
650 | queue->cur_level.bytes >= queue->max_size.bytes) || |
---|
651 | (queue->max_size.time > 0 && |
---|
652 | queue->cur_level.time >= queue->max_size.time)) { |
---|
653 | /* if there's a pending state change for this queue |
---|
654 | * or its manager, switch back to iterator so bottom |
---|
655 | * half of state change executes */ |
---|
656 | if (queue->interrupt) { |
---|
657 | GstScheduler *sched; |
---|
658 | |
---|
659 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted"); |
---|
660 | GST_QUEUE_MUTEX_UNLOCK; |
---|
661 | sched = gst_pad_get_scheduler (queue->sinkpad); |
---|
662 | if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) { |
---|
663 | goto out_unref; |
---|
664 | } |
---|
665 | /* if we got here because we were unlocked after a |
---|
666 | * flush, we don't need to add the buffer to the |
---|
667 | * queue again */ |
---|
668 | if (queue->flush) { |
---|
669 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
670 | "not adding pending buffer after flush"); |
---|
671 | goto out_unref; |
---|
672 | } |
---|
673 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
674 | "adding pending buffer after interrupt"); |
---|
675 | goto restart; |
---|
676 | } |
---|
677 | |
---|
678 | if (GST_STATE (queue) != GST_STATE_PLAYING) { |
---|
679 | /* this means the other end is shut down. Try to |
---|
680 | * signal to resolve the error */ |
---|
681 | if (!queue->may_deadlock) { |
---|
682 | GST_QUEUE_MUTEX_UNLOCK; |
---|
683 | gst_data_unref (data); |
---|
684 | GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL), |
---|
685 | ("deadlock found, shutting down source pad elements")); |
---|
686 | /* we don't go to out_unref here, since we want to |
---|
687 | * unref the buffer *before* calling GST_ELEMENT_ERROR */ |
---|
688 | return; |
---|
689 | } else { |
---|
690 | GST_CAT_WARNING_OBJECT (queue_dataflow, queue, |
---|
691 | "%s: waiting for the app to restart " |
---|
692 | "source pad elements", GST_ELEMENT_NAME (queue)); |
---|
693 | } |
---|
694 | } |
---|
695 | |
---|
696 | /* OK, we've got a serious issue here. Imagine the situation |
---|
697 | * where the puller (next element) is sending an event here, |
---|
698 | * so it cannot pull events from the queue, and we cannot |
---|
699 | * push data further because the queue is 'full' and therefore, |
---|
700 | * we wait here (and do not handle events): deadlock! to solve |
---|
701 | * that, we handle pending upstream events here, too. */ |
---|
702 | gst_queue_handle_pending_events (queue); |
---|
703 | |
---|
704 | STATUS (queue, "waiting for item_del signal from thread using qlock"); |
---|
705 | g_cond_wait (queue->item_del, queue->qlock); |
---|
706 | STATUS (queue, "received item_del signal from thread using qlock"); |
---|
707 | } |
---|
708 | |
---|
709 | STATUS (queue, "post-full wait"); |
---|
710 | GST_QUEUE_MUTEX_UNLOCK; |
---|
711 | g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); |
---|
712 | GST_QUEUE_MUTEX_LOCK; |
---|
713 | break; |
---|
714 | } |
---|
715 | } |
---|
716 | |
---|
717 | /* put the buffer on the tail of the list. We keep a reference, |
---|
718 | * so that the data is read-only while in here. There's a good |
---|
719 | * reason to do so: we have a size and time counter, and any |
---|
720 | * modification to the content could change any of the two. */ |
---|
721 | gst_data_ref (data); |
---|
722 | g_queue_push_tail (queue->queue, data); |
---|
723 | |
---|
724 | /* Note that we only add buffers (not events) to the statistics */ |
---|
725 | if (GST_IS_BUFFER (data)) { |
---|
726 | queue->cur_level.buffers++; |
---|
727 | queue->cur_level.bytes += GST_BUFFER_SIZE (data); |
---|
728 | if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) |
---|
729 | queue->cur_level.time += GST_BUFFER_DURATION (data); |
---|
730 | } |
---|
731 | |
---|
732 | STATUS (queue, "+ level"); |
---|
733 | |
---|
734 | GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add"); |
---|
735 | g_cond_signal (queue->item_add); |
---|
736 | GST_QUEUE_MUTEX_UNLOCK; |
---|
737 | |
---|
738 | return; |
---|
739 | |
---|
740 | out_unref: |
---|
741 | gst_data_unref (data); |
---|
742 | return; |
---|
743 | } |
---|
744 | |
---|
745 | static GstData * |
---|
746 | gst_queue_get (GstPad * pad) |
---|
747 | { |
---|
748 | GstQueue *queue; |
---|
749 | GstData *data; |
---|
750 | |
---|
751 | g_return_val_if_fail (pad != NULL, NULL); |
---|
752 | g_return_val_if_fail (GST_IS_PAD (pad), NULL); |
---|
753 | |
---|
754 | queue = GST_QUEUE (gst_pad_get_parent (pad)); |
---|
755 | |
---|
756 | restart: |
---|
757 | /* have to lock for thread-safety */ |
---|
758 | GST_QUEUE_MUTEX_LOCK; |
---|
759 | |
---|
760 | if (queue->queue->length == 0 || |
---|
761 | (queue->min_threshold.buffers > 0 && |
---|
762 | queue->cur_level.buffers < queue->min_threshold.buffers) || |
---|
763 | (queue->min_threshold.bytes > 0 && |
---|
764 | queue->cur_level.bytes < queue->min_threshold.bytes) || |
---|
765 | (queue->min_threshold.time > 0 && |
---|
766 | queue->cur_level.time < queue->min_threshold.time)) { |
---|
767 | GST_QUEUE_MUTEX_UNLOCK; |
---|
768 | g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); |
---|
769 | GST_QUEUE_MUTEX_LOCK; |
---|
770 | |
---|
771 | STATUS (queue, "pre-empty wait"); |
---|
772 | while (queue->queue->length == 0 || |
---|
773 | (queue->min_threshold.buffers > 0 && |
---|
774 | queue->cur_level.buffers < queue->min_threshold.buffers) || |
---|
775 | (queue->min_threshold.bytes > 0 && |
---|
776 | queue->cur_level.bytes < queue->min_threshold.bytes) || |
---|
777 | (queue->min_threshold.time > 0 && |
---|
778 | queue->cur_level.time < queue->min_threshold.time)) { |
---|
779 | /* if there's a pending state change for this queue or its |
---|
780 | * manager, switch back to iterator so bottom half of state |
---|
781 | * change executes. */ |
---|
782 | if (queue->interrupt) { |
---|
783 | GstScheduler *sched; |
---|
784 | |
---|
785 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted"); |
---|
786 | GST_QUEUE_MUTEX_UNLOCK; |
---|
787 | sched = gst_pad_get_scheduler (queue->srcpad); |
---|
788 | if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) |
---|
789 | return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); |
---|
790 | goto restart; |
---|
791 | } |
---|
792 | if (GST_STATE (queue) != GST_STATE_PLAYING) { |
---|
793 | /* this means the other end is shut down */ |
---|
794 | if (!queue->may_deadlock) { |
---|
795 | GST_QUEUE_MUTEX_UNLOCK; |
---|
796 | GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL), |
---|
797 | ("deadlock found, shutting down sink pad elements")); |
---|
798 | goto restart; |
---|
799 | } else { |
---|
800 | GST_CAT_WARNING_OBJECT (queue_dataflow, queue, |
---|
801 | "%s: waiting for the app to restart " |
---|
802 | "source pad elements", GST_ELEMENT_NAME (queue)); |
---|
803 | } |
---|
804 | } |
---|
805 | |
---|
806 | STATUS (queue, "waiting for item_add"); |
---|
807 | |
---|
808 | if (queue->block_timeout != GST_CLOCK_TIME_NONE) { |
---|
809 | GTimeVal timeout; |
---|
810 | |
---|
811 | g_get_current_time (&timeout); |
---|
812 | g_time_val_add (&timeout, queue->block_timeout / 1000); |
---|
813 | GST_LOG_OBJECT (queue, "g_cond_time_wait using qlock from thread %p", |
---|
814 | g_thread_self ()); |
---|
815 | if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) { |
---|
816 | GST_QUEUE_MUTEX_UNLOCK; |
---|
817 | GST_CAT_WARNING_OBJECT (queue_dataflow, queue, |
---|
818 | "Sending filler event"); |
---|
819 | return GST_DATA (gst_event_new_filler ()); |
---|
820 | } |
---|
821 | } else { |
---|
822 | GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", |
---|
823 | g_thread_self ()); |
---|
824 | g_cond_wait (queue->item_add, queue->qlock); |
---|
825 | GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p", |
---|
826 | g_thread_self ()); |
---|
827 | } |
---|
828 | STATUS (queue, "got item_add signal"); |
---|
829 | } |
---|
830 | |
---|
831 | STATUS (queue, "post-empty wait"); |
---|
832 | GST_QUEUE_MUTEX_UNLOCK; |
---|
833 | g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); |
---|
834 | GST_QUEUE_MUTEX_LOCK; |
---|
835 | } |
---|
836 | |
---|
837 | /* There's something in the list now, whatever it is */ |
---|
838 | data = g_queue_pop_head (queue->queue); |
---|
839 | GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
---|
840 | "retrieved data %p from queue", data); |
---|
841 | |
---|
842 | if (data == NULL) |
---|
843 | return NULL; |
---|
844 | |
---|
845 | if (GST_IS_BUFFER (data)) { |
---|
846 | /* Update statistics */ |
---|
847 | queue->cur_level.buffers--; |
---|
848 | queue->cur_level.bytes -= GST_BUFFER_SIZE (data); |
---|
849 | if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) |
---|
850 | queue->cur_level.time -= GST_BUFFER_DURATION (data); |
---|
851 | } |
---|
852 | |
---|
853 | /* Now that we're done, we can lose our own reference to |
---|
854 | * the item, since we're no longer in danger. */ |
---|
855 | gst_data_unref (data); |
---|
856 | |
---|
857 | STATUS (queue, "after _get()"); |
---|
858 | |
---|
859 | GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); |
---|
860 | g_cond_signal (queue->item_del); |
---|
861 | GST_QUEUE_MUTEX_UNLOCK; |
---|
862 | |
---|
863 | /* FIXME: I suppose this needs to be locked, since the EOS |
---|
864 | * bit affects the pipeline state. However, that bit is |
---|
865 | * locked too so it'd cause a deadlock. */ |
---|
866 | if (GST_IS_EVENT (data)) { |
---|
867 | GstEvent *event = GST_EVENT (data); |
---|
868 | |
---|
869 | switch (GST_EVENT_TYPE (event)) { |
---|
870 | case GST_EVENT_EOS: |
---|
871 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
872 | "queue \"%s\" eos", GST_ELEMENT_NAME (queue)); |
---|
873 | gst_element_set_eos (GST_ELEMENT (queue)); |
---|
874 | break; |
---|
875 | default: |
---|
876 | break; |
---|
877 | } |
---|
878 | } |
---|
879 | |
---|
880 | return data; |
---|
881 | } |
---|
882 | |
---|
883 | |
---|
884 | static gboolean |
---|
885 | gst_queue_handle_src_event (GstPad * pad, GstEvent * event) |
---|
886 | { |
---|
887 | GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); |
---|
888 | gboolean res; |
---|
889 | |
---|
890 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", |
---|
891 | event, GST_EVENT_TYPE (event)); |
---|
892 | GST_QUEUE_MUTEX_LOCK; |
---|
893 | |
---|
894 | if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { |
---|
895 | GstQueueEventResponse er; |
---|
896 | |
---|
897 | /* push the event to the queue and wait for upstream consumption */ |
---|
898 | er.event = event; |
---|
899 | er.handled = FALSE; |
---|
900 | g_mutex_lock (queue->event_lock); |
---|
901 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
902 | "putting event %p (%d) on internal queue", event, |
---|
903 | GST_EVENT_TYPE (event)); |
---|
904 | g_queue_push_tail (queue->events, &er); |
---|
905 | g_mutex_unlock (queue->event_lock); |
---|
906 | GST_CAT_WARNING_OBJECT (queue_dataflow, queue, |
---|
907 | "Preparing for loop for event handler"); |
---|
908 | /* see the chain function on why this is here - it prevents a deadlock */ |
---|
909 | g_cond_signal (queue->item_del); |
---|
910 | while (!er.handled) { |
---|
911 | GTimeVal timeout; |
---|
912 | |
---|
913 | g_get_current_time (&timeout); |
---|
914 | g_time_val_add (&timeout, 500 * 1000); /* half a second */ |
---|
915 | GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", |
---|
916 | g_thread_self ()); |
---|
917 | if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && |
---|
918 | !er.handled) { |
---|
919 | GST_CAT_WARNING_OBJECT (queue_dataflow, queue, |
---|
920 | "timeout in upstream event handling, dropping event %p (%d)", |
---|
921 | er.event, GST_EVENT_TYPE (er.event)); |
---|
922 | g_mutex_lock (queue->event_lock); |
---|
923 | /* since this queue is for src events (ie upstream), this thread is |
---|
924 | * the only one that is pushing stuff on it, so we're sure that |
---|
925 | * it's still the tail element. FIXME: But in practice, we should use |
---|
926 | * GList instead of GQueue for this so we can remove any element in |
---|
927 | * the list. */ |
---|
928 | g_queue_pop_tail (queue->events); |
---|
929 | g_mutex_unlock (queue->event_lock); |
---|
930 | gst_event_unref (er.event); |
---|
931 | res = FALSE; |
---|
932 | goto handled; |
---|
933 | } |
---|
934 | } |
---|
935 | GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled"); |
---|
936 | res = er.ret; |
---|
937 | } else { |
---|
938 | res = gst_pad_event_default (pad, event); |
---|
939 | |
---|
940 | switch (GST_EVENT_TYPE (event)) { |
---|
941 | case GST_EVENT_FLUSH: |
---|
942 | GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
---|
943 | "FLUSH event, flushing queue\n"); |
---|
944 | gst_queue_locked_flush (queue); |
---|
945 | break; |
---|
946 | case GST_EVENT_SEEK: |
---|
947 | if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { |
---|
948 | gst_queue_locked_flush (queue); |
---|
949 | } |
---|
950 | default: |
---|
951 | break; |
---|
952 | } |
---|
953 | } |
---|
954 | handled: |
---|
955 | GST_QUEUE_MUTEX_UNLOCK; |
---|
956 | |
---|
957 | return res; |
---|
958 | } |
---|
959 | |
---|
960 | static gboolean |
---|
961 | gst_queue_handle_src_query (GstPad * pad, |
---|
962 | GstQueryType type, GstFormat * fmt, gint64 * value) |
---|
963 | { |
---|
964 | GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); |
---|
965 | gboolean res; |
---|
966 | |
---|
967 | if (!GST_PAD_PEER (queue->sinkpad)) |
---|
968 | return FALSE; |
---|
969 | res = gst_pad_query (GST_PAD_PEER (queue->sinkpad), type, fmt, value); |
---|
970 | if (!res) |
---|
971 | return FALSE; |
---|
972 | |
---|
973 | if (type == GST_QUERY_POSITION) { |
---|
974 | /* FIXME: this code assumes that there's no discont in the queue */ |
---|
975 | switch (*fmt) { |
---|
976 | case GST_FORMAT_BYTES: |
---|
977 | *value -= queue->cur_level.bytes; |
---|
978 | break; |
---|
979 | case GST_FORMAT_TIME: |
---|
980 | *value -= queue->cur_level.time; |
---|
981 | break; |
---|
982 | default: |
---|
983 | /* FIXME */ |
---|
984 | break; |
---|
985 | } |
---|
986 | } |
---|
987 | |
---|
988 | return TRUE; |
---|
989 | } |
---|
990 | |
---|
991 | static gboolean |
---|
992 | gst_queue_release_locks (GstElement * element) |
---|
993 | { |
---|
994 | GstQueue *queue; |
---|
995 | |
---|
996 | queue = GST_QUEUE (element); |
---|
997 | |
---|
998 | GST_QUEUE_MUTEX_LOCK; |
---|
999 | queue->interrupt = TRUE; |
---|
1000 | g_cond_signal (queue->item_add); |
---|
1001 | g_cond_signal (queue->item_del); |
---|
1002 | GST_QUEUE_MUTEX_UNLOCK; |
---|
1003 | |
---|
1004 | return TRUE; |
---|
1005 | } |
---|
1006 | |
---|
1007 | static GstElementStateReturn |
---|
1008 | gst_queue_change_state (GstElement * element) |
---|
1009 | { |
---|
1010 | GstQueue *queue; |
---|
1011 | GstElementStateReturn ret = GST_STATE_SUCCESS; |
---|
1012 | |
---|
1013 | queue = GST_QUEUE (element); |
---|
1014 | |
---|
1015 | GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, |
---|
1016 | "starting state change 0x%x", GST_STATE_TRANSITION (element)); |
---|
1017 | |
---|
1018 | /* lock the queue so another thread (not in sync with this thread's state) |
---|
1019 | * can't call this queue's _get (or whatever) |
---|
1020 | */ |
---|
1021 | GST_QUEUE_MUTEX_LOCK; |
---|
1022 | |
---|
1023 | switch (GST_STATE_TRANSITION (element)) { |
---|
1024 | case GST_STATE_NULL_TO_READY: |
---|
1025 | gst_queue_locked_flush (queue); |
---|
1026 | break; |
---|
1027 | case GST_STATE_PAUSED_TO_PLAYING: |
---|
1028 | if (!GST_PAD_IS_LINKED (queue->sinkpad)) { |
---|
1029 | GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, |
---|
1030 | "queue %s is not linked", GST_ELEMENT_NAME (queue)); |
---|
1031 | /* FIXME can this be? */ |
---|
1032 | g_cond_signal (queue->item_add); |
---|
1033 | |
---|
1034 | ret = GST_STATE_FAILURE; |
---|
1035 | goto unlock; |
---|
1036 | } else { |
---|
1037 | GstScheduler *src_sched, *sink_sched; |
---|
1038 | |
---|
1039 | src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad)); |
---|
1040 | sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad)); |
---|
1041 | |
---|
1042 | if (src_sched == sink_sched) { |
---|
1043 | GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, |
---|
1044 | "queue %s does not connect different schedulers", |
---|
1045 | GST_ELEMENT_NAME (queue)); |
---|
1046 | |
---|
1047 | g_warning ("queue %s does not connect different schedulers", |
---|
1048 | GST_ELEMENT_NAME (queue)); |
---|
1049 | |
---|
1050 | ret = GST_STATE_FAILURE; |
---|
1051 | goto unlock; |
---|
1052 | } |
---|
1053 | } |
---|
1054 | queue->interrupt = FALSE; |
---|
1055 | break; |
---|
1056 | case GST_STATE_PAUSED_TO_READY: |
---|
1057 | gst_queue_locked_flush (queue); |
---|
1058 | gst_caps_replace (&queue->negotiated_caps, NULL); |
---|
1059 | break; |
---|
1060 | default: |
---|
1061 | break; |
---|
1062 | } |
---|
1063 | |
---|
1064 | GST_QUEUE_MUTEX_UNLOCK; |
---|
1065 | |
---|
1066 | if (GST_ELEMENT_CLASS (parent_class)->change_state) |
---|
1067 | ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); |
---|
1068 | |
---|
1069 | /* this is an ugly hack to make sure our pads are always active. |
---|
1070 | * Reason for this is that pad activation for the queue element |
---|
1071 | * depends on 2 schedulers (ugh) */ |
---|
1072 | gst_pad_set_active (queue->sinkpad, TRUE); |
---|
1073 | gst_pad_set_active (queue->srcpad, TRUE); |
---|
1074 | |
---|
1075 | GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); |
---|
1076 | |
---|
1077 | return ret; |
---|
1078 | |
---|
1079 | unlock: |
---|
1080 | GST_QUEUE_MUTEX_UNLOCK; |
---|
1081 | |
---|
1082 | GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); |
---|
1083 | |
---|
1084 | return ret; |
---|
1085 | } |
---|
1086 | |
---|
1087 | |
---|
1088 | static void |
---|
1089 | gst_queue_set_property (GObject * object, |
---|
1090 | guint prop_id, const GValue * value, GParamSpec * pspec) |
---|
1091 | { |
---|
1092 | GstQueue *queue = GST_QUEUE (object); |
---|
1093 | |
---|
1094 | /* someone could change levels here, and since this |
---|
1095 | * affects the get/put funcs, we need to lock for safety. */ |
---|
1096 | GST_QUEUE_MUTEX_LOCK; |
---|
1097 | |
---|
1098 | switch (prop_id) { |
---|
1099 | case ARG_MAX_SIZE_BYTES: |
---|
1100 | queue->max_size.bytes = g_value_get_uint (value); |
---|
1101 | break; |
---|
1102 | case ARG_MAX_SIZE_BUFFERS: |
---|
1103 | queue->max_size.buffers = g_value_get_uint (value); |
---|
1104 | break; |
---|
1105 | case ARG_MAX_SIZE_TIME: |
---|
1106 | queue->max_size.time = g_value_get_uint64 (value); |
---|
1107 | break; |
---|
1108 | case ARG_MIN_THRESHOLD_BYTES: |
---|
1109 | queue->min_threshold.bytes = g_value_get_uint (value); |
---|
1110 | break; |
---|
1111 | case ARG_MIN_THRESHOLD_BUFFERS: |
---|
1112 | queue->min_threshold.buffers = g_value_get_uint (value); |
---|
1113 | break; |
---|
1114 | case ARG_MIN_THRESHOLD_TIME: |
---|
1115 | queue->min_threshold.time = g_value_get_uint64 (value); |
---|
1116 | break; |
---|
1117 | case ARG_LEAKY: |
---|
1118 | queue->leaky = g_value_get_enum (value); |
---|
1119 | break; |
---|
1120 | case ARG_MAY_DEADLOCK: |
---|
1121 | queue->may_deadlock = g_value_get_boolean (value); |
---|
1122 | break; |
---|
1123 | case ARG_BLOCK_TIMEOUT: |
---|
1124 | queue->block_timeout = g_value_get_uint64 (value); |
---|
1125 | break; |
---|
1126 | default: |
---|
1127 | G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
---|
1128 | break; |
---|
1129 | } |
---|
1130 | |
---|
1131 | GST_QUEUE_MUTEX_UNLOCK; |
---|
1132 | } |
---|
1133 | |
---|
1134 | static void |
---|
1135 | gst_queue_get_property (GObject * object, |
---|
1136 | guint prop_id, GValue * value, GParamSpec * pspec) |
---|
1137 | { |
---|
1138 | GstQueue *queue = GST_QUEUE (object); |
---|
1139 | |
---|
1140 | switch (prop_id) { |
---|
1141 | case ARG_CUR_LEVEL_BYTES: |
---|
1142 | g_value_set_uint (value, queue->cur_level.bytes); |
---|
1143 | break; |
---|
1144 | case ARG_CUR_LEVEL_BUFFERS: |
---|
1145 | g_value_set_uint (value, queue->cur_level.buffers); |
---|
1146 | break; |
---|
1147 | case ARG_CUR_LEVEL_TIME: |
---|
1148 | g_value_set_uint64 (value, queue->cur_level.time); |
---|
1149 | break; |
---|
1150 | case ARG_MAX_SIZE_BYTES: |
---|
1151 | g_value_set_uint (value, queue->max_size.bytes); |
---|
1152 | break; |
---|
1153 | case ARG_MAX_SIZE_BUFFERS: |
---|
1154 | g_value_set_uint (value, queue->max_size.buffers); |
---|
1155 | break; |
---|
1156 | case ARG_MAX_SIZE_TIME: |
---|
1157 | g_value_set_uint64 (value, queue->max_size.time); |
---|
1158 | break; |
---|
1159 | case ARG_MIN_THRESHOLD_BYTES: |
---|
1160 | g_value_set_uint (value, queue->min_threshold.bytes); |
---|
1161 | break; |
---|
1162 | case ARG_MIN_THRESHOLD_BUFFERS: |
---|
1163 | g_value_set_uint (value, queue->min_threshold.buffers); |
---|
1164 | break; |
---|
1165 | case ARG_MIN_THRESHOLD_TIME: |
---|
1166 | g_value_set_uint64 (value, queue->min_threshold.time); |
---|
1167 | break; |
---|
1168 | case ARG_LEAKY: |
---|
1169 | g_value_set_enum (value, queue->leaky); |
---|
1170 | break; |
---|
1171 | case ARG_MAY_DEADLOCK: |
---|
1172 | g_value_set_boolean (value, queue->may_deadlock); |
---|
1173 | break; |
---|
1174 | case ARG_BLOCK_TIMEOUT: |
---|
1175 | g_value_set_uint64 (value, queue->block_timeout); |
---|
1176 | break; |
---|
1177 | default: |
---|
1178 | G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
---|
1179 | break; |
---|
1180 | } |
---|
1181 | } |
---|