]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
idxset: Use pa_free_cb_t instead of pa_free2_cb_t
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
90 pa_proplist *p) {
91
92 pa_stream *s;
93 unsigned int i;
94
95 pa_assert(c);
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
99
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103 s = pa_xnew(pa_stream, 1);
104 PA_REFCNT_INIT(s);
105 s->context = c;
106 s->mainloop = c->mainloop;
107
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
110 s->flags = 0;
111
112 if (ss)
113 s->sample_spec = *ss;
114 else
115 pa_sample_spec_init(&s->sample_spec);
116
117 if (map)
118 s->channel_map = *map;
119 else
120 pa_channel_map_init(&s->channel_map);
121
122 s->n_formats = 0;
123 if (formats) {
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
127 }
128
129 /* We'll get the final negotiated format after connecting */
130 s->format = NULL;
131
132 s->direct_on_input = PA_INVALID_INDEX;
133
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135 if (name)
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138 s->channel = 0;
139 s->channel_valid = FALSE;
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
142
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146 /* We initialize the target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
149
150 s->buffer_attr.maxlength = (uint32_t) -1;
151 if (ss)
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153 else {
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
158 .rate = 48000,
159 .channels = 2,
160 };
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162 }
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
166
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
170 s->corked = FALSE;
171
172 s->write_memblock = NULL;
173 s->write_data = NULL;
174
175 pa_memchunk_reset(&s->peek_memchunk);
176 s->peek_data = NULL;
177 s->record_memblockq = NULL;
178
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
181
182 s->previous_time = 0;
183 s->latest_underrun_at_index = -1;
184
185 s->read_index_not_before = 0;
186 s->write_index_not_before = 0;
187 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188 s->write_index_corrections[i].valid = 0;
189 s->current_write_index_correction = 0;
190
191 s->auto_timing_update_event = NULL;
192 s->auto_timing_update_requested = FALSE;
193 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195 reset_callbacks(s);
196
197 s->smoother = NULL;
198
199 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200 PA_LLIST_PREPEND(pa_stream, c->streams, s);
201 pa_stream_ref(s);
202
203 return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207 pa_context *c,
208 const char *name,
209 const pa_sample_spec *ss,
210 const pa_channel_map *map,
211 pa_proplist *p) {
212
213 pa_channel_map tmap;
214
215 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221 if (!map)
222 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228 pa_context *c,
229 const char *name,
230 pa_format_info * const *formats,
231 unsigned int n_formats,
232 pa_proplist *p) {
233
234 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235
236 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
237 }
238
239 static void stream_unlink(pa_stream *s) {
240 pa_operation *o, *n;
241 pa_assert(s);
242
243 if (!s->context)
244 return;
245
246 /* Detach from context */
247
248 /* Unref all operation objects that point to us */
249 for (o = s->context->operations; o; o = n) {
250 n = o->next;
251
252 if (o->stream == s)
253 pa_operation_cancel(o);
254 }
255
256 /* Drop all outstanding replies for this stream */
257 if (s->context->pdispatch)
258 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259
260 if (s->channel_valid) {
261 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel = 0;
263 s->channel_valid = FALSE;
264 }
265
266 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
267 pa_stream_unref(s);
268
269 s->context = NULL;
270
271 if (s->auto_timing_update_event) {
272 pa_assert(s->mainloop);
273 s->mainloop->time_free(s->auto_timing_update_event);
274 }
275
276 reset_callbacks(s);
277 }
278
279 static void stream_free(pa_stream *s) {
280 unsigned int i;
281
282 pa_assert(s);
283
284 stream_unlink(s);
285
286 if (s->write_memblock) {
287 if (s->write_data)
288 pa_memblock_release(s->write_memblock);
289 pa_memblock_unref(s->write_memblock);
290 }
291
292 if (s->peek_memchunk.memblock) {
293 if (s->peek_data)
294 pa_memblock_release(s->peek_memchunk.memblock);
295 pa_memblock_unref(s->peek_memchunk.memblock);
296 }
297
298 if (s->record_memblockq)
299 pa_memblockq_free(s->record_memblockq);
300
301 if (s->proplist)
302 pa_proplist_free(s->proplist);
303
304 if (s->smoother)
305 pa_smoother_free(s->smoother);
306
307 for (i = 0; i < s->n_formats; i++)
308 pa_format_info_free(s->req_formats[i]);
309
310 if (s->format)
311 pa_format_info_free(s->format);
312
313 pa_xfree(s->device_name);
314 pa_xfree(s);
315 }
316
317 void pa_stream_unref(pa_stream *s) {
318 pa_assert(s);
319 pa_assert(PA_REFCNT_VALUE(s) >= 1);
320
321 if (PA_REFCNT_DEC(s) <= 0)
322 stream_free(s);
323 }
324
325 pa_stream* pa_stream_ref(pa_stream *s) {
326 pa_assert(s);
327 pa_assert(PA_REFCNT_VALUE(s) >= 1);
328
329 PA_REFCNT_INC(s);
330 return s;
331 }
332
333 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
334 pa_assert(s);
335 pa_assert(PA_REFCNT_VALUE(s) >= 1);
336
337 return s->state;
338 }
339
340 pa_context* pa_stream_get_context(pa_stream *s) {
341 pa_assert(s);
342 pa_assert(PA_REFCNT_VALUE(s) >= 1);
343
344 return s->context;
345 }
346
347 uint32_t pa_stream_get_index(pa_stream *s) {
348 pa_assert(s);
349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
350
351 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
352 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
353
354 return s->stream_index;
355 }
356
357 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
358 pa_assert(s);
359 pa_assert(PA_REFCNT_VALUE(s) >= 1);
360
361 if (s->state == st)
362 return;
363
364 pa_stream_ref(s);
365
366 s->state = st;
367
368 if (s->state_callback)
369 s->state_callback(s, s->state_userdata);
370
371 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
372 stream_unlink(s);
373
374 pa_stream_unref(s);
375 }
376
377 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
378 pa_assert(s);
379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
380
381 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
382 return;
383
384 if (s->state == PA_STREAM_READY &&
385 (force || !s->auto_timing_update_requested)) {
386 pa_operation *o;
387
388 /* pa_log("Automatically requesting new timing data"); */
389
390 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
391 pa_operation_unref(o);
392 s->auto_timing_update_requested = TRUE;
393 }
394 }
395
396 if (s->auto_timing_update_event) {
397 if (s->suspended && !force) {
398 pa_assert(s->mainloop);
399 s->mainloop->time_free(s->auto_timing_update_event);
400 s->auto_timing_update_event = NULL;
401 } else {
402 if (force)
403 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
404
405 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
406
407 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
408 }
409 }
410 }
411
412 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
413 pa_context *c = userdata;
414 pa_stream *s;
415 uint32_t channel;
416
417 pa_assert(pd);
418 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
419 pa_assert(t);
420 pa_assert(c);
421 pa_assert(PA_REFCNT_VALUE(c) >= 1);
422
423 pa_context_ref(c);
424
425 if (pa_tagstruct_getu32(t, &channel) < 0 ||
426 !pa_tagstruct_eof(t)) {
427 pa_context_fail(c, PA_ERR_PROTOCOL);
428 goto finish;
429 }
430
431 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
432 goto finish;
433
434 if (s->state != PA_STREAM_READY)
435 goto finish;
436
437 pa_context_set_error(c, PA_ERR_KILLED);
438 pa_stream_set_state(s, PA_STREAM_FAILED);
439
440 finish:
441 pa_context_unref(c);
442 }
443
444 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
445 pa_usec_t x;
446
447 pa_assert(s);
448 pa_assert(!force_start || !force_stop);
449
450 if (!s->smoother)
451 return;
452
453 x = pa_rtclock_now();
454
455 if (s->timing_info_valid) {
456 if (aposteriori)
457 x -= s->timing_info.transport_usec;
458 else
459 x += s->timing_info.transport_usec;
460 }
461
462 if (s->suspended || s->corked || force_stop)
463 pa_smoother_pause(s->smoother, x);
464 else if (force_start || s->buffer_attr.prebuf == 0) {
465
466 if (!s->timing_info_valid &&
467 !aposteriori &&
468 !force_start &&
469 !force_stop &&
470 s->context->version >= 13) {
471
472 /* If the server supports STARTED events we take them as
473 * indications when audio really starts/stops playing, if
474 * we don't have any timing info yet -- instead of trying
475 * to be smart and guessing the server time. Otherwise the
476 * unknown transport delay adds too much noise to our time
477 * calculations. */
478
479 return;
480 }
481
482 pa_smoother_resume(s->smoother, x, TRUE);
483 }
484
485 /* Please note that we have no idea if playback actually started
486 * if prebuf is non-zero! */
487 }
488
489 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
490
491 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
492 pa_context *c = userdata;
493 pa_stream *s;
494 uint32_t channel;
495 const char *dn;
496 pa_bool_t suspended;
497 uint32_t di;
498 pa_usec_t usec = 0;
499 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
500
501 pa_assert(pd);
502 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
503 pa_assert(t);
504 pa_assert(c);
505 pa_assert(PA_REFCNT_VALUE(c) >= 1);
506
507 pa_context_ref(c);
508
509 if (c->version < 12) {
510 pa_context_fail(c, PA_ERR_PROTOCOL);
511 goto finish;
512 }
513
514 if (pa_tagstruct_getu32(t, &channel) < 0 ||
515 pa_tagstruct_getu32(t, &di) < 0 ||
516 pa_tagstruct_gets(t, &dn) < 0 ||
517 pa_tagstruct_get_boolean(t, &suspended) < 0) {
518 pa_context_fail(c, PA_ERR_PROTOCOL);
519 goto finish;
520 }
521
522 if (c->version >= 13) {
523
524 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
525 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526 pa_tagstruct_getu32(t, &fragsize) < 0 ||
527 pa_tagstruct_get_usec(t, &usec) < 0) {
528 pa_context_fail(c, PA_ERR_PROTOCOL);
529 goto finish;
530 }
531 } else {
532 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
533 pa_tagstruct_getu32(t, &tlength) < 0 ||
534 pa_tagstruct_getu32(t, &prebuf) < 0 ||
535 pa_tagstruct_getu32(t, &minreq) < 0 ||
536 pa_tagstruct_get_usec(t, &usec) < 0) {
537 pa_context_fail(c, PA_ERR_PROTOCOL);
538 goto finish;
539 }
540 }
541 }
542
543 if (!pa_tagstruct_eof(t)) {
544 pa_context_fail(c, PA_ERR_PROTOCOL);
545 goto finish;
546 }
547
548 if (!dn || di == PA_INVALID_INDEX) {
549 pa_context_fail(c, PA_ERR_PROTOCOL);
550 goto finish;
551 }
552
553 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
554 goto finish;
555
556 if (s->state != PA_STREAM_READY)
557 goto finish;
558
559 if (c->version >= 13) {
560 if (s->direction == PA_STREAM_RECORD)
561 s->timing_info.configured_source_usec = usec;
562 else
563 s->timing_info.configured_sink_usec = usec;
564
565 s->buffer_attr.maxlength = maxlength;
566 s->buffer_attr.fragsize = fragsize;
567 s->buffer_attr.tlength = tlength;
568 s->buffer_attr.prebuf = prebuf;
569 s->buffer_attr.minreq = minreq;
570 }
571
572 pa_xfree(s->device_name);
573 s->device_name = pa_xstrdup(dn);
574 s->device_index = di;
575
576 s->suspended = suspended;
577
578 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
579 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
580 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
581 request_auto_timing_update(s, TRUE);
582 }
583
584 check_smoother_status(s, TRUE, FALSE, FALSE);
585 request_auto_timing_update(s, TRUE);
586
587 if (s->moved_callback)
588 s->moved_callback(s, s->moved_userdata);
589
590 finish:
591 pa_context_unref(c);
592 }
593
594 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
595 pa_context *c = userdata;
596 pa_stream *s;
597 uint32_t channel;
598 pa_usec_t usec = 0;
599 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
600
601 pa_assert(pd);
602 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
603 pa_assert(t);
604 pa_assert(c);
605 pa_assert(PA_REFCNT_VALUE(c) >= 1);
606
607 pa_context_ref(c);
608
609 if (c->version < 15) {
610 pa_context_fail(c, PA_ERR_PROTOCOL);
611 goto finish;
612 }
613
614 if (pa_tagstruct_getu32(t, &channel) < 0) {
615 pa_context_fail(c, PA_ERR_PROTOCOL);
616 goto finish;
617 }
618
619 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
620 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
621 pa_tagstruct_getu32(t, &fragsize) < 0 ||
622 pa_tagstruct_get_usec(t, &usec) < 0) {
623 pa_context_fail(c, PA_ERR_PROTOCOL);
624 goto finish;
625 }
626 } else {
627 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
628 pa_tagstruct_getu32(t, &tlength) < 0 ||
629 pa_tagstruct_getu32(t, &prebuf) < 0 ||
630 pa_tagstruct_getu32(t, &minreq) < 0 ||
631 pa_tagstruct_get_usec(t, &usec) < 0) {
632 pa_context_fail(c, PA_ERR_PROTOCOL);
633 goto finish;
634 }
635 }
636
637 if (!pa_tagstruct_eof(t)) {
638 pa_context_fail(c, PA_ERR_PROTOCOL);
639 goto finish;
640 }
641
642 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
643 goto finish;
644
645 if (s->state != PA_STREAM_READY)
646 goto finish;
647
648 if (s->direction == PA_STREAM_RECORD)
649 s->timing_info.configured_source_usec = usec;
650 else
651 s->timing_info.configured_sink_usec = usec;
652
653 s->buffer_attr.maxlength = maxlength;
654 s->buffer_attr.fragsize = fragsize;
655 s->buffer_attr.tlength = tlength;
656 s->buffer_attr.prebuf = prebuf;
657 s->buffer_attr.minreq = minreq;
658
659 request_auto_timing_update(s, TRUE);
660
661 if (s->buffer_attr_callback)
662 s->buffer_attr_callback(s, s->buffer_attr_userdata);
663
664 finish:
665 pa_context_unref(c);
666 }
667
668 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
669 pa_context *c = userdata;
670 pa_stream *s;
671 uint32_t channel;
672 pa_bool_t suspended;
673
674 pa_assert(pd);
675 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
676 pa_assert(t);
677 pa_assert(c);
678 pa_assert(PA_REFCNT_VALUE(c) >= 1);
679
680 pa_context_ref(c);
681
682 if (c->version < 12) {
683 pa_context_fail(c, PA_ERR_PROTOCOL);
684 goto finish;
685 }
686
687 if (pa_tagstruct_getu32(t, &channel) < 0 ||
688 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
689 !pa_tagstruct_eof(t)) {
690 pa_context_fail(c, PA_ERR_PROTOCOL);
691 goto finish;
692 }
693
694 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
695 goto finish;
696
697 if (s->state != PA_STREAM_READY)
698 goto finish;
699
700 s->suspended = suspended;
701
702 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
703 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
704 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
705 request_auto_timing_update(s, TRUE);
706 }
707
708 check_smoother_status(s, TRUE, FALSE, FALSE);
709 request_auto_timing_update(s, TRUE);
710
711 if (s->suspended_callback)
712 s->suspended_callback(s, s->suspended_userdata);
713
714 finish:
715 pa_context_unref(c);
716 }
717
718 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719 pa_context *c = userdata;
720 pa_stream *s;
721 uint32_t channel;
722
723 pa_assert(pd);
724 pa_assert(command == PA_COMMAND_STARTED);
725 pa_assert(t);
726 pa_assert(c);
727 pa_assert(PA_REFCNT_VALUE(c) >= 1);
728
729 pa_context_ref(c);
730
731 if (c->version < 13) {
732 pa_context_fail(c, PA_ERR_PROTOCOL);
733 goto finish;
734 }
735
736 if (pa_tagstruct_getu32(t, &channel) < 0 ||
737 !pa_tagstruct_eof(t)) {
738 pa_context_fail(c, PA_ERR_PROTOCOL);
739 goto finish;
740 }
741
742 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
743 goto finish;
744
745 if (s->state != PA_STREAM_READY)
746 goto finish;
747
748 check_smoother_status(s, TRUE, TRUE, FALSE);
749 request_auto_timing_update(s, TRUE);
750
751 if (s->started_callback)
752 s->started_callback(s, s->started_userdata);
753
754 finish:
755 pa_context_unref(c);
756 }
757
758 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759 pa_context *c = userdata;
760 pa_stream *s;
761 uint32_t channel;
762 pa_proplist *pl = NULL;
763 const char *event;
764
765 pa_assert(pd);
766 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
767 pa_assert(t);
768 pa_assert(c);
769 pa_assert(PA_REFCNT_VALUE(c) >= 1);
770
771 pa_context_ref(c);
772
773 if (c->version < 15) {
774 pa_context_fail(c, PA_ERR_PROTOCOL);
775 goto finish;
776 }
777
778 pl = pa_proplist_new();
779
780 if (pa_tagstruct_getu32(t, &channel) < 0 ||
781 pa_tagstruct_gets(t, &event) < 0 ||
782 pa_tagstruct_get_proplist(t, pl) < 0 ||
783 !pa_tagstruct_eof(t) || !event) {
784 pa_context_fail(c, PA_ERR_PROTOCOL);
785 goto finish;
786 }
787
788 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
789 goto finish;
790
791 if (s->state != PA_STREAM_READY)
792 goto finish;
793
794 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
795 /* Let client know what the running time was when the stream had to be killed */
796 pa_usec_t stream_time;
797 if (pa_stream_get_time(s, &stream_time) == 0)
798 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
799 }
800
801 if (s->event_callback)
802 s->event_callback(s, event, pl, s->event_userdata);
803
804 finish:
805 pa_context_unref(c);
806
807 if (pl)
808 pa_proplist_free(pl);
809 }
810
811 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
812 pa_stream *s;
813 pa_context *c = userdata;
814 uint32_t bytes, channel;
815
816 pa_assert(pd);
817 pa_assert(command == PA_COMMAND_REQUEST);
818 pa_assert(t);
819 pa_assert(c);
820 pa_assert(PA_REFCNT_VALUE(c) >= 1);
821
822 pa_context_ref(c);
823
824 if (pa_tagstruct_getu32(t, &channel) < 0 ||
825 pa_tagstruct_getu32(t, &bytes) < 0 ||
826 !pa_tagstruct_eof(t)) {
827 pa_context_fail(c, PA_ERR_PROTOCOL);
828 goto finish;
829 }
830
831 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
832 goto finish;
833
834 if (s->state != PA_STREAM_READY)
835 goto finish;
836
837 s->requested_bytes += bytes;
838
839 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
840
841 if (s->requested_bytes > 0 && s->write_callback)
842 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
843
844 finish:
845 pa_context_unref(c);
846 }
847
848 int64_t pa_stream_get_underflow_index(pa_stream *p)
849 {
850 pa_assert(p);
851 return p->latest_underrun_at_index;
852 }
853
854 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
855 pa_stream *s;
856 pa_context *c = userdata;
857 uint32_t channel;
858 int64_t offset = -1;
859
860 pa_assert(pd);
861 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
862 pa_assert(t);
863 pa_assert(c);
864 pa_assert(PA_REFCNT_VALUE(c) >= 1);
865
866 pa_context_ref(c);
867
868 if (pa_tagstruct_getu32(t, &channel) < 0) {
869 pa_context_fail(c, PA_ERR_PROTOCOL);
870 goto finish;
871 }
872
873 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
874 if (pa_tagstruct_gets64(t, &offset) < 0) {
875 pa_context_fail(c, PA_ERR_PROTOCOL);
876 goto finish;
877 }
878 }
879
880 if (!pa_tagstruct_eof(t)) {
881 pa_context_fail(c, PA_ERR_PROTOCOL);
882 goto finish;
883 }
884
885 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
886 goto finish;
887
888 if (s->state != PA_STREAM_READY)
889 goto finish;
890
891 if (offset != -1)
892 s->latest_underrun_at_index = offset;
893
894 if (s->buffer_attr.prebuf > 0)
895 check_smoother_status(s, TRUE, FALSE, TRUE);
896
897 request_auto_timing_update(s, TRUE);
898
899 if (command == PA_COMMAND_OVERFLOW) {
900 if (s->overflow_callback)
901 s->overflow_callback(s, s->overflow_userdata);
902 } else if (command == PA_COMMAND_UNDERFLOW) {
903 if (s->underflow_callback)
904 s->underflow_callback(s, s->underflow_userdata);
905 }
906
907 finish:
908 pa_context_unref(c);
909 }
910
911 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
912 pa_assert(s);
913 pa_assert(PA_REFCNT_VALUE(s) >= 1);
914
915 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
916
917 if (s->state != PA_STREAM_READY)
918 return;
919
920 if (w) {
921 s->write_index_not_before = s->context->ctag;
922
923 if (s->timing_info_valid)
924 s->timing_info.write_index_corrupt = TRUE;
925
926 /* pa_log("write_index invalidated"); */
927 }
928
929 if (r) {
930 s->read_index_not_before = s->context->ctag;
931
932 if (s->timing_info_valid)
933 s->timing_info.read_index_corrupt = TRUE;
934
935 /* pa_log("read_index invalidated"); */
936 }
937
938 request_auto_timing_update(s, TRUE);
939 }
940
941 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
942 pa_stream *s = userdata;
943
944 pa_assert(s);
945 pa_assert(PA_REFCNT_VALUE(s) >= 1);
946
947 pa_stream_ref(s);
948 request_auto_timing_update(s, FALSE);
949 pa_stream_unref(s);
950 }
951
952 static void create_stream_complete(pa_stream *s) {
953 pa_assert(s);
954 pa_assert(PA_REFCNT_VALUE(s) >= 1);
955 pa_assert(s->state == PA_STREAM_CREATING);
956
957 pa_stream_set_state(s, PA_STREAM_READY);
958
959 if (s->requested_bytes > 0 && s->write_callback)
960 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
961
962 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
963 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
964 pa_assert(!s->auto_timing_update_event);
965 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
966
967 request_auto_timing_update(s, TRUE);
968 }
969
970 check_smoother_status(s, TRUE, FALSE, FALSE);
971 }
972
973 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
974 const char *e;
975
976 pa_assert(s);
977 pa_assert(attr);
978
979 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
980 uint32_t ms;
981
982 if (pa_atou(e, &ms) < 0 || ms <= 0)
983 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
984 else {
985 attr->maxlength = (uint32_t) -1;
986 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
987 attr->minreq = (uint32_t) -1;
988 attr->prebuf = (uint32_t) -1;
989 attr->fragsize = attr->tlength;
990 }
991
992 if (flags)
993 *flags |= PA_STREAM_ADJUST_LATENCY;
994 }
995
996 if (s->context->version >= 13)
997 return;
998
999 /* Version older than 0.9.10 didn't do server side buffer_attr
1000 * selection, hence we have to fake it on the client side. */
1001
1002 /* We choose fairly conservative values here, to not confuse
1003 * old clients with extremely large playback buffers */
1004
1005 if (attr->maxlength == (uint32_t) -1)
1006 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1007
1008 if (attr->tlength == (uint32_t) -1)
1009 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1010
1011 if (attr->minreq == (uint32_t) -1)
1012 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1013
1014 if (attr->prebuf == (uint32_t) -1)
1015 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1016
1017 if (attr->fragsize == (uint32_t) -1)
1018 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1019 }
1020
1021 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1022 pa_stream *s = userdata;
1023 uint32_t requested_bytes = 0;
1024
1025 pa_assert(pd);
1026 pa_assert(s);
1027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1028 pa_assert(s->state == PA_STREAM_CREATING);
1029
1030 pa_stream_ref(s);
1031
1032 if (command != PA_COMMAND_REPLY) {
1033 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1034 goto finish;
1035
1036 pa_stream_set_state(s, PA_STREAM_FAILED);
1037 goto finish;
1038 }
1039
1040 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1041 s->channel == PA_INVALID_INDEX ||
1042 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1043 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1044 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1045 goto finish;
1046 }
1047
1048 s->requested_bytes = (int64_t) requested_bytes;
1049
1050 if (s->context->version >= 9) {
1051 if (s->direction == PA_STREAM_PLAYBACK) {
1052 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1053 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1054 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1055 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1056 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1057 goto finish;
1058 }
1059 } else if (s->direction == PA_STREAM_RECORD) {
1060 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1061 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1062 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1063 goto finish;
1064 }
1065 }
1066 }
1067
1068 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1069 pa_sample_spec ss;
1070 pa_channel_map cm;
1071 const char *dn = NULL;
1072 pa_bool_t suspended;
1073
1074 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1075 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1076 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1077 pa_tagstruct_gets(t, &dn) < 0 ||
1078 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1079 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1080 goto finish;
1081 }
1082
1083 if (!dn || s->device_index == PA_INVALID_INDEX ||
1084 ss.channels != cm.channels ||
1085 !pa_channel_map_valid(&cm) ||
1086 !pa_sample_spec_valid(&ss) ||
1087 (s->n_formats == 0 && (
1088 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1089 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1090 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1091 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1092 goto finish;
1093 }
1094
1095 pa_xfree(s->device_name);
1096 s->device_name = pa_xstrdup(dn);
1097 s->suspended = suspended;
1098
1099 s->channel_map = cm;
1100 s->sample_spec = ss;
1101 }
1102
1103 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1104 pa_usec_t usec;
1105
1106 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1107 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1108 goto finish;
1109 }
1110
1111 if (s->direction == PA_STREAM_RECORD)
1112 s->timing_info.configured_source_usec = usec;
1113 else
1114 s->timing_info.configured_sink_usec = usec;
1115 }
1116
1117 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1118 || s->context->version >= 22) {
1119
1120 pa_format_info *f = pa_format_info_new();
1121 pa_tagstruct_get_format_info(t, f);
1122
1123 if (pa_format_info_valid(f))
1124 s->format = f;
1125 else {
1126 pa_format_info_free(f);
1127 if (s->n_formats > 0) {
1128 /* We used the extended API, so we should have got back a proper format */
1129 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1130 goto finish;
1131 }
1132 }
1133 }
1134
1135 if (!pa_tagstruct_eof(t)) {
1136 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1137 goto finish;
1138 }
1139
1140 if (s->direction == PA_STREAM_RECORD) {
1141 pa_assert(!s->record_memblockq);
1142
1143 s->record_memblockq = pa_memblockq_new(
1144 "client side record memblockq",
1145 0,
1146 s->buffer_attr.maxlength,
1147 0,
1148 &s->sample_spec,
1149 1,
1150 0,
1151 0,
1152 NULL);
1153 }
1154
1155 s->channel_valid = TRUE;
1156 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1157
1158 create_stream_complete(s);
1159
1160 finish:
1161 pa_stream_unref(s);
1162 }
1163
1164 static int create_stream(
1165 pa_stream_direction_t direction,
1166 pa_stream *s,
1167 const char *dev,
1168 const pa_buffer_attr *attr,
1169 pa_stream_flags_t flags,
1170 const pa_cvolume *volume,
1171 pa_stream *sync_stream) {
1172
1173 pa_tagstruct *t;
1174 uint32_t tag;
1175 pa_bool_t volume_set = !!volume;
1176 pa_cvolume cv;
1177 uint32_t i;
1178
1179 pa_assert(s);
1180 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1181 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1182
1183 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1184 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1185 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1186 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1187 PA_STREAM_INTERPOLATE_TIMING|
1188 PA_STREAM_NOT_MONOTONIC|
1189 PA_STREAM_AUTO_TIMING_UPDATE|
1190 PA_STREAM_NO_REMAP_CHANNELS|
1191 PA_STREAM_NO_REMIX_CHANNELS|
1192 PA_STREAM_FIX_FORMAT|
1193 PA_STREAM_FIX_RATE|
1194 PA_STREAM_FIX_CHANNELS|
1195 PA_STREAM_DONT_MOVE|
1196 PA_STREAM_VARIABLE_RATE|
1197 PA_STREAM_PEAK_DETECT|
1198 PA_STREAM_START_MUTED|
1199 PA_STREAM_ADJUST_LATENCY|
1200 PA_STREAM_EARLY_REQUESTS|
1201 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1202 PA_STREAM_START_UNMUTED|
1203 PA_STREAM_FAIL_ON_SUSPEND|
1204 PA_STREAM_RELATIVE_VOLUME|
1205 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1206
1207
1208 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1209 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1210 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1211 /* Although some of the other flags are not supported on older
1212 * version, we don't check for them here, because it doesn't hurt
1213 * when they are passed but actually not supported. This makes
1214 * client development easier */
1215
1216 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1217 PA_CHECK_VALIDITY(s->context, !volume || s->n_formats || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1218 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1219 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1220
1221 pa_stream_ref(s);
1222
1223 s->direction = direction;
1224
1225 if (sync_stream)
1226 s->syncid = sync_stream->syncid;
1227
1228 if (attr)
1229 s->buffer_attr = *attr;
1230 patch_buffer_attr(s, &s->buffer_attr, &flags);
1231
1232 s->flags = flags;
1233 s->corked = !!(flags & PA_STREAM_START_CORKED);
1234
1235 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1236 pa_usec_t x;
1237
1238 x = pa_rtclock_now();
1239
1240 pa_assert(!s->smoother);
1241 s->smoother = pa_smoother_new(
1242 SMOOTHER_ADJUST_TIME,
1243 SMOOTHER_HISTORY_TIME,
1244 !(flags & PA_STREAM_NOT_MONOTONIC),
1245 TRUE,
1246 SMOOTHER_MIN_HISTORY,
1247 x,
1248 TRUE);
1249 }
1250
1251 if (!dev)
1252 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1253
1254 t = pa_tagstruct_command(
1255 s->context,
1256 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1257 &tag);
1258
1259 if (s->context->version < 13)
1260 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1261
1262 pa_tagstruct_put(
1263 t,
1264 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1265 PA_TAG_CHANNEL_MAP, &s->channel_map,
1266 PA_TAG_U32, PA_INVALID_INDEX,
1267 PA_TAG_STRING, dev,
1268 PA_TAG_U32, s->buffer_attr.maxlength,
1269 PA_TAG_BOOLEAN, s->corked,
1270 PA_TAG_INVALID);
1271
1272 if (!volume) {
1273 if (pa_sample_spec_valid(&s->sample_spec))
1274 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1275 else {
1276 /* This is not really relevant, since no volume was set, and
1277 * the real number of channels is embedded in the format_info
1278 * structure */
1279 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1280 }
1281 }
1282
1283 if (s->direction == PA_STREAM_PLAYBACK) {
1284 pa_tagstruct_put(
1285 t,
1286 PA_TAG_U32, s->buffer_attr.tlength,
1287 PA_TAG_U32, s->buffer_attr.prebuf,
1288 PA_TAG_U32, s->buffer_attr.minreq,
1289 PA_TAG_U32, s->syncid,
1290 PA_TAG_INVALID);
1291
1292 pa_tagstruct_put_cvolume(t, volume);
1293 } else
1294 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1295
1296 if (s->context->version >= 12) {
1297 pa_tagstruct_put(
1298 t,
1299 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1300 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1301 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1302 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1303 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1304 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1305 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1306 PA_TAG_INVALID);
1307 }
1308
1309 if (s->context->version >= 13) {
1310
1311 if (s->direction == PA_STREAM_PLAYBACK)
1312 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1313 else
1314 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1315
1316 pa_tagstruct_put(
1317 t,
1318 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1319 PA_TAG_PROPLIST, s->proplist,
1320 PA_TAG_INVALID);
1321
1322 if (s->direction == PA_STREAM_RECORD)
1323 pa_tagstruct_putu32(t, s->direct_on_input);
1324 }
1325
1326 if (s->context->version >= 14) {
1327
1328 if (s->direction == PA_STREAM_PLAYBACK)
1329 pa_tagstruct_put_boolean(t, volume_set);
1330
1331 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1332 }
1333
1334 if (s->context->version >= 15) {
1335
1336 if (s->direction == PA_STREAM_PLAYBACK)
1337 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1338
1339 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1340 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1341 }
1342
1343 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1344 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1345
1346 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1347 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1348
1349 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1350 || s->context->version >= 22) {
1351
1352 pa_tagstruct_putu8(t, s->n_formats);
1353 for (i = 0; i < s->n_formats; i++)
1354 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1355 }
1356
1357 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1358 pa_tagstruct_put_cvolume(t, volume);
1359 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1360 pa_tagstruct_put_boolean(t, volume_set);
1361 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1362 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1363 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1364 }
1365
1366 pa_pstream_send_tagstruct(s->context->pstream, t);
1367 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1368
1369 pa_stream_set_state(s, PA_STREAM_CREATING);
1370
1371 pa_stream_unref(s);
1372 return 0;
1373 }
1374
1375 int pa_stream_connect_playback(
1376 pa_stream *s,
1377 const char *dev,
1378 const pa_buffer_attr *attr,
1379 pa_stream_flags_t flags,
1380 const pa_cvolume *volume,
1381 pa_stream *sync_stream) {
1382
1383 pa_assert(s);
1384 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1385
1386 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1387 }
1388
1389 int pa_stream_connect_record(
1390 pa_stream *s,
1391 const char *dev,
1392 const pa_buffer_attr *attr,
1393 pa_stream_flags_t flags) {
1394
1395 pa_assert(s);
1396 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1397
1398 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1399 }
1400
1401 int pa_stream_begin_write(
1402 pa_stream *s,
1403 void **data,
1404 size_t *nbytes) {
1405
1406 pa_assert(s);
1407 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1408
1409 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1410 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1411 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1412 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1413 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1414
1415 if (*nbytes != (size_t) -1) {
1416 size_t m, fs;
1417
1418 m = pa_mempool_block_size_max(s->context->mempool);
1419 fs = pa_frame_size(&s->sample_spec);
1420
1421 m = (m / fs) * fs;
1422 if (*nbytes > m)
1423 *nbytes = m;
1424 }
1425
1426 if (!s->write_memblock) {
1427 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1428 s->write_data = pa_memblock_acquire(s->write_memblock);
1429 }
1430
1431 *data = s->write_data;
1432 *nbytes = pa_memblock_get_length(s->write_memblock);
1433
1434 return 0;
1435 }
1436
1437 int pa_stream_cancel_write(
1438 pa_stream *s) {
1439
1440 pa_assert(s);
1441 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1442
1443 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1444 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1445 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1446 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1447
1448 pa_assert(s->write_data);
1449
1450 pa_memblock_release(s->write_memblock);
1451 pa_memblock_unref(s->write_memblock);
1452 s->write_memblock = NULL;
1453 s->write_data = NULL;
1454
1455 return 0;
1456 }
1457
1458 int pa_stream_write(
1459 pa_stream *s,
1460 const void *data,
1461 size_t length,
1462 pa_free_cb_t free_cb,
1463 int64_t offset,
1464 pa_seek_mode_t seek) {
1465
1466 pa_assert(s);
1467 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1468 pa_assert(data);
1469
1470 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1471 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1472 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1473 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1474 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1475 PA_CHECK_VALIDITY(s->context,
1476 !s->write_memblock ||
1477 ((data >= s->write_data) &&
1478 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1479 PA_ERR_INVALID);
1480 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1481
1482 if (s->write_memblock) {
1483 pa_memchunk chunk;
1484
1485 /* pa_stream_write_begin() was called before */
1486
1487 pa_memblock_release(s->write_memblock);
1488
1489 chunk.memblock = s->write_memblock;
1490 chunk.index = (const char *) data - (const char *) s->write_data;
1491 chunk.length = length;
1492
1493 s->write_memblock = NULL;
1494 s->write_data = NULL;
1495
1496 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1497 pa_memblock_unref(chunk.memblock);
1498
1499 } else {
1500 pa_seek_mode_t t_seek = seek;
1501 int64_t t_offset = offset;
1502 size_t t_length = length;
1503 const void *t_data = data;
1504
1505 /* pa_stream_write_begin() was not called before */
1506
1507 while (t_length > 0) {
1508 pa_memchunk chunk;
1509
1510 chunk.index = 0;
1511
1512 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1513 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1514 chunk.length = t_length;
1515 } else {
1516 void *d;
1517
1518 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1519 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1520
1521 d = pa_memblock_acquire(chunk.memblock);
1522 memcpy(d, t_data, chunk.length);
1523 pa_memblock_release(chunk.memblock);
1524 }
1525
1526 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1527
1528 t_offset = 0;
1529 t_seek = PA_SEEK_RELATIVE;
1530
1531 t_data = (const uint8_t*) t_data + chunk.length;
1532 t_length -= chunk.length;
1533
1534 pa_memblock_unref(chunk.memblock);
1535 }
1536
1537 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1538 free_cb((void*) data);
1539 }
1540
1541 /* This is obviously wrong since we ignore the seeking index . But
1542 * that's OK, the server side applies the same error */
1543 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1544
1545 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1546
1547 if (s->direction == PA_STREAM_PLAYBACK) {
1548
1549 /* Update latency request correction */
1550 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1551
1552 if (seek == PA_SEEK_ABSOLUTE) {
1553 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1554 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1555 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1556 } else if (seek == PA_SEEK_RELATIVE) {
1557 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1558 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1559 } else
1560 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1561 }
1562
1563 /* Update the write index in the already available latency data */
1564 if (s->timing_info_valid) {
1565
1566 if (seek == PA_SEEK_ABSOLUTE) {
1567 s->timing_info.write_index_corrupt = FALSE;
1568 s->timing_info.write_index = offset + (int64_t) length;
1569 } else if (seek == PA_SEEK_RELATIVE) {
1570 if (!s->timing_info.write_index_corrupt)
1571 s->timing_info.write_index += offset + (int64_t) length;
1572 } else
1573 s->timing_info.write_index_corrupt = TRUE;
1574 }
1575
1576 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1577 request_auto_timing_update(s, TRUE);
1578 }
1579
1580 return 0;
1581 }
1582
1583 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1584 pa_assert(s);
1585 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1586 pa_assert(data);
1587 pa_assert(length);
1588
1589 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1590 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1591 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1592
1593 if (!s->peek_memchunk.memblock) {
1594
1595 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1596 /* record_memblockq is empty. */
1597 *data = NULL;
1598 *length = 0;
1599 return 0;
1600
1601 } else if (!s->peek_memchunk.memblock) {
1602 /* record_memblockq isn't empty, but it doesn't have any data at
1603 * the current read index. */
1604 *data = NULL;
1605 *length = s->peek_memchunk.length;
1606 return 0;
1607 }
1608
1609 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1610 }
1611
1612 pa_assert(s->peek_data);
1613 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1614 *length = s->peek_memchunk.length;
1615 return 0;
1616 }
1617
1618 int pa_stream_drop(pa_stream *s) {
1619 pa_assert(s);
1620 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1621
1622 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1623 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1624 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1625 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1626
1627 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1628
1629 /* Fix the simulated local read index */
1630 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1631 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1632
1633 if (s->peek_memchunk.memblock) {
1634 pa_assert(s->peek_data);
1635 s->peek_data = NULL;
1636 pa_memblock_release(s->peek_memchunk.memblock);
1637 pa_memblock_unref(s->peek_memchunk.memblock);
1638 }
1639
1640 pa_memchunk_reset(&s->peek_memchunk);
1641
1642 return 0;
1643 }
1644
1645 size_t pa_stream_writable_size(pa_stream *s) {
1646 pa_assert(s);
1647 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1648
1649 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1650 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1651 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1652
1653 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1654 }
1655
1656 size_t pa_stream_readable_size(pa_stream *s) {
1657 pa_assert(s);
1658 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1659
1660 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1661 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1662 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1663
1664 return pa_memblockq_get_length(s->record_memblockq);
1665 }
1666
1667 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1668 pa_operation *o;
1669 pa_tagstruct *t;
1670 uint32_t tag;
1671
1672 pa_assert(s);
1673 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1674
1675 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1676 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1677 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1678
1679 /* Ask for a timing update before we cork/uncork to get the best
1680 * accuracy for the transport latency suitable for the
1681 * check_smoother_status() call in the started callback */
1682 request_auto_timing_update(s, TRUE);
1683
1684 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1685
1686 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1687 pa_tagstruct_putu32(t, s->channel);
1688 pa_pstream_send_tagstruct(s->context->pstream, t);
1689 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1690
1691 /* This might cause the read index to continue again, hence
1692 * let's request a timing update */
1693 request_auto_timing_update(s, TRUE);
1694
1695 return o;
1696 }
1697
1698 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1699 pa_usec_t usec;
1700
1701 pa_assert(s);
1702 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1703 pa_assert(s->state == PA_STREAM_READY);
1704 pa_assert(s->direction != PA_STREAM_UPLOAD);
1705 pa_assert(s->timing_info_valid);
1706 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1707 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1708
1709 if (s->direction == PA_STREAM_PLAYBACK) {
1710 /* The last byte that was written into the output device
1711 * had this time value associated */
1712 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1713
1714 if (!s->corked && !s->suspended) {
1715
1716 if (!ignore_transport)
1717 /* Because the latency info took a little time to come
1718 * to us, we assume that the real output time is actually
1719 * a little ahead */
1720 usec += s->timing_info.transport_usec;
1721
1722 /* However, the output device usually maintains a buffer
1723 too, hence the real sample currently played is a little
1724 back */
1725 if (s->timing_info.sink_usec >= usec)
1726 usec = 0;
1727 else
1728 usec -= s->timing_info.sink_usec;
1729 }
1730
1731 } else {
1732 pa_assert(s->direction == PA_STREAM_RECORD);
1733
1734 /* The last byte written into the server side queue had
1735 * this time value associated */
1736 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1737
1738 if (!s->corked && !s->suspended) {
1739
1740 if (!ignore_transport)
1741 /* Add transport latency */
1742 usec += s->timing_info.transport_usec;
1743
1744 /* Add latency of data in device buffer */
1745 usec += s->timing_info.source_usec;
1746
1747 /* If this is a monitor source, we need to correct the
1748 * time by the playback device buffer */
1749 if (s->timing_info.sink_usec >= usec)
1750 usec = 0;
1751 else
1752 usec -= s->timing_info.sink_usec;
1753 }
1754 }
1755
1756 return usec;
1757 }
1758
1759 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1760 pa_operation *o = userdata;
1761 struct timeval local, remote, now;
1762 pa_timing_info *i;
1763 pa_bool_t playing = FALSE;
1764 uint64_t underrun_for = 0, playing_for = 0;
1765
1766 pa_assert(pd);
1767 pa_assert(o);
1768 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1769
1770 if (!o->context || !o->stream)
1771 goto finish;
1772
1773 i = &o->stream->timing_info;
1774
1775 o->stream->timing_info_valid = FALSE;
1776 i->write_index_corrupt = TRUE;
1777 i->read_index_corrupt = TRUE;
1778
1779 if (command != PA_COMMAND_REPLY) {
1780 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1781 goto finish;
1782
1783 } else {
1784
1785 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1786 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1787 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1788 pa_tagstruct_get_timeval(t, &local) < 0 ||
1789 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1790 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1791 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1792
1793 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1794 goto finish;
1795 }
1796
1797 if (o->context->version >= 13 &&
1798 o->stream->direction == PA_STREAM_PLAYBACK)
1799 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1800 pa_tagstruct_getu64(t, &playing_for) < 0) {
1801
1802 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1803 goto finish;
1804 }
1805
1806
1807 if (!pa_tagstruct_eof(t)) {
1808 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1809 goto finish;
1810 }
1811 o->stream->timing_info_valid = TRUE;
1812 i->write_index_corrupt = FALSE;
1813 i->read_index_corrupt = FALSE;
1814
1815 i->playing = (int) playing;
1816 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1817
1818 pa_gettimeofday(&now);
1819
1820 /* Calculate timestamps */
1821 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1822 /* local and remote seem to have synchronized clocks */
1823
1824 if (o->stream->direction == PA_STREAM_PLAYBACK)
1825 i->transport_usec = pa_timeval_diff(&remote, &local);
1826 else
1827 i->transport_usec = pa_timeval_diff(&now, &remote);
1828
1829 i->synchronized_clocks = TRUE;
1830 i->timestamp = remote;
1831 } else {
1832 /* clocks are not synchronized, let's estimate latency then */
1833 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1834 i->synchronized_clocks = FALSE;
1835 i->timestamp = local;
1836 pa_timeval_add(&i->timestamp, i->transport_usec);
1837 }
1838
1839 /* Invalidate read and write indexes if necessary */
1840 if (tag < o->stream->read_index_not_before)
1841 i->read_index_corrupt = TRUE;
1842
1843 if (tag < o->stream->write_index_not_before)
1844 i->write_index_corrupt = TRUE;
1845
1846 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1847 /* Write index correction */
1848
1849 int n, j;
1850 uint32_t ctag = tag;
1851
1852 /* Go through the saved correction values and add up the
1853 * total correction.*/
1854 for (n = 0, j = o->stream->current_write_index_correction+1;
1855 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1856 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1857
1858 /* Step over invalid data or out-of-date data */
1859 if (!o->stream->write_index_corrections[j].valid ||
1860 o->stream->write_index_corrections[j].tag < ctag)
1861 continue;
1862
1863 /* Make sure that everything is in order */
1864 ctag = o->stream->write_index_corrections[j].tag+1;
1865
1866 /* Now fix the write index */
1867 if (o->stream->write_index_corrections[j].corrupt) {
1868 /* A corrupting seek was made */
1869 i->write_index_corrupt = TRUE;
1870 } else if (o->stream->write_index_corrections[j].absolute) {
1871 /* An absolute seek was made */
1872 i->write_index = o->stream->write_index_corrections[j].value;
1873 i->write_index_corrupt = FALSE;
1874 } else if (!i->write_index_corrupt) {
1875 /* A relative seek was made */
1876 i->write_index += o->stream->write_index_corrections[j].value;
1877 }
1878 }
1879
1880 /* Clear old correction entries */
1881 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1882 if (!o->stream->write_index_corrections[n].valid)
1883 continue;
1884
1885 if (o->stream->write_index_corrections[n].tag <= tag)
1886 o->stream->write_index_corrections[n].valid = FALSE;
1887 }
1888 }
1889
1890 if (o->stream->direction == PA_STREAM_RECORD) {
1891 /* Read index correction */
1892
1893 if (!i->read_index_corrupt)
1894 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1895 }
1896
1897 /* Update smoother if we're not corked */
1898 if (o->stream->smoother && !o->stream->corked) {
1899 pa_usec_t u, x;
1900
1901 u = x = pa_rtclock_now() - i->transport_usec;
1902
1903 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1904 pa_usec_t su;
1905
1906 /* If we weren't playing then it will take some time
1907 * until the audio will actually come out through the
1908 * speakers. Since we follow that timing here, we need
1909 * to try to fix this up */
1910
1911 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1912
1913 if (su < i->sink_usec)
1914 x += i->sink_usec - su;
1915 }
1916
1917 if (!i->playing)
1918 pa_smoother_pause(o->stream->smoother, x);
1919
1920 /* Update the smoother */
1921 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1922 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1923 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1924
1925 if (i->playing)
1926 pa_smoother_resume(o->stream->smoother, x, TRUE);
1927 }
1928 }
1929
1930 o->stream->auto_timing_update_requested = FALSE;
1931
1932 if (o->stream->latency_update_callback)
1933 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1934
1935 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1936 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1937 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1938 }
1939
1940 finish:
1941
1942 pa_operation_done(o);
1943 pa_operation_unref(o);
1944 }
1945
1946 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1947 uint32_t tag;
1948 pa_operation *o;
1949 pa_tagstruct *t;
1950 struct timeval now;
1951 int cidx = 0;
1952
1953 pa_assert(s);
1954 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1955
1956 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1957 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1958 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1959
1960 if (s->direction == PA_STREAM_PLAYBACK) {
1961 /* Find a place to store the write_index correction data for this entry */
1962 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1963
1964 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1965 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1966 }
1967 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1968
1969 t = pa_tagstruct_command(
1970 s->context,
1971 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1972 &tag);
1973 pa_tagstruct_putu32(t, s->channel);
1974 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1975
1976 pa_pstream_send_tagstruct(s->context->pstream, t);
1977 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1978
1979 if (s->direction == PA_STREAM_PLAYBACK) {
1980 /* Fill in initial correction data */
1981
1982 s->current_write_index_correction = cidx;
1983
1984 s->write_index_corrections[cidx].valid = TRUE;
1985 s->write_index_corrections[cidx].absolute = FALSE;
1986 s->write_index_corrections[cidx].corrupt = FALSE;
1987 s->write_index_corrections[cidx].tag = tag;
1988 s->write_index_corrections[cidx].value = 0;
1989 }
1990
1991 return o;
1992 }
1993
1994 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1995 pa_stream *s = userdata;
1996
1997 pa_assert(pd);
1998 pa_assert(s);
1999 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2000
2001 pa_stream_ref(s);
2002
2003 if (command != PA_COMMAND_REPLY) {
2004 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
2005 goto finish;
2006
2007 pa_stream_set_state(s, PA_STREAM_FAILED);
2008 goto finish;
2009 } else if (!pa_tagstruct_eof(t)) {
2010 pa_context_fail(s->context, PA_ERR_PROTOCOL);
2011 goto finish;
2012 }
2013
2014 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2015
2016 finish:
2017 pa_stream_unref(s);
2018 }
2019
2020 int pa_stream_disconnect(pa_stream *s) {
2021 pa_tagstruct *t;
2022 uint32_t tag;
2023
2024 pa_assert(s);
2025 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2026
2027 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2028 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2029 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2030
2031 pa_stream_ref(s);
2032
2033 t = pa_tagstruct_command(
2034 s->context,
2035 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2036 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2037 &tag);
2038 pa_tagstruct_putu32(t, s->channel);
2039 pa_pstream_send_tagstruct(s->context->pstream, t);
2040 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2041
2042 pa_stream_unref(s);
2043 return 0;
2044 }
2045
2046 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2047 pa_assert(s);
2048 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2049
2050 if (pa_detect_fork())
2051 return;
2052
2053 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2054 return;
2055
2056 s->read_callback = cb;
2057 s->read_userdata = userdata;
2058 }
2059
2060 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2061 pa_assert(s);
2062 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2063
2064 if (pa_detect_fork())
2065 return;
2066
2067 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2068 return;
2069
2070 s->write_callback = cb;
2071 s->write_userdata = userdata;
2072 }
2073
2074 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2075 pa_assert(s);
2076 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2077
2078 if (pa_detect_fork())
2079 return;
2080
2081 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2082 return;
2083
2084 s->state_callback = cb;
2085 s->state_userdata = userdata;
2086 }
2087
2088 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2089 pa_assert(s);
2090 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2091
2092 if (pa_detect_fork())
2093 return;
2094
2095 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2096 return;
2097
2098 s->overflow_callback = cb;
2099 s->overflow_userdata = userdata;
2100 }
2101
2102 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2103 pa_assert(s);
2104 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2105
2106 if (pa_detect_fork())
2107 return;
2108
2109 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2110 return;
2111
2112 s->underflow_callback = cb;
2113 s->underflow_userdata = userdata;
2114 }
2115
2116 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2117 pa_assert(s);
2118 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2119
2120 if (pa_detect_fork())
2121 return;
2122
2123 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2124 return;
2125
2126 s->latency_update_callback = cb;
2127 s->latency_update_userdata = userdata;
2128 }
2129
2130 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2131 pa_assert(s);
2132 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2133
2134 if (pa_detect_fork())
2135 return;
2136
2137 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2138 return;
2139
2140 s->moved_callback = cb;
2141 s->moved_userdata = userdata;
2142 }
2143
2144 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2145 pa_assert(s);
2146 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2147
2148 if (pa_detect_fork())
2149 return;
2150
2151 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2152 return;
2153
2154 s->suspended_callback = cb;
2155 s->suspended_userdata = userdata;
2156 }
2157
2158 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2159 pa_assert(s);
2160 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2161
2162 if (pa_detect_fork())
2163 return;
2164
2165 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2166 return;
2167
2168 s->started_callback = cb;
2169 s->started_userdata = userdata;
2170 }
2171
2172 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2173 pa_assert(s);
2174 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2175
2176 if (pa_detect_fork())
2177 return;
2178
2179 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2180 return;
2181
2182 s->event_callback = cb;
2183 s->event_userdata = userdata;
2184 }
2185
2186 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2187 pa_assert(s);
2188 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2189
2190 if (pa_detect_fork())
2191 return;
2192
2193 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2194 return;
2195
2196 s->buffer_attr_callback = cb;
2197 s->buffer_attr_userdata = userdata;
2198 }
2199
2200 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2201 pa_operation *o = userdata;
2202 int success = 1;
2203
2204 pa_assert(pd);
2205 pa_assert(o);
2206 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2207
2208 if (!o->context)
2209 goto finish;
2210
2211 if (command != PA_COMMAND_REPLY) {
2212 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2213 goto finish;
2214
2215 success = 0;
2216 } else if (!pa_tagstruct_eof(t)) {
2217 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2218 goto finish;
2219 }
2220
2221 if (o->callback) {
2222 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2223 cb(o->stream, success, o->userdata);
2224 }
2225
2226 finish:
2227 pa_operation_done(o);
2228 pa_operation_unref(o);
2229 }
2230
2231 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2232 pa_operation *o;
2233 pa_tagstruct *t;
2234 uint32_t tag;
2235
2236 pa_assert(s);
2237 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2238
2239 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2240 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2241 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2242
2243 /* Ask for a timing update before we cork/uncork to get the best
2244 * accuracy for the transport latency suitable for the
2245 * check_smoother_status() call in the started callback */
2246 request_auto_timing_update(s, TRUE);
2247
2248 s->corked = b;
2249
2250 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2251
2252 t = pa_tagstruct_command(
2253 s->context,
2254 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2255 &tag);
2256 pa_tagstruct_putu32(t, s->channel);
2257 pa_tagstruct_put_boolean(t, !!b);
2258 pa_pstream_send_tagstruct(s->context->pstream, t);
2259 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2260
2261 check_smoother_status(s, FALSE, FALSE, FALSE);
2262
2263 /* This might cause the indexes to hang/start again, hence let's
2264 * request a timing update, after the cork/uncork, too */
2265 request_auto_timing_update(s, TRUE);
2266
2267 return o;
2268 }
2269
2270 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2271 pa_tagstruct *t;
2272 pa_operation *o;
2273 uint32_t tag;
2274
2275 pa_assert(s);
2276 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2277
2278 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2279 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2280
2281 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2282
2283 t = pa_tagstruct_command(s->context, command, &tag);
2284 pa_tagstruct_putu32(t, s->channel);
2285 pa_pstream_send_tagstruct(s->context->pstream, t);
2286 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2287
2288 return o;
2289 }
2290
2291 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2292 pa_operation *o;
2293
2294 pa_assert(s);
2295 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2296
2297 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2298 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2299 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2300
2301 /* Ask for a timing update *before* the flush, so that the
2302 * transport usec is as up to date as possible when we get the
2303 * underflow message and update the smoother status*/
2304 request_auto_timing_update(s, TRUE);
2305
2306 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2307 return NULL;
2308
2309 if (s->direction == PA_STREAM_PLAYBACK) {
2310
2311 if (s->write_index_corrections[s->current_write_index_correction].valid)
2312 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2313
2314 if (s->buffer_attr.prebuf > 0)
2315 check_smoother_status(s, FALSE, FALSE, TRUE);
2316
2317 /* This will change the write index, but leave the
2318 * read index untouched. */
2319 invalidate_indexes(s, FALSE, TRUE);
2320
2321 } else
2322 /* For record streams this has no influence on the write
2323 * index, but the read index might jump. */
2324 invalidate_indexes(s, TRUE, FALSE);
2325
2326 /* Note that we do not update requested_bytes here. This is
2327 * because we cannot really know how data actually was dropped
2328 * from the write index due to this. This 'error' will be applied
2329 * by both client and server and hence we should be fine. */
2330
2331 return o;
2332 }
2333
2334 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2335 pa_operation *o;
2336
2337 pa_assert(s);
2338 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2339
2340 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2341 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2342 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2343 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2344
2345 /* Ask for a timing update before we cork/uncork to get the best
2346 * accuracy for the transport latency suitable for the
2347 * check_smoother_status() call in the started callback */
2348 request_auto_timing_update(s, TRUE);
2349
2350 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2351 return NULL;
2352
2353 /* This might cause the read index to hang again, hence
2354 * let's request a timing update */
2355 request_auto_timing_update(s, TRUE);
2356
2357 return o;
2358 }
2359
2360 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2361 pa_operation *o;
2362
2363 pa_assert(s);
2364 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2365
2366 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2367 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2368 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2369 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2370
2371 /* Ask for a timing update before we cork/uncork to get the best
2372 * accuracy for the transport latency suitable for the
2373 * check_smoother_status() call in the started callback */
2374 request_auto_timing_update(s, TRUE);
2375
2376 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2377 return NULL;
2378
2379 /* This might cause the read index to start moving again, hence
2380 * let's request a timing update */
2381 request_auto_timing_update(s, TRUE);
2382
2383 return o;
2384 }
2385
2386 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2387 pa_operation *o;
2388
2389 pa_assert(s);
2390 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2391 pa_assert(name);
2392
2393 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2394 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2395 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2396
2397 if (s->context->version >= 13) {
2398 pa_proplist *p = pa_proplist_new();
2399
2400 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2401 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2402 pa_proplist_free(p);
2403 } else {
2404 pa_tagstruct *t;
2405 uint32_t tag;
2406
2407 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2408 t = pa_tagstruct_command(
2409 s->context,
2410 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2411 &tag);
2412 pa_tagstruct_putu32(t, s->channel);
2413 pa_tagstruct_puts(t, name);
2414 pa_pstream_send_tagstruct(s->context->pstream, t);
2415 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2416 }
2417
2418 return o;
2419 }
2420
2421 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2422 pa_usec_t usec;
2423
2424 pa_assert(s);
2425 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2426
2427 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2428 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2429 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2430 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2431 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2432 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2433
2434 if (s->smoother)
2435 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2436 else
2437 usec = calc_time(s, FALSE);
2438
2439 /* Make sure the time runs monotonically */
2440 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2441 if (usec < s->previous_time)
2442 usec = s->previous_time;
2443 else
2444 s->previous_time = usec;
2445 }
2446
2447 if (r_usec)
2448 *r_usec = usec;
2449
2450 return 0;
2451 }
2452
2453 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2454 pa_assert(s);
2455 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2456
2457 if (negative)
2458 *negative = 0;
2459
2460 if (a >= b)
2461 return a-b;
2462 else {
2463 if (negative && s->direction == PA_STREAM_RECORD) {
2464 *negative = 1;
2465 return b-a;
2466 } else
2467 return 0;
2468 }
2469 }
2470
2471 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2472 pa_usec_t t, c;
2473 int r;
2474 int64_t cindex;
2475
2476 pa_assert(s);
2477 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2478 pa_assert(r_usec);
2479
2480 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2481 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2482 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2483 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2484 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2485 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2486
2487 if ((r = pa_stream_get_time(s, &t)) < 0)
2488 return r;
2489
2490 if (s->direction == PA_STREAM_PLAYBACK)
2491 cindex = s->timing_info.write_index;
2492 else
2493 cindex = s->timing_info.read_index;
2494
2495 if (cindex < 0)
2496 cindex = 0;
2497
2498 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2499
2500 if (s->direction == PA_STREAM_PLAYBACK)
2501 *r_usec = time_counter_diff(s, c, t, negative);
2502 else
2503 *r_usec = time_counter_diff(s, t, c, negative);
2504
2505 return 0;
2506 }
2507
2508 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2509 pa_assert(s);
2510 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2511
2512 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2513 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2514 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2515 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2516
2517 return &s->timing_info;
2518 }
2519
2520 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2521 pa_assert(s);
2522 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2523
2524 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2525
2526 return &s->sample_spec;
2527 }
2528
2529 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2530 pa_assert(s);
2531 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2532
2533 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2534
2535 return &s->channel_map;
2536 }
2537
2538 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2539 pa_assert(s);
2540 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2541
2542 /* We don't have the format till routing is done */
2543 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2544 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2545
2546 return s->format;
2547 }
2548 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2549 pa_assert(s);
2550 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2551
2552 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2553 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2554 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2555
2556 return &s->buffer_attr;
2557 }
2558
2559 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2560 pa_operation *o = userdata;
2561 int success = 1;
2562
2563 pa_assert(pd);
2564 pa_assert(o);
2565 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2566
2567 if (!o->context)
2568 goto finish;
2569
2570 if (command != PA_COMMAND_REPLY) {
2571 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2572 goto finish;
2573
2574 success = 0;
2575 } else {
2576 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2577 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2578 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2579 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2580 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2581 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2582 goto finish;
2583 }
2584 } else if (o->stream->direction == PA_STREAM_RECORD) {
2585 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2586 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2587 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2588 goto finish;
2589 }
2590 }
2591
2592 if (o->stream->context->version >= 13) {
2593 pa_usec_t usec;
2594
2595 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2596 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2597 goto finish;
2598 }
2599
2600 if (o->stream->direction == PA_STREAM_RECORD)
2601 o->stream->timing_info.configured_source_usec = usec;
2602 else
2603 o->stream->timing_info.configured_sink_usec = usec;
2604 }
2605
2606 if (!pa_tagstruct_eof(t)) {
2607 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2608 goto finish;
2609 }
2610 }
2611
2612 if (o->callback) {
2613 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2614 cb(o->stream, success, o->userdata);
2615 }
2616
2617 finish:
2618 pa_operation_done(o);
2619 pa_operation_unref(o);
2620 }
2621
2622
2623 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2624 pa_operation *o;
2625 pa_tagstruct *t;
2626 uint32_t tag;
2627 pa_buffer_attr copy;
2628
2629 pa_assert(s);
2630 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2631 pa_assert(attr);
2632
2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2634 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2635 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2636 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2637
2638 /* Ask for a timing update before we cork/uncork to get the best
2639 * accuracy for the transport latency suitable for the
2640 * check_smoother_status() call in the started callback */
2641 request_auto_timing_update(s, TRUE);
2642
2643 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2644
2645 t = pa_tagstruct_command(
2646 s->context,
2647 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2648 &tag);
2649 pa_tagstruct_putu32(t, s->channel);
2650
2651 copy = *attr;
2652 patch_buffer_attr(s, &copy, NULL);
2653 attr = &copy;
2654
2655 pa_tagstruct_putu32(t, attr->maxlength);
2656
2657 if (s->direction == PA_STREAM_PLAYBACK)
2658 pa_tagstruct_put(
2659 t,
2660 PA_TAG_U32, attr->tlength,
2661 PA_TAG_U32, attr->prebuf,
2662 PA_TAG_U32, attr->minreq,
2663 PA_TAG_INVALID);
2664 else
2665 pa_tagstruct_putu32(t, attr->fragsize);
2666
2667 if (s->context->version >= 13)
2668 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2669
2670 if (s->context->version >= 14)
2671 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2672
2673 pa_pstream_send_tagstruct(s->context->pstream, t);
2674 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2675
2676 /* This might cause changes in the read/write index, hence let's
2677 * request a timing update */
2678 request_auto_timing_update(s, TRUE);
2679
2680 return o;
2681 }
2682
2683 uint32_t pa_stream_get_device_index(pa_stream *s) {
2684 pa_assert(s);
2685 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2686
2687 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2688 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2689 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2690 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2691 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2692
2693 return s->device_index;
2694 }
2695
2696 const char *pa_stream_get_device_name(pa_stream *s) {
2697 pa_assert(s);
2698 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2699
2700 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2701 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2702 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2703 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2704 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2705
2706 return s->device_name;
2707 }
2708
2709 int pa_stream_is_suspended(pa_stream *s) {
2710 pa_assert(s);
2711 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2712
2713 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2714 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2715 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2716 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2717
2718 return s->suspended;
2719 }
2720
2721 int pa_stream_is_corked(pa_stream *s) {
2722 pa_assert(s);
2723 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2724
2725 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2726 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2727 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2728
2729 return s->corked;
2730 }
2731
2732 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2733 pa_operation *o = userdata;
2734 int success = 1;
2735
2736 pa_assert(pd);
2737 pa_assert(o);
2738 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2739
2740 if (!o->context)
2741 goto finish;
2742
2743 if (command != PA_COMMAND_REPLY) {
2744 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2745 goto finish;
2746
2747 success = 0;
2748 } else {
2749
2750 if (!pa_tagstruct_eof(t)) {
2751 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2752 goto finish;
2753 }
2754 }
2755
2756 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2757 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2758
2759 if (o->callback) {
2760 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2761 cb(o->stream, success, o->userdata);
2762 }
2763
2764 finish:
2765 pa_operation_done(o);
2766 pa_operation_unref(o);
2767 }
2768
2769
2770 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2771 pa_operation *o;
2772 pa_tagstruct *t;
2773 uint32_t tag;
2774
2775 pa_assert(s);
2776 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2777
2778 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2779 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2780 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2781 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2782 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2783 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2784
2785 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2786 o->private = PA_UINT_TO_PTR(rate);
2787
2788 t = pa_tagstruct_command(
2789 s->context,
2790 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2791 &tag);
2792 pa_tagstruct_putu32(t, s->channel);
2793 pa_tagstruct_putu32(t, rate);
2794
2795 pa_pstream_send_tagstruct(s->context->pstream, t);
2796 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2797
2798 return o;
2799 }
2800
2801 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2802 pa_operation *o;
2803 pa_tagstruct *t;
2804 uint32_t tag;
2805
2806 pa_assert(s);
2807 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2808
2809 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2810 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2812 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2813 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2814
2815 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2816
2817 t = pa_tagstruct_command(
2818 s->context,
2819 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2820 &tag);
2821 pa_tagstruct_putu32(t, s->channel);
2822 pa_tagstruct_putu32(t, (uint32_t) mode);
2823 pa_tagstruct_put_proplist(t, p);
2824
2825 pa_pstream_send_tagstruct(s->context->pstream, t);
2826 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2827
2828 /* Please note that we don't update s->proplist here, because we
2829 * don't export that field */
2830
2831 return o;
2832 }
2833
2834 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2835 pa_operation *o;
2836 pa_tagstruct *t;
2837 uint32_t tag;
2838 const char * const*k;
2839
2840 pa_assert(s);
2841 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2842
2843 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2844 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2845 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2846 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2847 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2848
2849 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2850
2851 t = pa_tagstruct_command(
2852 s->context,
2853 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2854 &tag);
2855 pa_tagstruct_putu32(t, s->channel);
2856
2857 for (k = keys; *k; k++)
2858 pa_tagstruct_puts(t, *k);
2859
2860 pa_tagstruct_puts(t, NULL);
2861
2862 pa_pstream_send_tagstruct(s->context->pstream, t);
2863 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2864
2865 /* Please note that we don't update s->proplist here, because we
2866 * don't export that field */
2867
2868 return o;
2869 }
2870
2871 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2872 pa_assert(s);
2873 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2874
2875 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2876 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2877 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2878 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2879
2880 s->direct_on_input = sink_input_idx;
2881
2882 return 0;
2883 }
2884
2885 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2886 pa_assert(s);
2887 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2888
2889 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2890 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2891 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2892
2893 return s->direct_on_input;
2894 }