]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
alsa: synthesize volume values more sensibly for channels that are not controllable...
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
/* Interval used for the first automatic timing update after a (forced)
 * restart of the timer, and the upper bound the interval is doubled up to. */
#define AUTO_TIMING_INTERVAL_START_USEC (10 * PA_USEC_PER_MSEC)
#define AUTO_TIMING_INTERVAL_END_USEC   (1500 * PA_USEC_PER_MSEC)

/* Parameters handed to pa_smoother_new() for streams that are created
 * with PA_STREAM_INTERPOLATE_TIMING. */
#define SMOOTHER_ADJUST_TIME  (1000 * PA_USEC_PER_MSEC)
#define SMOOTHER_HISTORY_TIME (5000 * PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY  (4)
50
51 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
52 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
53 }
54
55 static void reset_callbacks(pa_stream *s) {
56 s->read_callback = NULL;
57 s->read_userdata = NULL;
58 s->write_callback = NULL;
59 s->write_userdata = NULL;
60 s->state_callback = NULL;
61 s->state_userdata = NULL;
62 s->overflow_callback = NULL;
63 s->overflow_userdata = NULL;
64 s->underflow_callback = NULL;
65 s->underflow_userdata = NULL;
66 s->latency_update_callback = NULL;
67 s->latency_update_userdata = NULL;
68 s->moved_callback = NULL;
69 s->moved_userdata = NULL;
70 s->suspended_callback = NULL;
71 s->suspended_userdata = NULL;
72 s->started_callback = NULL;
73 s->started_userdata = NULL;
74 s->event_callback = NULL;
75 s->event_userdata = NULL;
76 s->buffer_attr_callback = NULL;
77 s->buffer_attr_userdata = NULL;
78 }
79
80 pa_stream *pa_stream_new_with_proplist(
81 pa_context *c,
82 const char *name,
83 const pa_sample_spec *ss,
84 const pa_channel_map *map,
85 pa_proplist *p) {
86
87 pa_stream *s;
88 int i;
89 pa_channel_map tmap;
90
91 pa_assert(c);
92 pa_assert(PA_REFCNT_VALUE(c) >= 1);
93
94 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
101
102 if (!map)
103 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
104
105 s = pa_xnew(pa_stream, 1);
106 PA_REFCNT_INIT(s);
107 s->context = c;
108 s->mainloop = c->mainloop;
109
110 s->direction = PA_STREAM_NODIRECTION;
111 s->state = PA_STREAM_UNCONNECTED;
112 s->flags = 0;
113
114 s->sample_spec = *ss;
115 s->channel_map = *map;
116
117 s->direct_on_input = PA_INVALID_INDEX;
118
119 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
120 if (name)
121 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
122
123 s->channel = 0;
124 s->channel_valid = FALSE;
125 s->syncid = c->csyncid++;
126 s->stream_index = PA_INVALID_INDEX;
127
128 s->requested_bytes = 0;
129 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
130
131 /* We initialize der target length here, so that if the user
132 * passes no explicit buffering metrics the default is similar to
133 * what older PA versions provided. */
134
135 s->buffer_attr.maxlength = (uint32_t) -1;
136 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
137 s->buffer_attr.minreq = (uint32_t) -1;
138 s->buffer_attr.prebuf = (uint32_t) -1;
139 s->buffer_attr.fragsize = (uint32_t) -1;
140
141 s->device_index = PA_INVALID_INDEX;
142 s->device_name = NULL;
143 s->suspended = FALSE;
144 s->corked = FALSE;
145
146 pa_memchunk_reset(&s->peek_memchunk);
147 s->peek_data = NULL;
148
149 s->record_memblockq = NULL;
150
151
152 memset(&s->timing_info, 0, sizeof(s->timing_info));
153 s->timing_info_valid = FALSE;
154
155 s->previous_time = 0;
156
157 s->read_index_not_before = 0;
158 s->write_index_not_before = 0;
159 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
160 s->write_index_corrections[i].valid = 0;
161 s->current_write_index_correction = 0;
162
163 s->auto_timing_update_event = NULL;
164 s->auto_timing_update_requested = FALSE;
165 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
166
167 reset_callbacks(s);
168
169 s->smoother = NULL;
170
171 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
172 PA_LLIST_PREPEND(pa_stream, c->streams, s);
173 pa_stream_ref(s);
174
175 return s;
176 }
177
178 static void stream_unlink(pa_stream *s) {
179 pa_operation *o, *n;
180 pa_assert(s);
181
182 if (!s->context)
183 return;
184
185 /* Detach from context */
186
187 /* Unref all operatio object that point to us */
188 for (o = s->context->operations; o; o = n) {
189 n = o->next;
190
191 if (o->stream == s)
192 pa_operation_cancel(o);
193 }
194
195 /* Drop all outstanding replies for this stream */
196 if (s->context->pdispatch)
197 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
198
199 if (s->channel_valid) {
200 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
201 s->channel = 0;
202 s->channel_valid = FALSE;
203 }
204
205 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
206 pa_stream_unref(s);
207
208 s->context = NULL;
209
210 if (s->auto_timing_update_event) {
211 pa_assert(s->mainloop);
212 s->mainloop->time_free(s->auto_timing_update_event);
213 }
214
215 reset_callbacks(s);
216 }
217
218 static void stream_free(pa_stream *s) {
219 pa_assert(s);
220
221 stream_unlink(s);
222
223 if (s->peek_memchunk.memblock) {
224 if (s->peek_data)
225 pa_memblock_release(s->peek_memchunk.memblock);
226 pa_memblock_unref(s->peek_memchunk.memblock);
227 }
228
229 if (s->record_memblockq)
230 pa_memblockq_free(s->record_memblockq);
231
232 if (s->proplist)
233 pa_proplist_free(s->proplist);
234
235 if (s->smoother)
236 pa_smoother_free(s->smoother);
237
238 pa_xfree(s->device_name);
239 pa_xfree(s);
240 }
241
242 void pa_stream_unref(pa_stream *s) {
243 pa_assert(s);
244 pa_assert(PA_REFCNT_VALUE(s) >= 1);
245
246 if (PA_REFCNT_DEC(s) <= 0)
247 stream_free(s);
248 }
249
250 pa_stream* pa_stream_ref(pa_stream *s) {
251 pa_assert(s);
252 pa_assert(PA_REFCNT_VALUE(s) >= 1);
253
254 PA_REFCNT_INC(s);
255 return s;
256 }
257
258 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
259 pa_assert(s);
260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
261
262 return s->state;
263 }
264
265 pa_context* pa_stream_get_context(pa_stream *s) {
266 pa_assert(s);
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269 return s->context;
270 }
271
272 uint32_t pa_stream_get_index(pa_stream *s) {
273 pa_assert(s);
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
277 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
278
279 return s->stream_index;
280 }
281
282 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
283 pa_assert(s);
284 pa_assert(PA_REFCNT_VALUE(s) >= 1);
285
286 if (s->state == st)
287 return;
288
289 pa_stream_ref(s);
290
291 s->state = st;
292
293 if (s->state_callback)
294 s->state_callback(s, s->state_userdata);
295
296 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
297 stream_unlink(s);
298
299 pa_stream_unref(s);
300 }
301
302 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
303 pa_assert(s);
304 pa_assert(PA_REFCNT_VALUE(s) >= 1);
305
306 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
307 return;
308
309 if (s->state == PA_STREAM_READY &&
310 (force || !s->auto_timing_update_requested)) {
311 pa_operation *o;
312
313 /* pa_log("Automatically requesting new timing data"); */
314
315 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
316 pa_operation_unref(o);
317 s->auto_timing_update_requested = TRUE;
318 }
319 }
320
321 if (s->auto_timing_update_event) {
322 struct timeval next;
323
324 if (force)
325 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
326
327 pa_gettimeofday(&next);
328 pa_timeval_add(&next, s->auto_timing_interval_usec);
329 s->mainloop->time_restart(s->auto_timing_update_event, &next);
330
331 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
332 }
333 }
334
335 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
336 pa_context *c = userdata;
337 pa_stream *s;
338 uint32_t channel;
339
340 pa_assert(pd);
341 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
342 pa_assert(t);
343 pa_assert(c);
344 pa_assert(PA_REFCNT_VALUE(c) >= 1);
345
346 pa_context_ref(c);
347
348 if (pa_tagstruct_getu32(t, &channel) < 0 ||
349 !pa_tagstruct_eof(t)) {
350 pa_context_fail(c, PA_ERR_PROTOCOL);
351 goto finish;
352 }
353
354 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
355 goto finish;
356
357 if (s->state != PA_STREAM_READY)
358 goto finish;
359
360 pa_context_set_error(c, PA_ERR_KILLED);
361 pa_stream_set_state(s, PA_STREAM_FAILED);
362
363 finish:
364 pa_context_unref(c);
365 }
366
367 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
368 pa_usec_t x;
369
370 pa_assert(s);
371 pa_assert(!force_start || !force_stop);
372
373 if (!s->smoother)
374 return;
375
376 x = pa_rtclock_usec();
377
378 if (s->timing_info_valid) {
379 if (aposteriori)
380 x -= s->timing_info.transport_usec;
381 else
382 x += s->timing_info.transport_usec;
383 }
384
385 if (s->suspended || s->corked || force_stop)
386 pa_smoother_pause(s->smoother, x);
387 else if (force_start || s->buffer_attr.prebuf == 0)
388 pa_smoother_resume(s->smoother, x, TRUE);
389
390
391 /* Please note that we have no idea if playback actually started
392 * if prebuf is non-zero! */
393 }
394
395 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
396 pa_context *c = userdata;
397 pa_stream *s;
398 uint32_t channel;
399 const char *dn;
400 pa_bool_t suspended;
401 uint32_t di;
402 pa_usec_t usec = 0;
403 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
404
405 pa_assert(pd);
406 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
407 pa_assert(t);
408 pa_assert(c);
409 pa_assert(PA_REFCNT_VALUE(c) >= 1);
410
411 pa_context_ref(c);
412
413 if (c->version < 12) {
414 pa_context_fail(c, PA_ERR_PROTOCOL);
415 goto finish;
416 }
417
418 if (pa_tagstruct_getu32(t, &channel) < 0 ||
419 pa_tagstruct_getu32(t, &di) < 0 ||
420 pa_tagstruct_gets(t, &dn) < 0 ||
421 pa_tagstruct_get_boolean(t, &suspended) < 0) {
422 pa_context_fail(c, PA_ERR_PROTOCOL);
423 goto finish;
424 }
425
426 if (c->version >= 13) {
427
428 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
429 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
430 pa_tagstruct_getu32(t, &fragsize) < 0 ||
431 pa_tagstruct_get_usec(t, &usec) < 0) {
432 pa_context_fail(c, PA_ERR_PROTOCOL);
433 goto finish;
434 }
435 } else {
436 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
437 pa_tagstruct_getu32(t, &tlength) < 0 ||
438 pa_tagstruct_getu32(t, &prebuf) < 0 ||
439 pa_tagstruct_getu32(t, &minreq) < 0 ||
440 pa_tagstruct_get_usec(t, &usec) < 0) {
441 pa_context_fail(c, PA_ERR_PROTOCOL);
442 goto finish;
443 }
444 }
445 }
446
447 if (!pa_tagstruct_eof(t)) {
448 pa_context_fail(c, PA_ERR_PROTOCOL);
449 goto finish;
450 }
451
452 if (!dn || di == PA_INVALID_INDEX) {
453 pa_context_fail(c, PA_ERR_PROTOCOL);
454 goto finish;
455 }
456
457 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
458 goto finish;
459
460 if (s->state != PA_STREAM_READY)
461 goto finish;
462
463 if (c->version >= 13) {
464 if (s->direction == PA_STREAM_RECORD)
465 s->timing_info.configured_source_usec = usec;
466 else
467 s->timing_info.configured_sink_usec = usec;
468
469 s->buffer_attr.maxlength = maxlength;
470 s->buffer_attr.fragsize = fragsize;
471 s->buffer_attr.tlength = tlength;
472 s->buffer_attr.prebuf = prebuf;
473 s->buffer_attr.minreq = minreq;
474 }
475
476 pa_xfree(s->device_name);
477 s->device_name = pa_xstrdup(dn);
478 s->device_index = di;
479
480 s->suspended = suspended;
481
482 check_smoother_status(s, TRUE, FALSE, FALSE);
483 request_auto_timing_update(s, TRUE);
484
485 if (s->moved_callback)
486 s->moved_callback(s, s->moved_userdata);
487
488 finish:
489 pa_context_unref(c);
490 }
491
492 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
493 pa_context *c = userdata;
494 pa_stream *s;
495 uint32_t channel;
496 pa_usec_t usec = 0;
497 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
498
499 pa_assert(pd);
500 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
501 pa_assert(t);
502 pa_assert(c);
503 pa_assert(PA_REFCNT_VALUE(c) >= 1);
504
505 pa_context_ref(c);
506
507 if (c->version < 15) {
508 pa_context_fail(c, PA_ERR_PROTOCOL);
509 goto finish;
510 }
511
512 if (pa_tagstruct_getu32(t, &channel) < 0) {
513 pa_context_fail(c, PA_ERR_PROTOCOL);
514 goto finish;
515 }
516
517 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
518 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
519 pa_tagstruct_getu32(t, &fragsize) < 0 ||
520 pa_tagstruct_get_usec(t, &usec) < 0) {
521 pa_context_fail(c, PA_ERR_PROTOCOL);
522 goto finish;
523 }
524 } else {
525 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526 pa_tagstruct_getu32(t, &tlength) < 0 ||
527 pa_tagstruct_getu32(t, &prebuf) < 0 ||
528 pa_tagstruct_getu32(t, &minreq) < 0 ||
529 pa_tagstruct_get_usec(t, &usec) < 0) {
530 pa_context_fail(c, PA_ERR_PROTOCOL);
531 goto finish;
532 }
533 }
534
535 if (!pa_tagstruct_eof(t)) {
536 pa_context_fail(c, PA_ERR_PROTOCOL);
537 goto finish;
538 }
539
540 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
541 goto finish;
542
543 if (s->state != PA_STREAM_READY)
544 goto finish;
545
546 if (s->direction == PA_STREAM_RECORD)
547 s->timing_info.configured_source_usec = usec;
548 else
549 s->timing_info.configured_sink_usec = usec;
550
551 s->buffer_attr.maxlength = maxlength;
552 s->buffer_attr.fragsize = fragsize;
553 s->buffer_attr.tlength = tlength;
554 s->buffer_attr.prebuf = prebuf;
555 s->buffer_attr.minreq = minreq;
556
557 request_auto_timing_update(s, TRUE);
558
559 if (s->buffer_attr_callback)
560 s->buffer_attr_callback(s, s->buffer_attr_userdata);
561
562 finish:
563 pa_context_unref(c);
564 }
565
566 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
567 pa_context *c = userdata;
568 pa_stream *s;
569 uint32_t channel;
570 pa_bool_t suspended;
571
572 pa_assert(pd);
573 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
574 pa_assert(t);
575 pa_assert(c);
576 pa_assert(PA_REFCNT_VALUE(c) >= 1);
577
578 pa_context_ref(c);
579
580 if (c->version < 12) {
581 pa_context_fail(c, PA_ERR_PROTOCOL);
582 goto finish;
583 }
584
585 if (pa_tagstruct_getu32(t, &channel) < 0 ||
586 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
587 !pa_tagstruct_eof(t)) {
588 pa_context_fail(c, PA_ERR_PROTOCOL);
589 goto finish;
590 }
591
592 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
593 goto finish;
594
595 if (s->state != PA_STREAM_READY)
596 goto finish;
597
598 s->suspended = suspended;
599
600 check_smoother_status(s, TRUE, FALSE, FALSE);
601 request_auto_timing_update(s, TRUE);
602
603 if (s->suspended_callback)
604 s->suspended_callback(s, s->suspended_userdata);
605
606 finish:
607 pa_context_unref(c);
608 }
609
610 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
611 pa_context *c = userdata;
612 pa_stream *s;
613 uint32_t channel;
614
615 pa_assert(pd);
616 pa_assert(command == PA_COMMAND_STARTED);
617 pa_assert(t);
618 pa_assert(c);
619 pa_assert(PA_REFCNT_VALUE(c) >= 1);
620
621 pa_context_ref(c);
622
623 if (c->version < 13) {
624 pa_context_fail(c, PA_ERR_PROTOCOL);
625 goto finish;
626 }
627
628 if (pa_tagstruct_getu32(t, &channel) < 0 ||
629 !pa_tagstruct_eof(t)) {
630 pa_context_fail(c, PA_ERR_PROTOCOL);
631 goto finish;
632 }
633
634 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
635 goto finish;
636
637 if (s->state != PA_STREAM_READY)
638 goto finish;
639
640 check_smoother_status(s, TRUE, TRUE, FALSE);
641 request_auto_timing_update(s, TRUE);
642
643 if (s->started_callback)
644 s->started_callback(s, s->started_userdata);
645
646 finish:
647 pa_context_unref(c);
648 }
649
650 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
651 pa_context *c = userdata;
652 pa_stream *s;
653 uint32_t channel;
654 pa_proplist *pl = NULL;
655 const char *event;
656
657 pa_assert(pd);
658 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
659 pa_assert(t);
660 pa_assert(c);
661 pa_assert(PA_REFCNT_VALUE(c) >= 1);
662
663 pa_context_ref(c);
664
665 if (c->version < 15) {
666 pa_context_fail(c, PA_ERR_PROTOCOL);
667 goto finish;
668 }
669
670 pl = pa_proplist_new();
671
672 if (pa_tagstruct_getu32(t, &channel) < 0 ||
673 pa_tagstruct_gets(t, &event) < 0 ||
674 pa_tagstruct_get_proplist(t, pl) < 0 ||
675 !pa_tagstruct_eof(t) || !event) {
676 pa_context_fail(c, PA_ERR_PROTOCOL);
677 goto finish;
678 }
679
680 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
681 goto finish;
682
683 if (s->state != PA_STREAM_READY)
684 goto finish;
685
686 if (s->event_callback)
687 s->event_callback(s, event, pl, s->event_userdata);
688
689 finish:
690 pa_context_unref(c);
691
692 if (pl)
693 pa_proplist_free(pl);
694 }
695
696 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
697 pa_stream *s;
698 pa_context *c = userdata;
699 uint32_t bytes, channel;
700
701 pa_assert(pd);
702 pa_assert(command == PA_COMMAND_REQUEST);
703 pa_assert(t);
704 pa_assert(c);
705 pa_assert(PA_REFCNT_VALUE(c) >= 1);
706
707 pa_context_ref(c);
708
709 if (pa_tagstruct_getu32(t, &channel) < 0 ||
710 pa_tagstruct_getu32(t, &bytes) < 0 ||
711 !pa_tagstruct_eof(t)) {
712 pa_context_fail(c, PA_ERR_PROTOCOL);
713 goto finish;
714 }
715
716 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
717 goto finish;
718
719 if (s->state != PA_STREAM_READY)
720 goto finish;
721
722 s->requested_bytes += bytes;
723
724 if (s->requested_bytes > 0 && s->write_callback)
725 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
726
727 finish:
728 pa_context_unref(c);
729 }
730
731 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
732 pa_stream *s;
733 pa_context *c = userdata;
734 uint32_t channel;
735
736 pa_assert(pd);
737 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
738 pa_assert(t);
739 pa_assert(c);
740 pa_assert(PA_REFCNT_VALUE(c) >= 1);
741
742 pa_context_ref(c);
743
744 if (pa_tagstruct_getu32(t, &channel) < 0 ||
745 !pa_tagstruct_eof(t)) {
746 pa_context_fail(c, PA_ERR_PROTOCOL);
747 goto finish;
748 }
749
750 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
751 goto finish;
752
753 if (s->state != PA_STREAM_READY)
754 goto finish;
755
756 if (s->buffer_attr.prebuf > 0)
757 check_smoother_status(s, TRUE, FALSE, TRUE);
758
759 request_auto_timing_update(s, TRUE);
760
761 if (command == PA_COMMAND_OVERFLOW) {
762 if (s->overflow_callback)
763 s->overflow_callback(s, s->overflow_userdata);
764 } else if (command == PA_COMMAND_UNDERFLOW) {
765 if (s->underflow_callback)
766 s->underflow_callback(s, s->underflow_userdata);
767 }
768
769 finish:
770 pa_context_unref(c);
771 }
772
773 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
774 pa_assert(s);
775 pa_assert(PA_REFCNT_VALUE(s) >= 1);
776
777 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
778
779 if (s->state != PA_STREAM_READY)
780 return;
781
782 if (w) {
783 s->write_index_not_before = s->context->ctag;
784
785 if (s->timing_info_valid)
786 s->timing_info.write_index_corrupt = TRUE;
787
788 /* pa_log("write_index invalidated"); */
789 }
790
791 if (r) {
792 s->read_index_not_before = s->context->ctag;
793
794 if (s->timing_info_valid)
795 s->timing_info.read_index_corrupt = TRUE;
796
797 /* pa_log("read_index invalidated"); */
798 }
799
800 request_auto_timing_update(s, TRUE);
801 }
802
803 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
804 pa_stream *s = userdata;
805
806 pa_assert(s);
807 pa_assert(PA_REFCNT_VALUE(s) >= 1);
808
809 pa_stream_ref(s);
810 request_auto_timing_update(s, FALSE);
811 pa_stream_unref(s);
812 }
813
814 static void create_stream_complete(pa_stream *s) {
815 pa_assert(s);
816 pa_assert(PA_REFCNT_VALUE(s) >= 1);
817 pa_assert(s->state == PA_STREAM_CREATING);
818
819 pa_stream_set_state(s, PA_STREAM_READY);
820
821 if (s->requested_bytes > 0 && s->write_callback)
822 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
823
824 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
825 struct timeval tv;
826 pa_gettimeofday(&tv);
827 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
828 pa_timeval_add(&tv, s->auto_timing_interval_usec);
829 pa_assert(!s->auto_timing_update_event);
830 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
831
832 request_auto_timing_update(s, TRUE);
833 }
834
835 check_smoother_status(s, TRUE, FALSE, FALSE);
836 }
837
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
839 pa_assert(s);
840 pa_assert(attr);
841 pa_assert(ss);
842
843 if (s->context->version >= 13)
844 return;
845
846 /* Version older than 0.9.10 didn't do server side buffer_attr
847 * selection, hence we have to fake it on the client side. */
848
849 /* We choose fairly conservative values here, to not confuse
850 * old clients with extremely large playback buffers */
851
852 if (attr->maxlength == (uint32_t) -1)
853 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
854
855 if (attr->tlength == (uint32_t) -1)
856 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
857
858 if (attr->minreq == (uint32_t) -1)
859 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
860
861 if (attr->prebuf == (uint32_t) -1)
862 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
863
864 if (attr->fragsize == (uint32_t) -1)
865 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
866 }
867
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869 pa_stream *s = userdata;
870 uint32_t requested_bytes;
871
872 pa_assert(pd);
873 pa_assert(s);
874 pa_assert(PA_REFCNT_VALUE(s) >= 1);
875 pa_assert(s->state == PA_STREAM_CREATING);
876
877 pa_stream_ref(s);
878
879 if (command != PA_COMMAND_REPLY) {
880 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
881 goto finish;
882
883 pa_stream_set_state(s, PA_STREAM_FAILED);
884 goto finish;
885 }
886
887 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888 s->channel == PA_INVALID_INDEX ||
889 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
890 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891 pa_context_fail(s->context, PA_ERR_PROTOCOL);
892 goto finish;
893 }
894
895 s->requested_bytes = (int64_t) requested_bytes;
896
897 if (s->context->version >= 9) {
898 if (s->direction == PA_STREAM_PLAYBACK) {
899 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903 pa_context_fail(s->context, PA_ERR_PROTOCOL);
904 goto finish;
905 }
906 } else if (s->direction == PA_STREAM_RECORD) {
907 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909 pa_context_fail(s->context, PA_ERR_PROTOCOL);
910 goto finish;
911 }
912 }
913 }
914
915 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
916 pa_sample_spec ss;
917 pa_channel_map cm;
918 const char *dn = NULL;
919 pa_bool_t suspended;
920
921 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924 pa_tagstruct_gets(t, &dn) < 0 ||
925 pa_tagstruct_get_boolean(t, &suspended) < 0) {
926 pa_context_fail(s->context, PA_ERR_PROTOCOL);
927 goto finish;
928 }
929
930 if (!dn || s->device_index == PA_INVALID_INDEX ||
931 ss.channels != cm.channels ||
932 !pa_channel_map_valid(&cm) ||
933 !pa_sample_spec_valid(&ss) ||
934 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937 pa_context_fail(s->context, PA_ERR_PROTOCOL);
938 goto finish;
939 }
940
941 pa_xfree(s->device_name);
942 s->device_name = pa_xstrdup(dn);
943 s->suspended = suspended;
944
945 s->channel_map = cm;
946 s->sample_spec = ss;
947 }
948
949 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
950 pa_usec_t usec;
951
952 if (pa_tagstruct_get_usec(t, &usec) < 0) {
953 pa_context_fail(s->context, PA_ERR_PROTOCOL);
954 goto finish;
955 }
956
957 if (s->direction == PA_STREAM_RECORD)
958 s->timing_info.configured_source_usec = usec;
959 else
960 s->timing_info.configured_sink_usec = usec;
961 }
962
963 if (!pa_tagstruct_eof(t)) {
964 pa_context_fail(s->context, PA_ERR_PROTOCOL);
965 goto finish;
966 }
967
968 if (s->direction == PA_STREAM_RECORD) {
969 pa_assert(!s->record_memblockq);
970
971 s->record_memblockq = pa_memblockq_new(
972 0,
973 s->buffer_attr.maxlength,
974 0,
975 pa_frame_size(&s->sample_spec),
976 1,
977 0,
978 0,
979 NULL);
980 }
981
982 s->channel_valid = TRUE;
983 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
984
985 create_stream_complete(s);
986
987 finish:
988 pa_stream_unref(s);
989 }
990
991 static int create_stream(
992 pa_stream_direction_t direction,
993 pa_stream *s,
994 const char *dev,
995 const pa_buffer_attr *attr,
996 pa_stream_flags_t flags,
997 const pa_cvolume *volume,
998 pa_stream *sync_stream) {
999
1000 pa_tagstruct *t;
1001 uint32_t tag;
1002 pa_bool_t volume_set = FALSE;
1003
1004 pa_assert(s);
1005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1007
1008 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012 PA_STREAM_INTERPOLATE_TIMING|
1013 PA_STREAM_NOT_MONOTONIC|
1014 PA_STREAM_AUTO_TIMING_UPDATE|
1015 PA_STREAM_NO_REMAP_CHANNELS|
1016 PA_STREAM_NO_REMIX_CHANNELS|
1017 PA_STREAM_FIX_FORMAT|
1018 PA_STREAM_FIX_RATE|
1019 PA_STREAM_FIX_CHANNELS|
1020 PA_STREAM_DONT_MOVE|
1021 PA_STREAM_VARIABLE_RATE|
1022 PA_STREAM_PEAK_DETECT|
1023 PA_STREAM_START_MUTED|
1024 PA_STREAM_ADJUST_LATENCY|
1025 PA_STREAM_EARLY_REQUESTS|
1026 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027 PA_STREAM_START_UNMUTED|
1028 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1029
1030 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1031 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1032 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1033 /* Althought some of the other flags are not supported on older
1034 * version, we don't check for them here, because it doesn't hurt
1035 * when they are passed but actually not supported. This makes
1036 * client development easier */
1037
1038 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1039 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1040 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1041 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1042 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1043
1044 pa_stream_ref(s);
1045
1046 s->direction = direction;
1047 s->flags = flags;
1048 s->corked = !!(flags & PA_STREAM_START_CORKED);
1049
1050 if (sync_stream)
1051 s->syncid = sync_stream->syncid;
1052
1053 if (attr)
1054 s->buffer_attr = *attr;
1055 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1056
1057 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1058 pa_usec_t x;
1059
1060 x = pa_rtclock_usec();
1061
1062 pa_assert(!s->smoother);
1063 s->smoother = pa_smoother_new(
1064 SMOOTHER_ADJUST_TIME,
1065 SMOOTHER_HISTORY_TIME,
1066 !(flags & PA_STREAM_NOT_MONOTONIC),
1067 TRUE,
1068 SMOOTHER_MIN_HISTORY,
1069 x,
1070 TRUE);
1071 }
1072
1073 if (!dev)
1074 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1075
1076 t = pa_tagstruct_command(
1077 s->context,
1078 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1079 &tag);
1080
1081 if (s->context->version < 13)
1082 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1083
1084 pa_tagstruct_put(
1085 t,
1086 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1087 PA_TAG_CHANNEL_MAP, &s->channel_map,
1088 PA_TAG_U32, PA_INVALID_INDEX,
1089 PA_TAG_STRING, dev,
1090 PA_TAG_U32, s->buffer_attr.maxlength,
1091 PA_TAG_BOOLEAN, s->corked,
1092 PA_TAG_INVALID);
1093
1094 if (s->direction == PA_STREAM_PLAYBACK) {
1095 pa_cvolume cv;
1096
1097 pa_tagstruct_put(
1098 t,
1099 PA_TAG_U32, s->buffer_attr.tlength,
1100 PA_TAG_U32, s->buffer_attr.prebuf,
1101 PA_TAG_U32, s->buffer_attr.minreq,
1102 PA_TAG_U32, s->syncid,
1103 PA_TAG_INVALID);
1104
1105 volume_set = !!volume;
1106
1107 if (!volume)
1108 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1109
1110 pa_tagstruct_put_cvolume(t, volume);
1111 } else
1112 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1113
1114 if (s->context->version >= 12) {
1115 pa_tagstruct_put(
1116 t,
1117 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1118 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1119 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1120 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1121 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1122 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1123 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1124 PA_TAG_INVALID);
1125 }
1126
1127 if (s->context->version >= 13) {
1128
1129 if (s->direction == PA_STREAM_PLAYBACK)
1130 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1131 else
1132 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1133
1134 pa_tagstruct_put(
1135 t,
1136 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1137 PA_TAG_PROPLIST, s->proplist,
1138 PA_TAG_INVALID);
1139
1140 if (s->direction == PA_STREAM_RECORD)
1141 pa_tagstruct_putu32(t, s->direct_on_input);
1142 }
1143
1144 if (s->context->version >= 14) {
1145
1146 if (s->direction == PA_STREAM_PLAYBACK)
1147 pa_tagstruct_put_boolean(t, volume_set);
1148
1149 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1150 }
1151
1152 if (s->context->version >= 15) {
1153
1154 if (s->direction == PA_STREAM_PLAYBACK)
1155 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1156
1157 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1158 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1159 }
1160
1161 pa_pstream_send_tagstruct(s->context->pstream, t);
1162 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1163
1164 pa_stream_set_state(s, PA_STREAM_CREATING);
1165
1166 pa_stream_unref(s);
1167 return 0;
1168 }
1169
1170 int pa_stream_connect_playback(
1171 pa_stream *s,
1172 const char *dev,
1173 const pa_buffer_attr *attr,
1174 pa_stream_flags_t flags,
1175 pa_cvolume *volume,
1176 pa_stream *sync_stream) {
1177
1178 pa_assert(s);
1179 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1180
1181 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1182 }
1183
1184 int pa_stream_connect_record(
1185 pa_stream *s,
1186 const char *dev,
1187 const pa_buffer_attr *attr,
1188 pa_stream_flags_t flags) {
1189
1190 pa_assert(s);
1191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192
1193 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1194 }
1195
1196 int pa_stream_write(
1197 pa_stream *s,
1198 const void *data,
1199 size_t length,
1200 void (*free_cb)(void *p),
1201 int64_t offset,
1202 pa_seek_mode_t seek) {
1203
1204 pa_memchunk chunk;
1205 pa_seek_mode_t t_seek;
1206 int64_t t_offset;
1207 size_t t_length;
1208 const void *t_data;
1209
1210 pa_assert(s);
1211 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1212 pa_assert(data);
1213
1214 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1215 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1216 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1217 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1218 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1219
1220 if (length <= 0)
1221 return 0;
1222
1223 t_seek = seek;
1224 t_offset = offset;
1225 t_length = length;
1226 t_data = data;
1227
1228 while (t_length > 0) {
1229
1230 chunk.index = 0;
1231
1232 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1233 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1234 chunk.length = t_length;
1235 } else {
1236 void *d;
1237
1238 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1239 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1240
1241 d = pa_memblock_acquire(chunk.memblock);
1242 memcpy(d, t_data, chunk.length);
1243 pa_memblock_release(chunk.memblock);
1244 }
1245
1246 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1247
1248 t_offset = 0;
1249 t_seek = PA_SEEK_RELATIVE;
1250
1251 t_data = (const uint8_t*) t_data + chunk.length;
1252 t_length -= chunk.length;
1253
1254 pa_memblock_unref(chunk.memblock);
1255 }
1256
1257 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1258 free_cb((void*) data);
1259
1260 /* This is obviously wrong since we ignore the seeking index . But
1261 * that's OK, the server side applies the same error */
1262 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1263
1264 if (s->direction == PA_STREAM_PLAYBACK) {
1265
1266 /* Update latency request correction */
1267 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1268
1269 if (seek == PA_SEEK_ABSOLUTE) {
1270 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1271 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1272 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1273 } else if (seek == PA_SEEK_RELATIVE) {
1274 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1275 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1276 } else
1277 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1278 }
1279
1280 /* Update the write index in the already available latency data */
1281 if (s->timing_info_valid) {
1282
1283 if (seek == PA_SEEK_ABSOLUTE) {
1284 s->timing_info.write_index_corrupt = FALSE;
1285 s->timing_info.write_index = offset + (int64_t) length;
1286 } else if (seek == PA_SEEK_RELATIVE) {
1287 if (!s->timing_info.write_index_corrupt)
1288 s->timing_info.write_index += offset + (int64_t) length;
1289 } else
1290 s->timing_info.write_index_corrupt = TRUE;
1291 }
1292
1293 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1294 request_auto_timing_update(s, TRUE);
1295 }
1296
1297 return 0;
1298 }
1299
1300 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1301 pa_assert(s);
1302 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1303 pa_assert(data);
1304 pa_assert(length);
1305
1306 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1307 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1308 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1309
1310 if (!s->peek_memchunk.memblock) {
1311
1312 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1313 *data = NULL;
1314 *length = 0;
1315 return 0;
1316 }
1317
1318 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1319 }
1320
1321 pa_assert(s->peek_data);
1322 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1323 *length = s->peek_memchunk.length;
1324 return 0;
1325 }
1326
1327 int pa_stream_drop(pa_stream *s) {
1328 pa_assert(s);
1329 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1330
1331 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1332 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1333 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1334 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1335
1336 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1337
1338 /* Fix the simulated local read index */
1339 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1340 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1341
1342 pa_assert(s->peek_data);
1343 pa_memblock_release(s->peek_memchunk.memblock);
1344 pa_memblock_unref(s->peek_memchunk.memblock);
1345 pa_memchunk_reset(&s->peek_memchunk);
1346
1347 return 0;
1348 }
1349
1350 size_t pa_stream_writable_size(pa_stream *s) {
1351 pa_assert(s);
1352 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1353
1354 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1355 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1356 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1357
1358 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1359 }
1360
1361 size_t pa_stream_readable_size(pa_stream *s) {
1362 pa_assert(s);
1363 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1364
1365 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1366 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1367 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1368
1369 return pa_memblockq_get_length(s->record_memblockq);
1370 }
1371
1372 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1373 pa_operation *o;
1374 pa_tagstruct *t;
1375 uint32_t tag;
1376
1377 pa_assert(s);
1378 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1379
1380 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1383
1384 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1385
1386 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1387 pa_tagstruct_putu32(t, s->channel);
1388 pa_pstream_send_tagstruct(s->context->pstream, t);
1389 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1390
1391 return o;
1392 }
1393
1394 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1395 pa_usec_t usec;
1396
1397 pa_assert(s);
1398 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1399 pa_assert(s->state == PA_STREAM_READY);
1400 pa_assert(s->direction != PA_STREAM_UPLOAD);
1401 pa_assert(s->timing_info_valid);
1402 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1403 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1404
1405 if (s->direction == PA_STREAM_PLAYBACK) {
1406 /* The last byte that was written into the output device
1407 * had this time value associated */
1408 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1409
1410 if (!s->corked && !s->suspended) {
1411
1412 if (!ignore_transport)
1413 /* Because the latency info took a little time to come
1414 * to us, we assume that the real output time is actually
1415 * a little ahead */
1416 usec += s->timing_info.transport_usec;
1417
1418 /* However, the output device usually maintains a buffer
1419 too, hence the real sample currently played is a little
1420 back */
1421 if (s->timing_info.sink_usec >= usec)
1422 usec = 0;
1423 else
1424 usec -= s->timing_info.sink_usec;
1425 }
1426
1427 } else {
1428 pa_assert(s->direction == PA_STREAM_RECORD);
1429
1430 /* The last byte written into the server side queue had
1431 * this time value associated */
1432 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1433
1434 if (!s->corked && !s->suspended) {
1435
1436 if (!ignore_transport)
1437 /* Add transport latency */
1438 usec += s->timing_info.transport_usec;
1439
1440 /* Add latency of data in device buffer */
1441 usec += s->timing_info.source_usec;
1442
1443 /* If this is a monitor source, we need to correct the
1444 * time by the playback device buffer */
1445 if (s->timing_info.sink_usec >= usec)
1446 usec = 0;
1447 else
1448 usec -= s->timing_info.sink_usec;
1449 }
1450 }
1451
1452 return usec;
1453 }
1454
1455 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1456 pa_operation *o = userdata;
1457 struct timeval local, remote, now;
1458 pa_timing_info *i;
1459 pa_bool_t playing = FALSE;
1460 uint64_t underrun_for = 0, playing_for = 0;
1461
1462 pa_assert(pd);
1463 pa_assert(o);
1464 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1465
1466 if (!o->context || !o->stream)
1467 goto finish;
1468
1469 i = &o->stream->timing_info;
1470
1471 o->stream->timing_info_valid = FALSE;
1472 i->write_index_corrupt = TRUE;
1473 i->read_index_corrupt = TRUE;
1474
1475 if (command != PA_COMMAND_REPLY) {
1476 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1477 goto finish;
1478
1479 } else {
1480
1481 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1482 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1483 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1484 pa_tagstruct_get_timeval(t, &local) < 0 ||
1485 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1486 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1487 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1488
1489 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1490 goto finish;
1491 }
1492
1493 if (o->context->version >= 13 &&
1494 o->stream->direction == PA_STREAM_PLAYBACK)
1495 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1496 pa_tagstruct_getu64(t, &playing_for) < 0) {
1497
1498 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1499 goto finish;
1500 }
1501
1502
1503 if (!pa_tagstruct_eof(t)) {
1504 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1505 goto finish;
1506 }
1507 o->stream->timing_info_valid = TRUE;
1508 i->write_index_corrupt = FALSE;
1509 i->read_index_corrupt = FALSE;
1510
1511 i->playing = (int) playing;
1512 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1513
1514 pa_gettimeofday(&now);
1515
1516 /* Calculcate timestamps */
1517 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1518 /* local and remote seem to have synchronized clocks */
1519
1520 if (o->stream->direction == PA_STREAM_PLAYBACK)
1521 i->transport_usec = pa_timeval_diff(&remote, &local);
1522 else
1523 i->transport_usec = pa_timeval_diff(&now, &remote);
1524
1525 i->synchronized_clocks = TRUE;
1526 i->timestamp = remote;
1527 } else {
1528 /* clocks are not synchronized, let's estimate latency then */
1529 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1530 i->synchronized_clocks = FALSE;
1531 i->timestamp = local;
1532 pa_timeval_add(&i->timestamp, i->transport_usec);
1533 }
1534
1535 /* Invalidate read and write indexes if necessary */
1536 if (tag < o->stream->read_index_not_before)
1537 i->read_index_corrupt = TRUE;
1538
1539 if (tag < o->stream->write_index_not_before)
1540 i->write_index_corrupt = TRUE;
1541
1542 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1543 /* Write index correction */
1544
1545 int n, j;
1546 uint32_t ctag = tag;
1547
1548 /* Go through the saved correction values and add up the
1549 * total correction.*/
1550 for (n = 0, j = o->stream->current_write_index_correction+1;
1551 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1552 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1553
1554 /* Step over invalid data or out-of-date data */
1555 if (!o->stream->write_index_corrections[j].valid ||
1556 o->stream->write_index_corrections[j].tag < ctag)
1557 continue;
1558
1559 /* Make sure that everything is in order */
1560 ctag = o->stream->write_index_corrections[j].tag+1;
1561
1562 /* Now fix the write index */
1563 if (o->stream->write_index_corrections[j].corrupt) {
1564 /* A corrupting seek was made */
1565 i->write_index_corrupt = TRUE;
1566 } else if (o->stream->write_index_corrections[j].absolute) {
1567 /* An absolute seek was made */
1568 i->write_index = o->stream->write_index_corrections[j].value;
1569 i->write_index_corrupt = FALSE;
1570 } else if (!i->write_index_corrupt) {
1571 /* A relative seek was made */
1572 i->write_index += o->stream->write_index_corrections[j].value;
1573 }
1574 }
1575
1576 /* Clear old correction entries */
1577 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1578 if (!o->stream->write_index_corrections[n].valid)
1579 continue;
1580
1581 if (o->stream->write_index_corrections[n].tag <= tag)
1582 o->stream->write_index_corrections[n].valid = FALSE;
1583 }
1584 }
1585
1586 if (o->stream->direction == PA_STREAM_RECORD) {
1587 /* Read index correction */
1588
1589 if (!i->read_index_corrupt)
1590 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1591 }
1592
1593 /* Update smoother */
1594 if (o->stream->smoother) {
1595 pa_usec_t u, x;
1596
1597 u = x = pa_rtclock_usec() - i->transport_usec;
1598
1599 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1600 pa_usec_t su;
1601
1602 /* If we weren't playing then it will take some time
1603 * until the audio will actually come out through the
1604 * speakers. Since we follow that timing here, we need
1605 * to try to fix this up */
1606
1607 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1608
1609 if (su < i->sink_usec)
1610 x += i->sink_usec - su;
1611 }
1612
1613 if (!i->playing)
1614 pa_smoother_pause(o->stream->smoother, x);
1615
1616 /* Update the smoother */
1617 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1618 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1619 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1620
1621 if (i->playing)
1622 pa_smoother_resume(o->stream->smoother, x, TRUE);
1623 }
1624 }
1625
1626 o->stream->auto_timing_update_requested = FALSE;
1627
1628 if (o->stream->latency_update_callback)
1629 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1630
1631 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1632 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1633 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1634 }
1635
1636 finish:
1637
1638 pa_operation_done(o);
1639 pa_operation_unref(o);
1640 }
1641
1642 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1643 uint32_t tag;
1644 pa_operation *o;
1645 pa_tagstruct *t;
1646 struct timeval now;
1647 int cidx = 0;
1648
1649 pa_assert(s);
1650 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1651
1652 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1653 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1654 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1655
1656 if (s->direction == PA_STREAM_PLAYBACK) {
1657 /* Find a place to store the write_index correction data for this entry */
1658 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1659
1660 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1661 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1662 }
1663 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1664
1665 t = pa_tagstruct_command(
1666 s->context,
1667 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1668 &tag);
1669 pa_tagstruct_putu32(t, s->channel);
1670 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1671
1672 pa_pstream_send_tagstruct(s->context->pstream, t);
1673 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1674
1675 if (s->direction == PA_STREAM_PLAYBACK) {
1676 /* Fill in initial correction data */
1677
1678 s->current_write_index_correction = cidx;
1679
1680 s->write_index_corrections[cidx].valid = TRUE;
1681 s->write_index_corrections[cidx].absolute = FALSE;
1682 s->write_index_corrections[cidx].corrupt = FALSE;
1683 s->write_index_corrections[cidx].tag = tag;
1684 s->write_index_corrections[cidx].value = 0;
1685 }
1686
1687 return o;
1688 }
1689
1690 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1691 pa_stream *s = userdata;
1692
1693 pa_assert(pd);
1694 pa_assert(s);
1695 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1696
1697 pa_stream_ref(s);
1698
1699 if (command != PA_COMMAND_REPLY) {
1700 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1701 goto finish;
1702
1703 pa_stream_set_state(s, PA_STREAM_FAILED);
1704 goto finish;
1705 } else if (!pa_tagstruct_eof(t)) {
1706 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1707 goto finish;
1708 }
1709
1710 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1711
1712 finish:
1713 pa_stream_unref(s);
1714 }
1715
1716 int pa_stream_disconnect(pa_stream *s) {
1717 pa_tagstruct *t;
1718 uint32_t tag;
1719
1720 pa_assert(s);
1721 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1722
1723 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1724 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1725 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1726
1727 pa_stream_ref(s);
1728
1729 t = pa_tagstruct_command(
1730 s->context,
1731 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1732 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1733 &tag);
1734 pa_tagstruct_putu32(t, s->channel);
1735 pa_pstream_send_tagstruct(s->context->pstream, t);
1736 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1737
1738 pa_stream_unref(s);
1739 return 0;
1740 }
1741
1742 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1743 pa_assert(s);
1744 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1745
1746 if (pa_detect_fork())
1747 return;
1748
1749 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1750 return;
1751
1752 s->read_callback = cb;
1753 s->read_userdata = userdata;
1754 }
1755
1756 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1757 pa_assert(s);
1758 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1759
1760 if (pa_detect_fork())
1761 return;
1762
1763 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1764 return;
1765
1766 s->write_callback = cb;
1767 s->write_userdata = userdata;
1768 }
1769
1770 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1771 pa_assert(s);
1772 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1773
1774 if (pa_detect_fork())
1775 return;
1776
1777 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1778 return;
1779
1780 s->state_callback = cb;
1781 s->state_userdata = userdata;
1782 }
1783
1784 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1785 pa_assert(s);
1786 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1787
1788 if (pa_detect_fork())
1789 return;
1790
1791 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1792 return;
1793
1794 s->overflow_callback = cb;
1795 s->overflow_userdata = userdata;
1796 }
1797
1798 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1799 pa_assert(s);
1800 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1801
1802 if (pa_detect_fork())
1803 return;
1804
1805 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1806 return;
1807
1808 s->underflow_callback = cb;
1809 s->underflow_userdata = userdata;
1810 }
1811
1812 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1813 pa_assert(s);
1814 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1815
1816 if (pa_detect_fork())
1817 return;
1818
1819 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1820 return;
1821
1822 s->latency_update_callback = cb;
1823 s->latency_update_userdata = userdata;
1824 }
1825
1826 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1827 pa_assert(s);
1828 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1829
1830 if (pa_detect_fork())
1831 return;
1832
1833 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1834 return;
1835
1836 s->moved_callback = cb;
1837 s->moved_userdata = userdata;
1838 }
1839
1840 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1841 pa_assert(s);
1842 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1843
1844 if (pa_detect_fork())
1845 return;
1846
1847 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1848 return;
1849
1850 s->suspended_callback = cb;
1851 s->suspended_userdata = userdata;
1852 }
1853
1854 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1855 pa_assert(s);
1856 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1857
1858 if (pa_detect_fork())
1859 return;
1860
1861 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1862 return;
1863
1864 s->started_callback = cb;
1865 s->started_userdata = userdata;
1866 }
1867
1868 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1869 pa_assert(s);
1870 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1871
1872 if (pa_detect_fork())
1873 return;
1874
1875 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1876 return;
1877
1878 s->event_callback = cb;
1879 s->event_userdata = userdata;
1880 }
1881
1882 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1883 pa_assert(s);
1884 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1885
1886 if (pa_detect_fork())
1887 return;
1888
1889 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1890 return;
1891
1892 s->buffer_attr_callback = cb;
1893 s->buffer_attr_userdata = userdata;
1894 }
1895
1896 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1897 pa_operation *o = userdata;
1898 int success = 1;
1899
1900 pa_assert(pd);
1901 pa_assert(o);
1902 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1903
1904 if (!o->context)
1905 goto finish;
1906
1907 if (command != PA_COMMAND_REPLY) {
1908 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1909 goto finish;
1910
1911 success = 0;
1912 } else if (!pa_tagstruct_eof(t)) {
1913 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1914 goto finish;
1915 }
1916
1917 if (o->callback) {
1918 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1919 cb(o->stream, success, o->userdata);
1920 }
1921
1922 finish:
1923 pa_operation_done(o);
1924 pa_operation_unref(o);
1925 }
1926
1927 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1928 pa_operation *o;
1929 pa_tagstruct *t;
1930 uint32_t tag;
1931
1932 pa_assert(s);
1933 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1934
1935 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1936 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1937 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1938
1939 s->corked = b;
1940
1941 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1942
1943 t = pa_tagstruct_command(
1944 s->context,
1945 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1946 &tag);
1947 pa_tagstruct_putu32(t, s->channel);
1948 pa_tagstruct_put_boolean(t, !!b);
1949 pa_pstream_send_tagstruct(s->context->pstream, t);
1950 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1951
1952 check_smoother_status(s, FALSE, FALSE, FALSE);
1953
1954 /* This might cause the indexes to hang/start again, hence
1955 * let's request a timing update */
1956 request_auto_timing_update(s, TRUE);
1957
1958 return o;
1959 }
1960
1961 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1962 pa_tagstruct *t;
1963 pa_operation *o;
1964 uint32_t tag;
1965
1966 pa_assert(s);
1967 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1968
1969 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1970 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1971
1972 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1973
1974 t = pa_tagstruct_command(s->context, command, &tag);
1975 pa_tagstruct_putu32(t, s->channel);
1976 pa_pstream_send_tagstruct(s->context->pstream, t);
1977 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1978
1979 return o;
1980 }
1981
1982 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1983 pa_operation *o;
1984
1985 pa_assert(s);
1986 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1987
1988 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1989 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1990 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1991
1992 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1993 return NULL;
1994
1995 if (s->direction == PA_STREAM_PLAYBACK) {
1996
1997 if (s->write_index_corrections[s->current_write_index_correction].valid)
1998 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1999
2000 if (s->buffer_attr.prebuf > 0)
2001 check_smoother_status(s, FALSE, FALSE, TRUE);
2002
2003 /* This will change the write index, but leave the
2004 * read index untouched. */
2005 invalidate_indexes(s, FALSE, TRUE);
2006
2007 } else
2008 /* For record streams this has no influence on the write
2009 * index, but the read index might jump. */
2010 invalidate_indexes(s, TRUE, FALSE);
2011
2012 return o;
2013 }
2014
2015 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2016 pa_operation *o;
2017
2018 pa_assert(s);
2019 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2020
2021 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2022 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2023 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2024 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2025
2026 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2027 return NULL;
2028
2029 /* This might cause the read index to hang again, hence
2030 * let's request a timing update */
2031 request_auto_timing_update(s, TRUE);
2032
2033 return o;
2034 }
2035
2036 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2037 pa_operation *o;
2038
2039 pa_assert(s);
2040 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2041
2042 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2043 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2044 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2045 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2046
2047 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2048 return NULL;
2049
2050 /* This might cause the read index to start moving again, hence
2051 * let's request a timing update */
2052 request_auto_timing_update(s, TRUE);
2053
2054 return o;
2055 }
2056
2057 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2058 pa_operation *o;
2059
2060 pa_assert(s);
2061 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2062 pa_assert(name);
2063
2064 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2065 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2066 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2067
2068 if (s->context->version >= 13) {
2069 pa_proplist *p = pa_proplist_new();
2070
2071 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2072 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2073 pa_proplist_free(p);
2074 } else {
2075 pa_tagstruct *t;
2076 uint32_t tag;
2077
2078 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2079 t = pa_tagstruct_command(
2080 s->context,
2081 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2082 &tag);
2083 pa_tagstruct_putu32(t, s->channel);
2084 pa_tagstruct_puts(t, name);
2085 pa_pstream_send_tagstruct(s->context->pstream, t);
2086 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2087 }
2088
2089 return o;
2090 }
2091
2092 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2093 pa_usec_t usec;
2094
2095 pa_assert(s);
2096 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2097
2098 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2099 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2100 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2101 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2102 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2103 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2104
2105 if (s->smoother)
2106 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2107 else
2108 usec = calc_time(s, FALSE);
2109
2110 /* Make sure the time runs monotonically */
2111 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2112 if (usec < s->previous_time)
2113 usec = s->previous_time;
2114 else
2115 s->previous_time = usec;
2116 }
2117
2118 if (r_usec)
2119 *r_usec = usec;
2120
2121 return 0;
2122 }
2123
2124 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2125 pa_assert(s);
2126 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2127
2128 if (negative)
2129 *negative = 0;
2130
2131 if (a >= b)
2132 return a-b;
2133 else {
2134 if (negative && s->direction == PA_STREAM_RECORD) {
2135 *negative = 1;
2136 return b-a;
2137 } else
2138 return 0;
2139 }
2140 }
2141
2142 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2143 pa_usec_t t, c;
2144 int r;
2145 int64_t cindex;
2146
2147 pa_assert(s);
2148 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2149 pa_assert(r_usec);
2150
2151 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2152 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2153 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2154 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2155 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2156 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2157
2158 if ((r = pa_stream_get_time(s, &t)) < 0)
2159 return r;
2160
2161 if (s->direction == PA_STREAM_PLAYBACK)
2162 cindex = s->timing_info.write_index;
2163 else
2164 cindex = s->timing_info.read_index;
2165
2166 if (cindex < 0)
2167 cindex = 0;
2168
2169 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2170
2171 if (s->direction == PA_STREAM_PLAYBACK)
2172 *r_usec = time_counter_diff(s, c, t, negative);
2173 else
2174 *r_usec = time_counter_diff(s, t, c, negative);
2175
2176 return 0;
2177 }
2178
2179 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2180 pa_assert(s);
2181 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2182
2183 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2184 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2185 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2186 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2187
2188 return &s->timing_info;
2189 }
2190
2191 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2192 pa_assert(s);
2193 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2194
2195 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2196
2197 return &s->sample_spec;
2198 }
2199
2200 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2201 pa_assert(s);
2202 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2203
2204 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2205
2206 return &s->channel_map;
2207 }
2208
2209 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2210 pa_assert(s);
2211 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2212
2213 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2214 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2215 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2216
2217 return &s->buffer_attr;
2218 }
2219
2220 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2221 pa_operation *o = userdata;
2222 int success = 1;
2223
2224 pa_assert(pd);
2225 pa_assert(o);
2226 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2227
2228 if (!o->context)
2229 goto finish;
2230
2231 if (command != PA_COMMAND_REPLY) {
2232 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2233 goto finish;
2234
2235 success = 0;
2236 } else {
2237 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2238 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2239 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2240 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2241 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2242 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2243 goto finish;
2244 }
2245 } else if (o->stream->direction == PA_STREAM_RECORD) {
2246 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2247 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2248 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2249 goto finish;
2250 }
2251 }
2252
2253 if (o->stream->context->version >= 13) {
2254 pa_usec_t usec;
2255
2256 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2257 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2258 goto finish;
2259 }
2260
2261 if (o->stream->direction == PA_STREAM_RECORD)
2262 o->stream->timing_info.configured_source_usec = usec;
2263 else
2264 o->stream->timing_info.configured_sink_usec = usec;
2265 }
2266
2267 if (!pa_tagstruct_eof(t)) {
2268 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2269 goto finish;
2270 }
2271 }
2272
2273 if (o->callback) {
2274 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2275 cb(o->stream, success, o->userdata);
2276 }
2277
2278 finish:
2279 pa_operation_done(o);
2280 pa_operation_unref(o);
2281 }
2282
2283
2284 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2285 pa_operation *o;
2286 pa_tagstruct *t;
2287 uint32_t tag;
2288
2289 pa_assert(s);
2290 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2291 pa_assert(attr);
2292
2293 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2294 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2295 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2296 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2297
2298 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2299
2300 t = pa_tagstruct_command(
2301 s->context,
2302 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2303 &tag);
2304 pa_tagstruct_putu32(t, s->channel);
2305
2306 pa_tagstruct_putu32(t, attr->maxlength);
2307
2308 if (s->direction == PA_STREAM_PLAYBACK)
2309 pa_tagstruct_put(
2310 t,
2311 PA_TAG_U32, attr->tlength,
2312 PA_TAG_U32, attr->prebuf,
2313 PA_TAG_U32, attr->minreq,
2314 PA_TAG_INVALID);
2315 else
2316 pa_tagstruct_putu32(t, attr->fragsize);
2317
2318 if (s->context->version >= 13)
2319 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2320
2321 if (s->context->version >= 14)
2322 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2323
2324 pa_pstream_send_tagstruct(s->context->pstream, t);
2325 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2326
2327 /* This might cause changes in the read/write indexex, hence let's
2328 * request a timing update */
2329 request_auto_timing_update(s, TRUE);
2330
2331 return o;
2332 }
2333
2334 uint32_t pa_stream_get_device_index(pa_stream *s) {
2335 pa_assert(s);
2336 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2337
2338 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2339 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2340 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2341 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2342 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2343
2344 return s->device_index;
2345 }
2346
2347 const char *pa_stream_get_device_name(pa_stream *s) {
2348 pa_assert(s);
2349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2350
2351 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2356
2357 return s->device_name;
2358 }
2359
2360 int pa_stream_is_suspended(pa_stream *s) {
2361 pa_assert(s);
2362 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2363
2364 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2365 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2366 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2367 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2368
2369 return s->suspended;
2370 }
2371
2372 int pa_stream_is_corked(pa_stream *s) {
2373 pa_assert(s);
2374 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2375
2376 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2377 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2378 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2379
2380 return s->corked;
2381 }
2382
2383 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2384 pa_operation *o = userdata;
2385 int success = 1;
2386
2387 pa_assert(pd);
2388 pa_assert(o);
2389 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2390
2391 if (!o->context)
2392 goto finish;
2393
2394 if (command != PA_COMMAND_REPLY) {
2395 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2396 goto finish;
2397
2398 success = 0;
2399 } else {
2400
2401 if (!pa_tagstruct_eof(t)) {
2402 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2403 goto finish;
2404 }
2405 }
2406
2407 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2408 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2409
2410 if (o->callback) {
2411 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2412 cb(o->stream, success, o->userdata);
2413 }
2414
2415 finish:
2416 pa_operation_done(o);
2417 pa_operation_unref(o);
2418 }
2419
2420
2421 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2422 pa_operation *o;
2423 pa_tagstruct *t;
2424 uint32_t tag;
2425
2426 pa_assert(s);
2427 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2428
2429 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2430 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2431 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2432 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2433 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2434 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2435
2436 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2437 o->private = PA_UINT_TO_PTR(rate);
2438
2439 t = pa_tagstruct_command(
2440 s->context,
2441 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2442 &tag);
2443 pa_tagstruct_putu32(t, s->channel);
2444 pa_tagstruct_putu32(t, rate);
2445
2446 pa_pstream_send_tagstruct(s->context->pstream, t);
2447 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2448
2449 return o;
2450 }
2451
2452 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2453 pa_operation *o;
2454 pa_tagstruct *t;
2455 uint32_t tag;
2456
2457 pa_assert(s);
2458 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2459
2460 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2461 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2462 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2463 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2464 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2465
2466 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2467
2468 t = pa_tagstruct_command(
2469 s->context,
2470 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2471 &tag);
2472 pa_tagstruct_putu32(t, s->channel);
2473 pa_tagstruct_putu32(t, (uint32_t) mode);
2474 pa_tagstruct_put_proplist(t, p);
2475
2476 pa_pstream_send_tagstruct(s->context->pstream, t);
2477 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2478
2479 /* Please note that we don't update s->proplist here, because we
2480 * don't export that field */
2481
2482 return o;
2483 }
2484
2485 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2486 pa_operation *o;
2487 pa_tagstruct *t;
2488 uint32_t tag;
2489 const char * const*k;
2490
2491 pa_assert(s);
2492 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2493
2494 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2495 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2496 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2497 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2499
2500 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2501
2502 t = pa_tagstruct_command(
2503 s->context,
2504 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2505 &tag);
2506 pa_tagstruct_putu32(t, s->channel);
2507
2508 for (k = keys; *k; k++)
2509 pa_tagstruct_puts(t, *k);
2510
2511 pa_tagstruct_puts(t, NULL);
2512
2513 pa_pstream_send_tagstruct(s->context->pstream, t);
2514 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2515
2516 /* Please note that we don't update s->proplist here, because we
2517 * don't export that field */
2518
2519 return o;
2520 }
2521
2522 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2523 pa_assert(s);
2524 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2525
2526 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2527 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2528 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2529 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2530
2531 s->direct_on_input = sink_input_idx;
2532
2533 return 0;
2534 }
2535
2536 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2537 pa_assert(s);
2538 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2539
2540 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2541 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2542 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2543
2544 return s->direct_on_input;
2545 }