]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
pulse: Use more intuitive indexing with port infos in introspect.c.
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
90 pa_proplist *p) {
91
92 pa_stream *s;
93 unsigned int i;
94
95 pa_assert(c);
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
99
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103 s = pa_xnew(pa_stream, 1);
104 PA_REFCNT_INIT(s);
105 s->context = c;
106 s->mainloop = c->mainloop;
107
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
110 s->flags = 0;
111
112 if (ss)
113 s->sample_spec = *ss;
114 else
115 pa_sample_spec_init(&s->sample_spec);
116
117 if (map)
118 s->channel_map = *map;
119 else
120 pa_channel_map_init(&s->channel_map);
121
122 s->n_formats = 0;
123 if (formats) {
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
127 }
128
129 /* We'll get the final negotiated format after connecting */
130 s->format = NULL;
131
132 s->direct_on_input = PA_INVALID_INDEX;
133
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135 if (name)
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138 s->channel = 0;
139 s->channel_valid = FALSE;
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
142
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146 /* We initialize the target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
149
150 s->buffer_attr.maxlength = (uint32_t) -1;
151 if (ss)
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153 else {
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
158 .rate = 48000,
159 .channels = 2,
160 };
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162 }
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
166
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
170 s->corked = FALSE;
171
172 s->write_memblock = NULL;
173 s->write_data = NULL;
174
175 pa_memchunk_reset(&s->peek_memchunk);
176 s->peek_data = NULL;
177 s->record_memblockq = NULL;
178
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
181
182 s->previous_time = 0;
183 s->latest_underrun_at_index = -1;
184
185 s->read_index_not_before = 0;
186 s->write_index_not_before = 0;
187 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188 s->write_index_corrections[i].valid = 0;
189 s->current_write_index_correction = 0;
190
191 s->auto_timing_update_event = NULL;
192 s->auto_timing_update_requested = FALSE;
193 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195 reset_callbacks(s);
196
197 s->smoother = NULL;
198
199 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200 PA_LLIST_PREPEND(pa_stream, c->streams, s);
201 pa_stream_ref(s);
202
203 return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207 pa_context *c,
208 const char *name,
209 const pa_sample_spec *ss,
210 const pa_channel_map *map,
211 pa_proplist *p) {
212
213 pa_channel_map tmap;
214
215 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221 if (!map)
222 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228 pa_context *c,
229 const char *name,
230 pa_format_info * const *formats,
231 unsigned int n_formats,
232 pa_proplist *p) {
233
234 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235
236 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
237 }
238
239 static void stream_unlink(pa_stream *s) {
240 pa_operation *o, *n;
241 pa_assert(s);
242
243 if (!s->context)
244 return;
245
246 /* Detach from context */
247
248 /* Unref all operation objects that point to us */
249 for (o = s->context->operations; o; o = n) {
250 n = o->next;
251
252 if (o->stream == s)
253 pa_operation_cancel(o);
254 }
255
256 /* Drop all outstanding replies for this stream */
257 if (s->context->pdispatch)
258 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259
260 if (s->channel_valid) {
261 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel = 0;
263 s->channel_valid = FALSE;
264 }
265
266 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
267 pa_stream_unref(s);
268
269 s->context = NULL;
270
271 if (s->auto_timing_update_event) {
272 pa_assert(s->mainloop);
273 s->mainloop->time_free(s->auto_timing_update_event);
274 }
275
276 reset_callbacks(s);
277 }
278
279 static void stream_free(pa_stream *s) {
280 unsigned int i;
281
282 pa_assert(s);
283
284 stream_unlink(s);
285
286 if (s->write_memblock) {
287 if (s->write_data)
288 pa_memblock_release(s->write_memblock);
289 pa_memblock_unref(s->write_memblock);
290 }
291
292 if (s->peek_memchunk.memblock) {
293 if (s->peek_data)
294 pa_memblock_release(s->peek_memchunk.memblock);
295 pa_memblock_unref(s->peek_memchunk.memblock);
296 }
297
298 if (s->record_memblockq)
299 pa_memblockq_free(s->record_memblockq);
300
301 if (s->proplist)
302 pa_proplist_free(s->proplist);
303
304 if (s->smoother)
305 pa_smoother_free(s->smoother);
306
307 for (i = 0; i < s->n_formats; i++)
308 pa_format_info_free(s->req_formats[i]);
309
310 if (s->format)
311 pa_format_info_free(s->format);
312
313 pa_xfree(s->device_name);
314 pa_xfree(s);
315 }
316
317 void pa_stream_unref(pa_stream *s) {
318 pa_assert(s);
319 pa_assert(PA_REFCNT_VALUE(s) >= 1);
320
321 if (PA_REFCNT_DEC(s) <= 0)
322 stream_free(s);
323 }
324
325 pa_stream* pa_stream_ref(pa_stream *s) {
326 pa_assert(s);
327 pa_assert(PA_REFCNT_VALUE(s) >= 1);
328
329 PA_REFCNT_INC(s);
330 return s;
331 }
332
333 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
334 pa_assert(s);
335 pa_assert(PA_REFCNT_VALUE(s) >= 1);
336
337 return s->state;
338 }
339
340 pa_context* pa_stream_get_context(pa_stream *s) {
341 pa_assert(s);
342 pa_assert(PA_REFCNT_VALUE(s) >= 1);
343
344 return s->context;
345 }
346
347 uint32_t pa_stream_get_index(pa_stream *s) {
348 pa_assert(s);
349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
350
351 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
352 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
353
354 return s->stream_index;
355 }
356
357 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
358 pa_assert(s);
359 pa_assert(PA_REFCNT_VALUE(s) >= 1);
360
361 if (s->state == st)
362 return;
363
364 pa_stream_ref(s);
365
366 s->state = st;
367
368 if (s->state_callback)
369 s->state_callback(s, s->state_userdata);
370
371 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
372 stream_unlink(s);
373
374 pa_stream_unref(s);
375 }
376
377 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
378 pa_assert(s);
379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
380
381 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
382 return;
383
384 if (s->state == PA_STREAM_READY &&
385 (force || !s->auto_timing_update_requested)) {
386 pa_operation *o;
387
388 /* pa_log("Automatically requesting new timing data"); */
389
390 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
391 pa_operation_unref(o);
392 s->auto_timing_update_requested = TRUE;
393 }
394 }
395
396 if (s->auto_timing_update_event) {
397 if (s->suspended && !force) {
398 pa_assert(s->mainloop);
399 s->mainloop->time_free(s->auto_timing_update_event);
400 s->auto_timing_update_event = NULL;
401 } else {
402 if (force)
403 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
404
405 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
406
407 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
408 }
409 }
410 }
411
412 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
413 pa_context *c = userdata;
414 pa_stream *s;
415 uint32_t channel;
416
417 pa_assert(pd);
418 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
419 pa_assert(t);
420 pa_assert(c);
421 pa_assert(PA_REFCNT_VALUE(c) >= 1);
422
423 pa_context_ref(c);
424
425 if (pa_tagstruct_getu32(t, &channel) < 0 ||
426 !pa_tagstruct_eof(t)) {
427 pa_context_fail(c, PA_ERR_PROTOCOL);
428 goto finish;
429 }
430
431 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
432 goto finish;
433
434 if (s->state != PA_STREAM_READY)
435 goto finish;
436
437 pa_context_set_error(c, PA_ERR_KILLED);
438 pa_stream_set_state(s, PA_STREAM_FAILED);
439
440 finish:
441 pa_context_unref(c);
442 }
443
444 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
445 pa_usec_t x;
446
447 pa_assert(s);
448 pa_assert(!force_start || !force_stop);
449
450 if (!s->smoother)
451 return;
452
453 x = pa_rtclock_now();
454
455 if (s->timing_info_valid) {
456 if (aposteriori)
457 x -= s->timing_info.transport_usec;
458 else
459 x += s->timing_info.transport_usec;
460 }
461
462 if (s->suspended || s->corked || force_stop)
463 pa_smoother_pause(s->smoother, x);
464 else if (force_start || s->buffer_attr.prebuf == 0) {
465
466 if (!s->timing_info_valid &&
467 !aposteriori &&
468 !force_start &&
469 !force_stop &&
470 s->context->version >= 13) {
471
472 /* If the server supports STARTED events we take them as
473 * indications when audio really starts/stops playing, if
474 * we don't have any timing info yet -- instead of trying
475 * to be smart and guessing the server time. Otherwise the
476 * unknown transport delay adds too much noise to our time
477 * calculations. */
478
479 return;
480 }
481
482 pa_smoother_resume(s->smoother, x, TRUE);
483 }
484
485 /* Please note that we have no idea if playback actually started
486 * if prebuf is non-zero! */
487 }
488
489 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
490
491 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
492 pa_context *c = userdata;
493 pa_stream *s;
494 uint32_t channel;
495 const char *dn;
496 pa_bool_t suspended;
497 uint32_t di;
498 pa_usec_t usec = 0;
499 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
500
501 pa_assert(pd);
502 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
503 pa_assert(t);
504 pa_assert(c);
505 pa_assert(PA_REFCNT_VALUE(c) >= 1);
506
507 pa_context_ref(c);
508
509 if (c->version < 12) {
510 pa_context_fail(c, PA_ERR_PROTOCOL);
511 goto finish;
512 }
513
514 if (pa_tagstruct_getu32(t, &channel) < 0 ||
515 pa_tagstruct_getu32(t, &di) < 0 ||
516 pa_tagstruct_gets(t, &dn) < 0 ||
517 pa_tagstruct_get_boolean(t, &suspended) < 0) {
518 pa_context_fail(c, PA_ERR_PROTOCOL);
519 goto finish;
520 }
521
522 if (c->version >= 13) {
523
524 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
525 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526 pa_tagstruct_getu32(t, &fragsize) < 0 ||
527 pa_tagstruct_get_usec(t, &usec) < 0) {
528 pa_context_fail(c, PA_ERR_PROTOCOL);
529 goto finish;
530 }
531 } else {
532 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
533 pa_tagstruct_getu32(t, &tlength) < 0 ||
534 pa_tagstruct_getu32(t, &prebuf) < 0 ||
535 pa_tagstruct_getu32(t, &minreq) < 0 ||
536 pa_tagstruct_get_usec(t, &usec) < 0) {
537 pa_context_fail(c, PA_ERR_PROTOCOL);
538 goto finish;
539 }
540 }
541 }
542
543 if (!pa_tagstruct_eof(t)) {
544 pa_context_fail(c, PA_ERR_PROTOCOL);
545 goto finish;
546 }
547
548 if (!dn || di == PA_INVALID_INDEX) {
549 pa_context_fail(c, PA_ERR_PROTOCOL);
550 goto finish;
551 }
552
553 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
554 goto finish;
555
556 if (s->state != PA_STREAM_READY)
557 goto finish;
558
559 if (c->version >= 13) {
560 if (s->direction == PA_STREAM_RECORD)
561 s->timing_info.configured_source_usec = usec;
562 else
563 s->timing_info.configured_sink_usec = usec;
564
565 s->buffer_attr.maxlength = maxlength;
566 s->buffer_attr.fragsize = fragsize;
567 s->buffer_attr.tlength = tlength;
568 s->buffer_attr.prebuf = prebuf;
569 s->buffer_attr.minreq = minreq;
570 }
571
572 pa_xfree(s->device_name);
573 s->device_name = pa_xstrdup(dn);
574 s->device_index = di;
575
576 s->suspended = suspended;
577
578 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
579 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
580 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
581 request_auto_timing_update(s, TRUE);
582 }
583
584 check_smoother_status(s, TRUE, FALSE, FALSE);
585 request_auto_timing_update(s, TRUE);
586
587 if (s->moved_callback)
588 s->moved_callback(s, s->moved_userdata);
589
590 finish:
591 pa_context_unref(c);
592 }
593
594 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
595 pa_context *c = userdata;
596 pa_stream *s;
597 uint32_t channel;
598 pa_usec_t usec = 0;
599 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
600
601 pa_assert(pd);
602 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
603 pa_assert(t);
604 pa_assert(c);
605 pa_assert(PA_REFCNT_VALUE(c) >= 1);
606
607 pa_context_ref(c);
608
609 if (c->version < 15) {
610 pa_context_fail(c, PA_ERR_PROTOCOL);
611 goto finish;
612 }
613
614 if (pa_tagstruct_getu32(t, &channel) < 0) {
615 pa_context_fail(c, PA_ERR_PROTOCOL);
616 goto finish;
617 }
618
619 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
620 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
621 pa_tagstruct_getu32(t, &fragsize) < 0 ||
622 pa_tagstruct_get_usec(t, &usec) < 0) {
623 pa_context_fail(c, PA_ERR_PROTOCOL);
624 goto finish;
625 }
626 } else {
627 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
628 pa_tagstruct_getu32(t, &tlength) < 0 ||
629 pa_tagstruct_getu32(t, &prebuf) < 0 ||
630 pa_tagstruct_getu32(t, &minreq) < 0 ||
631 pa_tagstruct_get_usec(t, &usec) < 0) {
632 pa_context_fail(c, PA_ERR_PROTOCOL);
633 goto finish;
634 }
635 }
636
637 if (!pa_tagstruct_eof(t)) {
638 pa_context_fail(c, PA_ERR_PROTOCOL);
639 goto finish;
640 }
641
642 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
643 goto finish;
644
645 if (s->state != PA_STREAM_READY)
646 goto finish;
647
648 if (s->direction == PA_STREAM_RECORD)
649 s->timing_info.configured_source_usec = usec;
650 else
651 s->timing_info.configured_sink_usec = usec;
652
653 s->buffer_attr.maxlength = maxlength;
654 s->buffer_attr.fragsize = fragsize;
655 s->buffer_attr.tlength = tlength;
656 s->buffer_attr.prebuf = prebuf;
657 s->buffer_attr.minreq = minreq;
658
659 request_auto_timing_update(s, TRUE);
660
661 if (s->buffer_attr_callback)
662 s->buffer_attr_callback(s, s->buffer_attr_userdata);
663
664 finish:
665 pa_context_unref(c);
666 }
667
668 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
669 pa_context *c = userdata;
670 pa_stream *s;
671 uint32_t channel;
672 pa_bool_t suspended;
673
674 pa_assert(pd);
675 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
676 pa_assert(t);
677 pa_assert(c);
678 pa_assert(PA_REFCNT_VALUE(c) >= 1);
679
680 pa_context_ref(c);
681
682 if (c->version < 12) {
683 pa_context_fail(c, PA_ERR_PROTOCOL);
684 goto finish;
685 }
686
687 if (pa_tagstruct_getu32(t, &channel) < 0 ||
688 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
689 !pa_tagstruct_eof(t)) {
690 pa_context_fail(c, PA_ERR_PROTOCOL);
691 goto finish;
692 }
693
694 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
695 goto finish;
696
697 if (s->state != PA_STREAM_READY)
698 goto finish;
699
700 s->suspended = suspended;
701
702 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
703 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
704 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
705 request_auto_timing_update(s, TRUE);
706 }
707
708 check_smoother_status(s, TRUE, FALSE, FALSE);
709 request_auto_timing_update(s, TRUE);
710
711 if (s->suspended_callback)
712 s->suspended_callback(s, s->suspended_userdata);
713
714 finish:
715 pa_context_unref(c);
716 }
717
718 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719 pa_context *c = userdata;
720 pa_stream *s;
721 uint32_t channel;
722
723 pa_assert(pd);
724 pa_assert(command == PA_COMMAND_STARTED);
725 pa_assert(t);
726 pa_assert(c);
727 pa_assert(PA_REFCNT_VALUE(c) >= 1);
728
729 pa_context_ref(c);
730
731 if (c->version < 13) {
732 pa_context_fail(c, PA_ERR_PROTOCOL);
733 goto finish;
734 }
735
736 if (pa_tagstruct_getu32(t, &channel) < 0 ||
737 !pa_tagstruct_eof(t)) {
738 pa_context_fail(c, PA_ERR_PROTOCOL);
739 goto finish;
740 }
741
742 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
743 goto finish;
744
745 if (s->state != PA_STREAM_READY)
746 goto finish;
747
748 check_smoother_status(s, TRUE, TRUE, FALSE);
749 request_auto_timing_update(s, TRUE);
750
751 if (s->started_callback)
752 s->started_callback(s, s->started_userdata);
753
754 finish:
755 pa_context_unref(c);
756 }
757
758 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759 pa_context *c = userdata;
760 pa_stream *s;
761 uint32_t channel;
762 pa_proplist *pl = NULL;
763 const char *event;
764
765 pa_assert(pd);
766 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
767 pa_assert(t);
768 pa_assert(c);
769 pa_assert(PA_REFCNT_VALUE(c) >= 1);
770
771 pa_context_ref(c);
772
773 if (c->version < 15) {
774 pa_context_fail(c, PA_ERR_PROTOCOL);
775 goto finish;
776 }
777
778 pl = pa_proplist_new();
779
780 if (pa_tagstruct_getu32(t, &channel) < 0 ||
781 pa_tagstruct_gets(t, &event) < 0 ||
782 pa_tagstruct_get_proplist(t, pl) < 0 ||
783 !pa_tagstruct_eof(t) || !event) {
784 pa_context_fail(c, PA_ERR_PROTOCOL);
785 goto finish;
786 }
787
788 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
789 goto finish;
790
791 if (s->state != PA_STREAM_READY)
792 goto finish;
793
794 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
795 /* Let client know what the running time was when the stream had to be killed */
796 pa_usec_t stream_time;
797 if (pa_stream_get_time(s, &stream_time) == 0)
798 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
799 }
800
801 if (s->event_callback)
802 s->event_callback(s, event, pl, s->event_userdata);
803
804 finish:
805 pa_context_unref(c);
806
807 if (pl)
808 pa_proplist_free(pl);
809 }
810
811 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
812 pa_stream *s;
813 pa_context *c = userdata;
814 uint32_t bytes, channel;
815
816 pa_assert(pd);
817 pa_assert(command == PA_COMMAND_REQUEST);
818 pa_assert(t);
819 pa_assert(c);
820 pa_assert(PA_REFCNT_VALUE(c) >= 1);
821
822 pa_context_ref(c);
823
824 if (pa_tagstruct_getu32(t, &channel) < 0 ||
825 pa_tagstruct_getu32(t, &bytes) < 0 ||
826 !pa_tagstruct_eof(t)) {
827 pa_context_fail(c, PA_ERR_PROTOCOL);
828 goto finish;
829 }
830
831 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
832 goto finish;
833
834 if (s->state != PA_STREAM_READY)
835 goto finish;
836
837 s->requested_bytes += bytes;
838
839 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
840
841 if (s->requested_bytes > 0 && s->write_callback)
842 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
843
844 finish:
845 pa_context_unref(c);
846 }
847
848 int64_t pa_stream_get_underflow_index(pa_stream *p)
849 {
850 pa_assert(p);
851 return p->latest_underrun_at_index;
852 }
853
854 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
855 pa_stream *s;
856 pa_context *c = userdata;
857 uint32_t channel;
858 int64_t offset = -1;
859
860 pa_assert(pd);
861 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
862 pa_assert(t);
863 pa_assert(c);
864 pa_assert(PA_REFCNT_VALUE(c) >= 1);
865
866 pa_context_ref(c);
867
868 if (pa_tagstruct_getu32(t, &channel) < 0) {
869 pa_context_fail(c, PA_ERR_PROTOCOL);
870 goto finish;
871 }
872
873 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
874 if (pa_tagstruct_gets64(t, &offset) < 0) {
875 pa_context_fail(c, PA_ERR_PROTOCOL);
876 goto finish;
877 }
878 }
879
880 if (!pa_tagstruct_eof(t)) {
881 pa_context_fail(c, PA_ERR_PROTOCOL);
882 goto finish;
883 }
884
885 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
886 goto finish;
887
888 if (s->state != PA_STREAM_READY)
889 goto finish;
890
891 if (offset != -1)
892 s->latest_underrun_at_index = offset;
893
894 if (s->buffer_attr.prebuf > 0)
895 check_smoother_status(s, TRUE, FALSE, TRUE);
896
897 request_auto_timing_update(s, TRUE);
898
899 if (command == PA_COMMAND_OVERFLOW) {
900 if (s->overflow_callback)
901 s->overflow_callback(s, s->overflow_userdata);
902 } else if (command == PA_COMMAND_UNDERFLOW) {
903 if (s->underflow_callback)
904 s->underflow_callback(s, s->underflow_userdata);
905 }
906
907 finish:
908 pa_context_unref(c);
909 }
910
911 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
912 pa_assert(s);
913 pa_assert(PA_REFCNT_VALUE(s) >= 1);
914
915 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
916
917 if (s->state != PA_STREAM_READY)
918 return;
919
920 if (w) {
921 s->write_index_not_before = s->context->ctag;
922
923 if (s->timing_info_valid)
924 s->timing_info.write_index_corrupt = TRUE;
925
926 /* pa_log("write_index invalidated"); */
927 }
928
929 if (r) {
930 s->read_index_not_before = s->context->ctag;
931
932 if (s->timing_info_valid)
933 s->timing_info.read_index_corrupt = TRUE;
934
935 /* pa_log("read_index invalidated"); */
936 }
937
938 request_auto_timing_update(s, TRUE);
939 }
940
941 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
942 pa_stream *s = userdata;
943
944 pa_assert(s);
945 pa_assert(PA_REFCNT_VALUE(s) >= 1);
946
947 pa_stream_ref(s);
948 request_auto_timing_update(s, FALSE);
949 pa_stream_unref(s);
950 }
951
952 static void create_stream_complete(pa_stream *s) {
953 pa_assert(s);
954 pa_assert(PA_REFCNT_VALUE(s) >= 1);
955 pa_assert(s->state == PA_STREAM_CREATING);
956
957 pa_stream_set_state(s, PA_STREAM_READY);
958
959 if (s->requested_bytes > 0 && s->write_callback)
960 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
961
962 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
963 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
964 pa_assert(!s->auto_timing_update_event);
965 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
966
967 request_auto_timing_update(s, TRUE);
968 }
969
970 check_smoother_status(s, TRUE, FALSE, FALSE);
971 }
972
973 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
974 const char *e;
975
976 pa_assert(s);
977 pa_assert(attr);
978
979 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
980 uint32_t ms;
981
982 if (pa_atou(e, &ms) < 0 || ms <= 0)
983 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
984 else {
985 attr->maxlength = (uint32_t) -1;
986 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
987 attr->minreq = (uint32_t) -1;
988 attr->prebuf = (uint32_t) -1;
989 attr->fragsize = attr->tlength;
990 }
991
992 if (flags)
993 *flags |= PA_STREAM_ADJUST_LATENCY;
994 }
995
996 if (s->context->version >= 13)
997 return;
998
999 /* Version older than 0.9.10 didn't do server side buffer_attr
1000 * selection, hence we have to fake it on the client side. */
1001
1002 /* We choose fairly conservative values here, to not confuse
1003 * old clients with extremely large playback buffers */
1004
1005 if (attr->maxlength == (uint32_t) -1)
1006 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1007
1008 if (attr->tlength == (uint32_t) -1)
1009 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1010
1011 if (attr->minreq == (uint32_t) -1)
1012 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1013
1014 if (attr->prebuf == (uint32_t) -1)
1015 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1016
1017 if (attr->fragsize == (uint32_t) -1)
1018 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1019 }
1020
1021 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1022 pa_stream *s = userdata;
1023 uint32_t requested_bytes = 0;
1024
1025 pa_assert(pd);
1026 pa_assert(s);
1027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1028 pa_assert(s->state == PA_STREAM_CREATING);
1029
1030 pa_stream_ref(s);
1031
1032 if (command != PA_COMMAND_REPLY) {
1033 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1034 goto finish;
1035
1036 pa_stream_set_state(s, PA_STREAM_FAILED);
1037 goto finish;
1038 }
1039
1040 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1041 s->channel == PA_INVALID_INDEX ||
1042 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1043 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1044 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1045 goto finish;
1046 }
1047
1048 s->requested_bytes = (int64_t) requested_bytes;
1049
1050 if (s->context->version >= 9) {
1051 if (s->direction == PA_STREAM_PLAYBACK) {
1052 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1053 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1054 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1055 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1056 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1057 goto finish;
1058 }
1059 } else if (s->direction == PA_STREAM_RECORD) {
1060 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1061 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1062 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1063 goto finish;
1064 }
1065 }
1066 }
1067
1068 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1069 pa_sample_spec ss;
1070 pa_channel_map cm;
1071 const char *dn = NULL;
1072 pa_bool_t suspended;
1073
1074 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1075 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1076 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1077 pa_tagstruct_gets(t, &dn) < 0 ||
1078 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1079 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1080 goto finish;
1081 }
1082
1083 if (!dn || s->device_index == PA_INVALID_INDEX ||
1084 ss.channels != cm.channels ||
1085 !pa_channel_map_valid(&cm) ||
1086 !pa_sample_spec_valid(&ss) ||
1087 (s->n_formats == 0 && (
1088 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1089 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1090 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1091 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1092 goto finish;
1093 }
1094
1095 pa_xfree(s->device_name);
1096 s->device_name = pa_xstrdup(dn);
1097 s->suspended = suspended;
1098
1099 s->channel_map = cm;
1100 s->sample_spec = ss;
1101 }
1102
1103 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1104 pa_usec_t usec;
1105
1106 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1107 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1108 goto finish;
1109 }
1110
1111 if (s->direction == PA_STREAM_RECORD)
1112 s->timing_info.configured_source_usec = usec;
1113 else
1114 s->timing_info.configured_sink_usec = usec;
1115 }
1116
1117 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1118 || s->context->version >= 22) {
1119
1120 pa_format_info *f = pa_format_info_new();
1121 pa_tagstruct_get_format_info(t, f);
1122
1123 if (pa_format_info_valid(f))
1124 s->format = f;
1125 else {
1126 pa_format_info_free(f);
1127 if (s->n_formats > 0) {
1128 /* We used the extended API, so we should have got back a proper format */
1129 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1130 goto finish;
1131 }
1132 }
1133 }
1134
1135 if (!pa_tagstruct_eof(t)) {
1136 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1137 goto finish;
1138 }
1139
1140 if (s->direction == PA_STREAM_RECORD) {
1141 pa_assert(!s->record_memblockq);
1142
1143 s->record_memblockq = pa_memblockq_new(
1144 "client side record memblockq",
1145 0,
1146 s->buffer_attr.maxlength,
1147 0,
1148 &s->sample_spec,
1149 1,
1150 0,
1151 0,
1152 NULL);
1153 }
1154
1155 s->channel_valid = TRUE;
1156 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1157
1158 create_stream_complete(s);
1159
1160 finish:
1161 pa_stream_unref(s);
1162 }
1163
1164 static int create_stream(
1165 pa_stream_direction_t direction,
1166 pa_stream *s,
1167 const char *dev,
1168 const pa_buffer_attr *attr,
1169 pa_stream_flags_t flags,
1170 const pa_cvolume *volume,
1171 pa_stream *sync_stream) {
1172
1173 pa_tagstruct *t;
1174 uint32_t tag;
1175 pa_bool_t volume_set = !!volume;
1176 pa_cvolume cv;
1177 uint32_t i;
1178
1179 pa_assert(s);
1180 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1181 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1182
1183 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1184 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1185 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1186 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1187 PA_STREAM_INTERPOLATE_TIMING|
1188 PA_STREAM_NOT_MONOTONIC|
1189 PA_STREAM_AUTO_TIMING_UPDATE|
1190 PA_STREAM_NO_REMAP_CHANNELS|
1191 PA_STREAM_NO_REMIX_CHANNELS|
1192 PA_STREAM_FIX_FORMAT|
1193 PA_STREAM_FIX_RATE|
1194 PA_STREAM_FIX_CHANNELS|
1195 PA_STREAM_DONT_MOVE|
1196 PA_STREAM_VARIABLE_RATE|
1197 PA_STREAM_PEAK_DETECT|
1198 PA_STREAM_START_MUTED|
1199 PA_STREAM_ADJUST_LATENCY|
1200 PA_STREAM_EARLY_REQUESTS|
1201 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1202 PA_STREAM_START_UNMUTED|
1203 PA_STREAM_FAIL_ON_SUSPEND|
1204 PA_STREAM_RELATIVE_VOLUME|
1205 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1206
1207
1208 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1209 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1210 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1211 /* Although some of the other flags are not supported on older
1212 * version, we don't check for them here, because it doesn't hurt
1213 * when they are passed but actually not supported. This makes
1214 * client development easier */
1215
1216 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1217 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1218 PA_CHECK_VALIDITY(s->context, !volume || s->n_formats || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1219 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1220 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1221
1222 pa_stream_ref(s);
1223
1224 s->direction = direction;
1225
1226 if (sync_stream)
1227 s->syncid = sync_stream->syncid;
1228
1229 if (attr)
1230 s->buffer_attr = *attr;
1231 patch_buffer_attr(s, &s->buffer_attr, &flags);
1232
1233 s->flags = flags;
1234 s->corked = !!(flags & PA_STREAM_START_CORKED);
1235
1236 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1237 pa_usec_t x;
1238
1239 x = pa_rtclock_now();
1240
1241 pa_assert(!s->smoother);
1242 s->smoother = pa_smoother_new(
1243 SMOOTHER_ADJUST_TIME,
1244 SMOOTHER_HISTORY_TIME,
1245 !(flags & PA_STREAM_NOT_MONOTONIC),
1246 TRUE,
1247 SMOOTHER_MIN_HISTORY,
1248 x,
1249 TRUE);
1250 }
1251
1252 if (!dev)
1253 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1254
1255 t = pa_tagstruct_command(
1256 s->context,
1257 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1258 &tag);
1259
1260 if (s->context->version < 13)
1261 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1262
1263 pa_tagstruct_put(
1264 t,
1265 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1266 PA_TAG_CHANNEL_MAP, &s->channel_map,
1267 PA_TAG_U32, PA_INVALID_INDEX,
1268 PA_TAG_STRING, dev,
1269 PA_TAG_U32, s->buffer_attr.maxlength,
1270 PA_TAG_BOOLEAN, s->corked,
1271 PA_TAG_INVALID);
1272
1273 if (!volume) {
1274 if (pa_sample_spec_valid(&s->sample_spec))
1275 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1276 else {
1277 /* This is not really relevant, since no volume was set, and
1278 * the real number of channels is embedded in the format_info
1279 * structure */
1280 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1281 }
1282 }
1283
1284 if (s->direction == PA_STREAM_PLAYBACK) {
1285 pa_tagstruct_put(
1286 t,
1287 PA_TAG_U32, s->buffer_attr.tlength,
1288 PA_TAG_U32, s->buffer_attr.prebuf,
1289 PA_TAG_U32, s->buffer_attr.minreq,
1290 PA_TAG_U32, s->syncid,
1291 PA_TAG_INVALID);
1292
1293 pa_tagstruct_put_cvolume(t, volume);
1294 } else
1295 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1296
1297 if (s->context->version >= 12) {
1298 pa_tagstruct_put(
1299 t,
1300 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1301 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1302 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1303 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1304 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1305 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1306 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1307 PA_TAG_INVALID);
1308 }
1309
1310 if (s->context->version >= 13) {
1311
1312 if (s->direction == PA_STREAM_PLAYBACK)
1313 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1314 else
1315 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1316
1317 pa_tagstruct_put(
1318 t,
1319 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1320 PA_TAG_PROPLIST, s->proplist,
1321 PA_TAG_INVALID);
1322
1323 if (s->direction == PA_STREAM_RECORD)
1324 pa_tagstruct_putu32(t, s->direct_on_input);
1325 }
1326
1327 if (s->context->version >= 14) {
1328
1329 if (s->direction == PA_STREAM_PLAYBACK)
1330 pa_tagstruct_put_boolean(t, volume_set);
1331
1332 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1333 }
1334
1335 if (s->context->version >= 15) {
1336
1337 if (s->direction == PA_STREAM_PLAYBACK)
1338 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1339
1340 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1341 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1342 }
1343
1344 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1345 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1346
1347 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1348 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1349
1350 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1351 || s->context->version >= 22) {
1352
1353 pa_tagstruct_putu8(t, s->n_formats);
1354 for (i = 0; i < s->n_formats; i++)
1355 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1356 }
1357
1358 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1359 pa_tagstruct_put_cvolume(t, volume);
1360 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1361 pa_tagstruct_put_boolean(t, volume_set);
1362 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1363 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1364 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1365 }
1366
1367 pa_pstream_send_tagstruct(s->context->pstream, t);
1368 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1369
1370 pa_stream_set_state(s, PA_STREAM_CREATING);
1371
1372 pa_stream_unref(s);
1373 return 0;
1374 }
1375
1376 int pa_stream_connect_playback(
1377 pa_stream *s,
1378 const char *dev,
1379 const pa_buffer_attr *attr,
1380 pa_stream_flags_t flags,
1381 const pa_cvolume *volume,
1382 pa_stream *sync_stream) {
1383
1384 pa_assert(s);
1385 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1386
1387 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1388 }
1389
1390 int pa_stream_connect_record(
1391 pa_stream *s,
1392 const char *dev,
1393 const pa_buffer_attr *attr,
1394 pa_stream_flags_t flags) {
1395
1396 pa_assert(s);
1397 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1398
1399 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1400 }
1401
1402 int pa_stream_begin_write(
1403 pa_stream *s,
1404 void **data,
1405 size_t *nbytes) {
1406
1407 pa_assert(s);
1408 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1409
1410 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1411 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1412 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1413 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1414 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1415
1416 if (*nbytes != (size_t) -1) {
1417 size_t m, fs;
1418
1419 m = pa_mempool_block_size_max(s->context->mempool);
1420 fs = pa_frame_size(&s->sample_spec);
1421
1422 m = (m / fs) * fs;
1423 if (*nbytes > m)
1424 *nbytes = m;
1425 }
1426
1427 if (!s->write_memblock) {
1428 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1429 s->write_data = pa_memblock_acquire(s->write_memblock);
1430 }
1431
1432 *data = s->write_data;
1433 *nbytes = pa_memblock_get_length(s->write_memblock);
1434
1435 return 0;
1436 }
1437
1438 int pa_stream_cancel_write(
1439 pa_stream *s) {
1440
1441 pa_assert(s);
1442 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1443
1444 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1445 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1446 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1447 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1448
1449 pa_assert(s->write_data);
1450
1451 pa_memblock_release(s->write_memblock);
1452 pa_memblock_unref(s->write_memblock);
1453 s->write_memblock = NULL;
1454 s->write_data = NULL;
1455
1456 return 0;
1457 }
1458
1459 int pa_stream_write(
1460 pa_stream *s,
1461 const void *data,
1462 size_t length,
1463 pa_free_cb_t free_cb,
1464 int64_t offset,
1465 pa_seek_mode_t seek) {
1466
1467 pa_assert(s);
1468 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1469 pa_assert(data);
1470
1471 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1472 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1473 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1474 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1475 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1476 PA_CHECK_VALIDITY(s->context,
1477 !s->write_memblock ||
1478 ((data >= s->write_data) &&
1479 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1480 PA_ERR_INVALID);
1481 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1482
1483 if (s->write_memblock) {
1484 pa_memchunk chunk;
1485
1486 /* pa_stream_write_begin() was called before */
1487
1488 pa_memblock_release(s->write_memblock);
1489
1490 chunk.memblock = s->write_memblock;
1491 chunk.index = (const char *) data - (const char *) s->write_data;
1492 chunk.length = length;
1493
1494 s->write_memblock = NULL;
1495 s->write_data = NULL;
1496
1497 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1498 pa_memblock_unref(chunk.memblock);
1499
1500 } else {
1501 pa_seek_mode_t t_seek = seek;
1502 int64_t t_offset = offset;
1503 size_t t_length = length;
1504 const void *t_data = data;
1505
1506 /* pa_stream_write_begin() was not called before */
1507
1508 while (t_length > 0) {
1509 pa_memchunk chunk;
1510
1511 chunk.index = 0;
1512
1513 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1514 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1515 chunk.length = t_length;
1516 } else {
1517 void *d;
1518
1519 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1520 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1521
1522 d = pa_memblock_acquire(chunk.memblock);
1523 memcpy(d, t_data, chunk.length);
1524 pa_memblock_release(chunk.memblock);
1525 }
1526
1527 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1528
1529 t_offset = 0;
1530 t_seek = PA_SEEK_RELATIVE;
1531
1532 t_data = (const uint8_t*) t_data + chunk.length;
1533 t_length -= chunk.length;
1534
1535 pa_memblock_unref(chunk.memblock);
1536 }
1537
1538 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1539 free_cb((void*) data);
1540 }
1541
1542 /* This is obviously wrong since we ignore the seeking index . But
1543 * that's OK, the server side applies the same error */
1544 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1545
1546 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1547
1548 if (s->direction == PA_STREAM_PLAYBACK) {
1549
1550 /* Update latency request correction */
1551 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1552
1553 if (seek == PA_SEEK_ABSOLUTE) {
1554 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1555 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1556 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1557 } else if (seek == PA_SEEK_RELATIVE) {
1558 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1559 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1560 } else
1561 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1562 }
1563
1564 /* Update the write index in the already available latency data */
1565 if (s->timing_info_valid) {
1566
1567 if (seek == PA_SEEK_ABSOLUTE) {
1568 s->timing_info.write_index_corrupt = FALSE;
1569 s->timing_info.write_index = offset + (int64_t) length;
1570 } else if (seek == PA_SEEK_RELATIVE) {
1571 if (!s->timing_info.write_index_corrupt)
1572 s->timing_info.write_index += offset + (int64_t) length;
1573 } else
1574 s->timing_info.write_index_corrupt = TRUE;
1575 }
1576
1577 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1578 request_auto_timing_update(s, TRUE);
1579 }
1580
1581 return 0;
1582 }
1583
1584 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1585 pa_assert(s);
1586 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1587 pa_assert(data);
1588 pa_assert(length);
1589
1590 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1591 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1592 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1593
1594 if (!s->peek_memchunk.memblock) {
1595
1596 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1597 *data = NULL;
1598 *length = 0;
1599 return 0;
1600 }
1601
1602 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1603 }
1604
1605 pa_assert(s->peek_data);
1606 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1607 *length = s->peek_memchunk.length;
1608 return 0;
1609 }
1610
1611 int pa_stream_drop(pa_stream *s) {
1612 pa_assert(s);
1613 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1614
1615 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1616 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1617 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1618 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1619
1620 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1621
1622 /* Fix the simulated local read index */
1623 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1624 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1625
1626 pa_assert(s->peek_data);
1627 pa_memblock_release(s->peek_memchunk.memblock);
1628 pa_memblock_unref(s->peek_memchunk.memblock);
1629 pa_memchunk_reset(&s->peek_memchunk);
1630
1631 return 0;
1632 }
1633
1634 size_t pa_stream_writable_size(pa_stream *s) {
1635 pa_assert(s);
1636 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1637
1638 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1639 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1640 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1641
1642 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1643 }
1644
1645 size_t pa_stream_readable_size(pa_stream *s) {
1646 pa_assert(s);
1647 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1648
1649 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1650 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1651 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1652
1653 return pa_memblockq_get_length(s->record_memblockq);
1654 }
1655
1656 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1657 pa_operation *o;
1658 pa_tagstruct *t;
1659 uint32_t tag;
1660
1661 pa_assert(s);
1662 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663
1664 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1665 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1666 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1667
1668 /* Ask for a timing update before we cork/uncork to get the best
1669 * accuracy for the transport latency suitable for the
1670 * check_smoother_status() call in the started callback */
1671 request_auto_timing_update(s, TRUE);
1672
1673 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1674
1675 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1676 pa_tagstruct_putu32(t, s->channel);
1677 pa_pstream_send_tagstruct(s->context->pstream, t);
1678 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1679
1680 /* This might cause the read index to continue again, hence
1681 * let's request a timing update */
1682 request_auto_timing_update(s, TRUE);
1683
1684 return o;
1685 }
1686
1687 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1688 pa_usec_t usec;
1689
1690 pa_assert(s);
1691 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1692 pa_assert(s->state == PA_STREAM_READY);
1693 pa_assert(s->direction != PA_STREAM_UPLOAD);
1694 pa_assert(s->timing_info_valid);
1695 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1696 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1697
1698 if (s->direction == PA_STREAM_PLAYBACK) {
1699 /* The last byte that was written into the output device
1700 * had this time value associated */
1701 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1702
1703 if (!s->corked && !s->suspended) {
1704
1705 if (!ignore_transport)
1706 /* Because the latency info took a little time to come
1707 * to us, we assume that the real output time is actually
1708 * a little ahead */
1709 usec += s->timing_info.transport_usec;
1710
1711 /* However, the output device usually maintains a buffer
1712 too, hence the real sample currently played is a little
1713 back */
1714 if (s->timing_info.sink_usec >= usec)
1715 usec = 0;
1716 else
1717 usec -= s->timing_info.sink_usec;
1718 }
1719
1720 } else {
1721 pa_assert(s->direction == PA_STREAM_RECORD);
1722
1723 /* The last byte written into the server side queue had
1724 * this time value associated */
1725 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1726
1727 if (!s->corked && !s->suspended) {
1728
1729 if (!ignore_transport)
1730 /* Add transport latency */
1731 usec += s->timing_info.transport_usec;
1732
1733 /* Add latency of data in device buffer */
1734 usec += s->timing_info.source_usec;
1735
1736 /* If this is a monitor source, we need to correct the
1737 * time by the playback device buffer */
1738 if (s->timing_info.sink_usec >= usec)
1739 usec = 0;
1740 else
1741 usec -= s->timing_info.sink_usec;
1742 }
1743 }
1744
1745 return usec;
1746 }
1747
1748 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1749 pa_operation *o = userdata;
1750 struct timeval local, remote, now;
1751 pa_timing_info *i;
1752 pa_bool_t playing = FALSE;
1753 uint64_t underrun_for = 0, playing_for = 0;
1754
1755 pa_assert(pd);
1756 pa_assert(o);
1757 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1758
1759 if (!o->context || !o->stream)
1760 goto finish;
1761
1762 i = &o->stream->timing_info;
1763
1764 o->stream->timing_info_valid = FALSE;
1765 i->write_index_corrupt = TRUE;
1766 i->read_index_corrupt = TRUE;
1767
1768 if (command != PA_COMMAND_REPLY) {
1769 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1770 goto finish;
1771
1772 } else {
1773
1774 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1775 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1776 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1777 pa_tagstruct_get_timeval(t, &local) < 0 ||
1778 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1779 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1780 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1781
1782 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1783 goto finish;
1784 }
1785
1786 if (o->context->version >= 13 &&
1787 o->stream->direction == PA_STREAM_PLAYBACK)
1788 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1789 pa_tagstruct_getu64(t, &playing_for) < 0) {
1790
1791 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1792 goto finish;
1793 }
1794
1795
1796 if (!pa_tagstruct_eof(t)) {
1797 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1798 goto finish;
1799 }
1800 o->stream->timing_info_valid = TRUE;
1801 i->write_index_corrupt = FALSE;
1802 i->read_index_corrupt = FALSE;
1803
1804 i->playing = (int) playing;
1805 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1806
1807 pa_gettimeofday(&now);
1808
1809 /* Calculate timestamps */
1810 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1811 /* local and remote seem to have synchronized clocks */
1812
1813 if (o->stream->direction == PA_STREAM_PLAYBACK)
1814 i->transport_usec = pa_timeval_diff(&remote, &local);
1815 else
1816 i->transport_usec = pa_timeval_diff(&now, &remote);
1817
1818 i->synchronized_clocks = TRUE;
1819 i->timestamp = remote;
1820 } else {
1821 /* clocks are not synchronized, let's estimate latency then */
1822 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1823 i->synchronized_clocks = FALSE;
1824 i->timestamp = local;
1825 pa_timeval_add(&i->timestamp, i->transport_usec);
1826 }
1827
1828 /* Invalidate read and write indexes if necessary */
1829 if (tag < o->stream->read_index_not_before)
1830 i->read_index_corrupt = TRUE;
1831
1832 if (tag < o->stream->write_index_not_before)
1833 i->write_index_corrupt = TRUE;
1834
1835 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1836 /* Write index correction */
1837
1838 int n, j;
1839 uint32_t ctag = tag;
1840
1841 /* Go through the saved correction values and add up the
1842 * total correction.*/
1843 for (n = 0, j = o->stream->current_write_index_correction+1;
1844 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1845 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1846
1847 /* Step over invalid data or out-of-date data */
1848 if (!o->stream->write_index_corrections[j].valid ||
1849 o->stream->write_index_corrections[j].tag < ctag)
1850 continue;
1851
1852 /* Make sure that everything is in order */
1853 ctag = o->stream->write_index_corrections[j].tag+1;
1854
1855 /* Now fix the write index */
1856 if (o->stream->write_index_corrections[j].corrupt) {
1857 /* A corrupting seek was made */
1858 i->write_index_corrupt = TRUE;
1859 } else if (o->stream->write_index_corrections[j].absolute) {
1860 /* An absolute seek was made */
1861 i->write_index = o->stream->write_index_corrections[j].value;
1862 i->write_index_corrupt = FALSE;
1863 } else if (!i->write_index_corrupt) {
1864 /* A relative seek was made */
1865 i->write_index += o->stream->write_index_corrections[j].value;
1866 }
1867 }
1868
1869 /* Clear old correction entries */
1870 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1871 if (!o->stream->write_index_corrections[n].valid)
1872 continue;
1873
1874 if (o->stream->write_index_corrections[n].tag <= tag)
1875 o->stream->write_index_corrections[n].valid = FALSE;
1876 }
1877 }
1878
1879 if (o->stream->direction == PA_STREAM_RECORD) {
1880 /* Read index correction */
1881
1882 if (!i->read_index_corrupt)
1883 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1884 }
1885
1886 /* Update smoother if we're not corked */
1887 if (o->stream->smoother && !o->stream->corked) {
1888 pa_usec_t u, x;
1889
1890 u = x = pa_rtclock_now() - i->transport_usec;
1891
1892 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1893 pa_usec_t su;
1894
1895 /* If we weren't playing then it will take some time
1896 * until the audio will actually come out through the
1897 * speakers. Since we follow that timing here, we need
1898 * to try to fix this up */
1899
1900 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1901
1902 if (su < i->sink_usec)
1903 x += i->sink_usec - su;
1904 }
1905
1906 if (!i->playing)
1907 pa_smoother_pause(o->stream->smoother, x);
1908
1909 /* Update the smoother */
1910 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1911 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1912 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1913
1914 if (i->playing)
1915 pa_smoother_resume(o->stream->smoother, x, TRUE);
1916 }
1917 }
1918
1919 o->stream->auto_timing_update_requested = FALSE;
1920
1921 if (o->stream->latency_update_callback)
1922 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1923
1924 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1925 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1926 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1927 }
1928
1929 finish:
1930
1931 pa_operation_done(o);
1932 pa_operation_unref(o);
1933 }
1934
1935 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1936 uint32_t tag;
1937 pa_operation *o;
1938 pa_tagstruct *t;
1939 struct timeval now;
1940 int cidx = 0;
1941
1942 pa_assert(s);
1943 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944
1945 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1946 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1947 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1948
1949 if (s->direction == PA_STREAM_PLAYBACK) {
1950 /* Find a place to store the write_index correction data for this entry */
1951 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1952
1953 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1954 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1955 }
1956 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1957
1958 t = pa_tagstruct_command(
1959 s->context,
1960 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1961 &tag);
1962 pa_tagstruct_putu32(t, s->channel);
1963 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1964
1965 pa_pstream_send_tagstruct(s->context->pstream, t);
1966 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1967
1968 if (s->direction == PA_STREAM_PLAYBACK) {
1969 /* Fill in initial correction data */
1970
1971 s->current_write_index_correction = cidx;
1972
1973 s->write_index_corrections[cidx].valid = TRUE;
1974 s->write_index_corrections[cidx].absolute = FALSE;
1975 s->write_index_corrections[cidx].corrupt = FALSE;
1976 s->write_index_corrections[cidx].tag = tag;
1977 s->write_index_corrections[cidx].value = 0;
1978 }
1979
1980 return o;
1981 }
1982
1983 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1984 pa_stream *s = userdata;
1985
1986 pa_assert(pd);
1987 pa_assert(s);
1988 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1989
1990 pa_stream_ref(s);
1991
1992 if (command != PA_COMMAND_REPLY) {
1993 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1994 goto finish;
1995
1996 pa_stream_set_state(s, PA_STREAM_FAILED);
1997 goto finish;
1998 } else if (!pa_tagstruct_eof(t)) {
1999 pa_context_fail(s->context, PA_ERR_PROTOCOL);
2000 goto finish;
2001 }
2002
2003 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2004
2005 finish:
2006 pa_stream_unref(s);
2007 }
2008
2009 int pa_stream_disconnect(pa_stream *s) {
2010 pa_tagstruct *t;
2011 uint32_t tag;
2012
2013 pa_assert(s);
2014 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015
2016 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2017 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2018 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2019
2020 pa_stream_ref(s);
2021
2022 t = pa_tagstruct_command(
2023 s->context,
2024 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2025 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2026 &tag);
2027 pa_tagstruct_putu32(t, s->channel);
2028 pa_pstream_send_tagstruct(s->context->pstream, t);
2029 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2030
2031 pa_stream_unref(s);
2032 return 0;
2033 }
2034
2035 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2036 pa_assert(s);
2037 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2038
2039 if (pa_detect_fork())
2040 return;
2041
2042 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2043 return;
2044
2045 s->read_callback = cb;
2046 s->read_userdata = userdata;
2047 }
2048
2049 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2050 pa_assert(s);
2051 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2052
2053 if (pa_detect_fork())
2054 return;
2055
2056 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2057 return;
2058
2059 s->write_callback = cb;
2060 s->write_userdata = userdata;
2061 }
2062
2063 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2064 pa_assert(s);
2065 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2066
2067 if (pa_detect_fork())
2068 return;
2069
2070 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2071 return;
2072
2073 s->state_callback = cb;
2074 s->state_userdata = userdata;
2075 }
2076
2077 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2078 pa_assert(s);
2079 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2080
2081 if (pa_detect_fork())
2082 return;
2083
2084 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2085 return;
2086
2087 s->overflow_callback = cb;
2088 s->overflow_userdata = userdata;
2089 }
2090
2091 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2092 pa_assert(s);
2093 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2094
2095 if (pa_detect_fork())
2096 return;
2097
2098 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2099 return;
2100
2101 s->underflow_callback = cb;
2102 s->underflow_userdata = userdata;
2103 }
2104
2105 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2106 pa_assert(s);
2107 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108
2109 if (pa_detect_fork())
2110 return;
2111
2112 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2113 return;
2114
2115 s->latency_update_callback = cb;
2116 s->latency_update_userdata = userdata;
2117 }
2118
2119 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2120 pa_assert(s);
2121 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2122
2123 if (pa_detect_fork())
2124 return;
2125
2126 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2127 return;
2128
2129 s->moved_callback = cb;
2130 s->moved_userdata = userdata;
2131 }
2132
2133 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2134 pa_assert(s);
2135 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2136
2137 if (pa_detect_fork())
2138 return;
2139
2140 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2141 return;
2142
2143 s->suspended_callback = cb;
2144 s->suspended_userdata = userdata;
2145 }
2146
2147 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2148 pa_assert(s);
2149 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2150
2151 if (pa_detect_fork())
2152 return;
2153
2154 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2155 return;
2156
2157 s->started_callback = cb;
2158 s->started_userdata = userdata;
2159 }
2160
2161 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2162 pa_assert(s);
2163 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2164
2165 if (pa_detect_fork())
2166 return;
2167
2168 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2169 return;
2170
2171 s->event_callback = cb;
2172 s->event_userdata = userdata;
2173 }
2174
2175 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2176 pa_assert(s);
2177 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2178
2179 if (pa_detect_fork())
2180 return;
2181
2182 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2183 return;
2184
2185 s->buffer_attr_callback = cb;
2186 s->buffer_attr_userdata = userdata;
2187 }
2188
2189 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2190 pa_operation *o = userdata;
2191 int success = 1;
2192
2193 pa_assert(pd);
2194 pa_assert(o);
2195 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2196
2197 if (!o->context)
2198 goto finish;
2199
2200 if (command != PA_COMMAND_REPLY) {
2201 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2202 goto finish;
2203
2204 success = 0;
2205 } else if (!pa_tagstruct_eof(t)) {
2206 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2207 goto finish;
2208 }
2209
2210 if (o->callback) {
2211 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2212 cb(o->stream, success, o->userdata);
2213 }
2214
2215 finish:
2216 pa_operation_done(o);
2217 pa_operation_unref(o);
2218 }
2219
2220 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2221 pa_operation *o;
2222 pa_tagstruct *t;
2223 uint32_t tag;
2224
2225 pa_assert(s);
2226 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2227
2228 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2229 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2230 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2231
2232 /* Ask for a timing update before we cork/uncork to get the best
2233 * accuracy for the transport latency suitable for the
2234 * check_smoother_status() call in the started callback */
2235 request_auto_timing_update(s, TRUE);
2236
2237 s->corked = b;
2238
2239 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2240
2241 t = pa_tagstruct_command(
2242 s->context,
2243 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2244 &tag);
2245 pa_tagstruct_putu32(t, s->channel);
2246 pa_tagstruct_put_boolean(t, !!b);
2247 pa_pstream_send_tagstruct(s->context->pstream, t);
2248 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2249
2250 check_smoother_status(s, FALSE, FALSE, FALSE);
2251
2252 /* This might cause the indexes to hang/start again, hence let's
2253 * request a timing update, after the cork/uncork, too */
2254 request_auto_timing_update(s, TRUE);
2255
2256 return o;
2257 }
2258
2259 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2260 pa_tagstruct *t;
2261 pa_operation *o;
2262 uint32_t tag;
2263
2264 pa_assert(s);
2265 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2266
2267 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2268 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2269
2270 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2271
2272 t = pa_tagstruct_command(s->context, command, &tag);
2273 pa_tagstruct_putu32(t, s->channel);
2274 pa_pstream_send_tagstruct(s->context->pstream, t);
2275 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2276
2277 return o;
2278 }
2279
2280 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2281 pa_operation *o;
2282
2283 pa_assert(s);
2284 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2285
2286 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2287 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2288 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2289
2290 /* Ask for a timing update *before* the flush, so that the
2291 * transport usec is as up to date as possible when we get the
2292 * underflow message and update the smoother status*/
2293 request_auto_timing_update(s, TRUE);
2294
2295 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2296 return NULL;
2297
2298 if (s->direction == PA_STREAM_PLAYBACK) {
2299
2300 if (s->write_index_corrections[s->current_write_index_correction].valid)
2301 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2302
2303 if (s->buffer_attr.prebuf > 0)
2304 check_smoother_status(s, FALSE, FALSE, TRUE);
2305
2306 /* This will change the write index, but leave the
2307 * read index untouched. */
2308 invalidate_indexes(s, FALSE, TRUE);
2309
2310 } else
2311 /* For record streams this has no influence on the write
2312 * index, but the read index might jump. */
2313 invalidate_indexes(s, TRUE, FALSE);
2314
2315 /* Note that we do not update requested_bytes here. This is
2316 * because we cannot really know how data actually was dropped
2317 * from the write index due to this. This 'error' will be applied
2318 * by both client and server and hence we should be fine. */
2319
2320 return o;
2321 }
2322
2323 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2324 pa_operation *o;
2325
2326 pa_assert(s);
2327 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2328
2329 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2330 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2331 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2332 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2333
2334 /* Ask for a timing update before we cork/uncork to get the best
2335 * accuracy for the transport latency suitable for the
2336 * check_smoother_status() call in the started callback */
2337 request_auto_timing_update(s, TRUE);
2338
2339 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2340 return NULL;
2341
2342 /* This might cause the read index to hang again, hence
2343 * let's request a timing update */
2344 request_auto_timing_update(s, TRUE);
2345
2346 return o;
2347 }
2348
2349 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2350 pa_operation *o;
2351
2352 pa_assert(s);
2353 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2354
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2356 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2357 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2358 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2359
2360 /* Ask for a timing update before we cork/uncork to get the best
2361 * accuracy for the transport latency suitable for the
2362 * check_smoother_status() call in the started callback */
2363 request_auto_timing_update(s, TRUE);
2364
2365 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2366 return NULL;
2367
2368 /* This might cause the read index to start moving again, hence
2369 * let's request a timing update */
2370 request_auto_timing_update(s, TRUE);
2371
2372 return o;
2373 }
2374
2375 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2376 pa_operation *o;
2377
2378 pa_assert(s);
2379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380 pa_assert(name);
2381
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2383 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2384 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2385
2386 if (s->context->version >= 13) {
2387 pa_proplist *p = pa_proplist_new();
2388
2389 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2390 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2391 pa_proplist_free(p);
2392 } else {
2393 pa_tagstruct *t;
2394 uint32_t tag;
2395
2396 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2397 t = pa_tagstruct_command(
2398 s->context,
2399 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2400 &tag);
2401 pa_tagstruct_putu32(t, s->channel);
2402 pa_tagstruct_puts(t, name);
2403 pa_pstream_send_tagstruct(s->context->pstream, t);
2404 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2405 }
2406
2407 return o;
2408 }
2409
2410 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2411 pa_usec_t usec;
2412
2413 pa_assert(s);
2414 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415
2416 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2417 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2418 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2419 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2420 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2421 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2422
2423 if (s->smoother)
2424 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2425 else
2426 usec = calc_time(s, FALSE);
2427
2428 /* Make sure the time runs monotonically */
2429 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2430 if (usec < s->previous_time)
2431 usec = s->previous_time;
2432 else
2433 s->previous_time = usec;
2434 }
2435
2436 if (r_usec)
2437 *r_usec = usec;
2438
2439 return 0;
2440 }
2441
2442 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2443 pa_assert(s);
2444 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2445
2446 if (negative)
2447 *negative = 0;
2448
2449 if (a >= b)
2450 return a-b;
2451 else {
2452 if (negative && s->direction == PA_STREAM_RECORD) {
2453 *negative = 1;
2454 return b-a;
2455 } else
2456 return 0;
2457 }
2458 }
2459
2460 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2461 pa_usec_t t, c;
2462 int r;
2463 int64_t cindex;
2464
2465 pa_assert(s);
2466 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467 pa_assert(r_usec);
2468
2469 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2470 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2472 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2473 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2474 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2475
2476 if ((r = pa_stream_get_time(s, &t)) < 0)
2477 return r;
2478
2479 if (s->direction == PA_STREAM_PLAYBACK)
2480 cindex = s->timing_info.write_index;
2481 else
2482 cindex = s->timing_info.read_index;
2483
2484 if (cindex < 0)
2485 cindex = 0;
2486
2487 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2488
2489 if (s->direction == PA_STREAM_PLAYBACK)
2490 *r_usec = time_counter_diff(s, c, t, negative);
2491 else
2492 *r_usec = time_counter_diff(s, t, c, negative);
2493
2494 return 0;
2495 }
2496
2497 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2498 pa_assert(s);
2499 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2500
2501 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2502 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2503 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2504 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2505
2506 return &s->timing_info;
2507 }
2508
2509 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2510 pa_assert(s);
2511 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2512
2513 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2514
2515 return &s->sample_spec;
2516 }
2517
2518 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2519 pa_assert(s);
2520 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2521
2522 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2523
2524 return &s->channel_map;
2525 }
2526
2527 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2528 pa_assert(s);
2529 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2530
2531 /* We don't have the format till routing is done */
2532 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2533 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2534
2535 return s->format;
2536 }
2537 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2538 pa_assert(s);
2539 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2540
2541 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2542 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2543 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2544
2545 return &s->buffer_attr;
2546 }
2547
2548 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2549 pa_operation *o = userdata;
2550 int success = 1;
2551
2552 pa_assert(pd);
2553 pa_assert(o);
2554 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2555
2556 if (!o->context)
2557 goto finish;
2558
2559 if (command != PA_COMMAND_REPLY) {
2560 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2561 goto finish;
2562
2563 success = 0;
2564 } else {
2565 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2566 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2567 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2568 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2569 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2570 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2571 goto finish;
2572 }
2573 } else if (o->stream->direction == PA_STREAM_RECORD) {
2574 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2575 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2576 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2577 goto finish;
2578 }
2579 }
2580
2581 if (o->stream->context->version >= 13) {
2582 pa_usec_t usec;
2583
2584 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2585 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2586 goto finish;
2587 }
2588
2589 if (o->stream->direction == PA_STREAM_RECORD)
2590 o->stream->timing_info.configured_source_usec = usec;
2591 else
2592 o->stream->timing_info.configured_sink_usec = usec;
2593 }
2594
2595 if (!pa_tagstruct_eof(t)) {
2596 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2597 goto finish;
2598 }
2599 }
2600
2601 if (o->callback) {
2602 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2603 cb(o->stream, success, o->userdata);
2604 }
2605
2606 finish:
2607 pa_operation_done(o);
2608 pa_operation_unref(o);
2609 }
2610
2611
2612 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2613 pa_operation *o;
2614 pa_tagstruct *t;
2615 uint32_t tag;
2616 pa_buffer_attr copy;
2617
2618 pa_assert(s);
2619 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2620 pa_assert(attr);
2621
2622 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2623 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2624 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2625 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2626
2627 /* Ask for a timing update before we cork/uncork to get the best
2628 * accuracy for the transport latency suitable for the
2629 * check_smoother_status() call in the started callback */
2630 request_auto_timing_update(s, TRUE);
2631
2632 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2633
2634 t = pa_tagstruct_command(
2635 s->context,
2636 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2637 &tag);
2638 pa_tagstruct_putu32(t, s->channel);
2639
2640 copy = *attr;
2641 patch_buffer_attr(s, &copy, NULL);
2642 attr = &copy;
2643
2644 pa_tagstruct_putu32(t, attr->maxlength);
2645
2646 if (s->direction == PA_STREAM_PLAYBACK)
2647 pa_tagstruct_put(
2648 t,
2649 PA_TAG_U32, attr->tlength,
2650 PA_TAG_U32, attr->prebuf,
2651 PA_TAG_U32, attr->minreq,
2652 PA_TAG_INVALID);
2653 else
2654 pa_tagstruct_putu32(t, attr->fragsize);
2655
2656 if (s->context->version >= 13)
2657 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2658
2659 if (s->context->version >= 14)
2660 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2661
2662 pa_pstream_send_tagstruct(s->context->pstream, t);
2663 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2664
2665 /* This might cause changes in the read/write index, hence let's
2666 * request a timing update */
2667 request_auto_timing_update(s, TRUE);
2668
2669 return o;
2670 }
2671
2672 uint32_t pa_stream_get_device_index(pa_stream *s) {
2673 pa_assert(s);
2674 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2675
2676 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2677 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2678 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2679 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2680 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2681
2682 return s->device_index;
2683 }
2684
2685 const char *pa_stream_get_device_name(pa_stream *s) {
2686 pa_assert(s);
2687 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2688
2689 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2690 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2691 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2692 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2693 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2694
2695 return s->device_name;
2696 }
2697
2698 int pa_stream_is_suspended(pa_stream *s) {
2699 pa_assert(s);
2700 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2701
2702 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2703 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2704 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2705 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2706
2707 return s->suspended;
2708 }
2709
2710 int pa_stream_is_corked(pa_stream *s) {
2711 pa_assert(s);
2712 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2713
2714 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2715 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2716 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2717
2718 return s->corked;
2719 }
2720
2721 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2722 pa_operation *o = userdata;
2723 int success = 1;
2724
2725 pa_assert(pd);
2726 pa_assert(o);
2727 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2728
2729 if (!o->context)
2730 goto finish;
2731
2732 if (command != PA_COMMAND_REPLY) {
2733 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2734 goto finish;
2735
2736 success = 0;
2737 } else {
2738
2739 if (!pa_tagstruct_eof(t)) {
2740 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2741 goto finish;
2742 }
2743 }
2744
2745 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2746 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2747
2748 if (o->callback) {
2749 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2750 cb(o->stream, success, o->userdata);
2751 }
2752
2753 finish:
2754 pa_operation_done(o);
2755 pa_operation_unref(o);
2756 }
2757
2758
2759 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2760 pa_operation *o;
2761 pa_tagstruct *t;
2762 uint32_t tag;
2763
2764 pa_assert(s);
2765 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2766
2767 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2768 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2769 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2770 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2771 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2772 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2773
2774 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2775 o->private = PA_UINT_TO_PTR(rate);
2776
2777 t = pa_tagstruct_command(
2778 s->context,
2779 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2780 &tag);
2781 pa_tagstruct_putu32(t, s->channel);
2782 pa_tagstruct_putu32(t, rate);
2783
2784 pa_pstream_send_tagstruct(s->context->pstream, t);
2785 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2786
2787 return o;
2788 }
2789
2790 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2791 pa_operation *o;
2792 pa_tagstruct *t;
2793 uint32_t tag;
2794
2795 pa_assert(s);
2796 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2797
2798 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2799 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2800 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2801 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2802 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2803
2804 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2805
2806 t = pa_tagstruct_command(
2807 s->context,
2808 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2809 &tag);
2810 pa_tagstruct_putu32(t, s->channel);
2811 pa_tagstruct_putu32(t, (uint32_t) mode);
2812 pa_tagstruct_put_proplist(t, p);
2813
2814 pa_pstream_send_tagstruct(s->context->pstream, t);
2815 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2816
2817 /* Please note that we don't update s->proplist here, because we
2818 * don't export that field */
2819
2820 return o;
2821 }
2822
2823 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2824 pa_operation *o;
2825 pa_tagstruct *t;
2826 uint32_t tag;
2827 const char * const*k;
2828
2829 pa_assert(s);
2830 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2831
2832 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2833 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2834 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2835 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2836 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2837
2838 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2839
2840 t = pa_tagstruct_command(
2841 s->context,
2842 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2843 &tag);
2844 pa_tagstruct_putu32(t, s->channel);
2845
2846 for (k = keys; *k; k++)
2847 pa_tagstruct_puts(t, *k);
2848
2849 pa_tagstruct_puts(t, NULL);
2850
2851 pa_pstream_send_tagstruct(s->context->pstream, t);
2852 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2853
2854 /* Please note that we don't update s->proplist here, because we
2855 * don't export that field */
2856
2857 return o;
2858 }
2859
2860 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2861 pa_assert(s);
2862 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2863
2864 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2865 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2866 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2867 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2868
2869 s->direct_on_input = sink_input_idx;
2870
2871 return 0;
2872 }
2873
2874 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2875 pa_assert(s);
2876 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2877
2878 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2879 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2880 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2881
2882 return s->direct_on_input;
2883 }