]> code.delx.au - pulseaudio/blob - src/polyp/stream.c
add notification callback which is called when new latency data becomes available
[pulseaudio] / src / polyp / stream.c
1 /* $Id$ */
2
3 /***
4 This file is part of polypaudio.
5
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
10
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <assert.h>
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <polyp/def.h>
32 #include <polypcore/xmalloc.h>
33 #include <polypcore/pstream-util.h>
34 #include <polypcore/util.h>
35 #include <polypcore/log.h>
36 #include <polypcore/hashmap.h>
37
38 #include "internal.h"
39
/* Delay, in microseconds, that is added to the current time whenever the
 * automatic timing update timer is (re)armed: 10000 usec = 10 ms. */
#define LATENCY_IPOL_INTERVAL_USEC (10000L)
41
42 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
43 pa_stream *s;
44 int i;
45
46 assert(c);
47
48 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
49 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
50
51 s = pa_xnew(pa_stream, 1);
52 s->ref = 1;
53 s->context = c;
54 s->mainloop = c->mainloop;
55
56 s->read_callback = NULL;
57 s->read_userdata = NULL;
58 s->write_callback = NULL;
59 s->write_userdata = NULL;
60 s->state_callback = NULL;
61 s->state_userdata = NULL;
62 s->overflow_callback = NULL;
63 s->overflow_userdata = NULL;
64 s->underflow_callback = NULL;
65 s->underflow_userdata = NULL;
66 s->latency_update_callback = NULL;
67 s->latency_update_userdata = NULL;
68
69 s->direction = PA_STREAM_NODIRECTION;
70 s->name = pa_xstrdup(name);
71 s->sample_spec = *ss;
72 s->flags = 0;
73
74 if (map)
75 s->channel_map = *map;
76 else
77 pa_channel_map_init_auto(&s->channel_map, ss->channels);
78
79 s->channel = 0;
80 s->channel_valid = 0;
81 s->syncid = c->csyncid++;
82 s->device_index = PA_INVALID_INDEX;
83 s->requested_bytes = 0;
84 s->state = PA_STREAM_UNCONNECTED;
85 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
86
87 s->peek_memchunk.index = 0;
88 s->peek_memchunk.length = 0;
89 s->peek_memchunk.memblock = NULL;
90
91 s->record_memblockq = NULL;
92
93 s->previous_time = 0;
94 s->timing_info_valid = 0;
95 s->read_index_not_before = 0;
96 s->write_index_not_before = 0;
97
98 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
99 s->write_index_corrections[i].valid = 0;
100 s->current_write_index_correction = 0;
101
102 s->corked = 0;
103
104 s->ipol_usec_valid = 0;
105 s->ipol_timestamp.tv_sec = 0;
106 s->ipol_timestamp.tv_usec = 0;
107
108 s->auto_timing_update_event = NULL;
109 s->auto_timing_update_requested = 0;
110
111 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
112 PA_LLIST_PREPEND(pa_stream, c->streams, s);
113 pa_stream_ref(s);
114
115 return s;
116 }
117
118 static void stream_free(pa_stream *s) {
119 assert(s && !s->context && !s->channel_valid);
120
121 if (s->auto_timing_update_event) {
122 assert(s->mainloop);
123 s->mainloop->time_free(s->auto_timing_update_event);
124 }
125
126 if (s->peek_memchunk.memblock)
127 pa_memblock_unref(s->peek_memchunk.memblock);
128
129 if (s->record_memblockq)
130 pa_memblockq_free(s->record_memblockq);
131
132 pa_xfree(s->name);
133 pa_xfree(s);
134 }
135
136 void pa_stream_unref(pa_stream *s) {
137 assert(s);
138 assert(s->ref >= 1);
139
140 if (--(s->ref) == 0)
141 stream_free(s);
142 }
143
144 pa_stream* pa_stream_ref(pa_stream *s) {
145 assert(s);
146 assert(s->ref >= 1);
147
148 s->ref++;
149 return s;
150 }
151
152 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
153 assert(s);
154 assert(s->ref >= 1);
155
156 return s->state;
157 }
158
159 pa_context* pa_stream_get_context(pa_stream *s) {
160 assert(s);
161 assert(s->ref >= 1);
162
163 return s->context;
164 }
165
166 uint32_t pa_stream_get_index(pa_stream *s) {
167 assert(s);
168 assert(s->ref >= 1);
169
170 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
171
172 return s->device_index;
173 }
174
175 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
176 assert(s);
177 assert(s->ref >= 1);
178
179 if (s->state == st)
180 return;
181
182 pa_stream_ref(s);
183
184 s->state = st;
185 if (s->state_callback)
186 s->state_callback(s, s->state_userdata);
187
188 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) {
189
190 /* Detach from context */
191 pa_operation *o, *n;
192
193 /* Unref all operatio object that point to us */
194 for (o = s->context->operations; o; o = n) {
195 n = o->next;
196
197 if (o->stream == s)
198 pa_operation_cancel(o);
199 }
200
201 /* Drop all outstanding replies for this stream */
202 if (s->context->pdispatch)
203 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
204
205 if (s->channel_valid)
206 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
207
208 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
209 pa_stream_unref(s);
210
211 s->channel = 0;
212 s->channel_valid = 0;
213
214 s->context = NULL;
215 }
216
217 pa_stream_unref(s);
218 }
219
220 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
221 pa_context *c = userdata;
222 pa_stream *s;
223 uint32_t channel;
224
225 assert(pd);
226 assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
227 assert(t);
228 assert(c);
229
230 pa_context_ref(c);
231
232 if (pa_tagstruct_getu32(t, &channel) < 0 ||
233 !pa_tagstruct_eof(t)) {
234 pa_context_fail(c, PA_ERR_PROTOCOL);
235 goto finish;
236 }
237
238 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
239 goto finish;
240
241 pa_context_set_error(c, PA_ERR_KILLED);
242 pa_stream_set_state(s, PA_STREAM_FAILED);
243
244 finish:
245 pa_context_unref(c);
246 }
247
248 void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
249 pa_stream *s;
250 pa_context *c = userdata;
251 uint32_t bytes, channel;
252
253 assert(pd);
254 assert(command == PA_COMMAND_REQUEST);
255 assert(t);
256 assert(c);
257
258 pa_context_ref(c);
259
260 if (pa_tagstruct_getu32(t, &channel) < 0 ||
261 pa_tagstruct_getu32(t, &bytes) < 0 ||
262 !pa_tagstruct_eof(t)) {
263 pa_context_fail(c, PA_ERR_PROTOCOL);
264 goto finish;
265 }
266
267 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
268 goto finish;
269
270 if (s->state == PA_STREAM_READY) {
271 s->requested_bytes += bytes;
272
273 if (s->requested_bytes > 0 && s->write_callback)
274 s->write_callback(s, s->requested_bytes, s->write_userdata);
275 }
276
277 finish:
278 pa_context_unref(c);
279 }
280
281 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
282 pa_stream *s;
283 pa_context *c = userdata;
284 uint32_t channel;
285
286 assert(pd);
287 assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
288 assert(t);
289 assert(c);
290
291 pa_context_ref(c);
292
293 if (pa_tagstruct_getu32(t, &channel) < 0 ||
294 !pa_tagstruct_eof(t)) {
295 pa_context_fail(c, PA_ERR_PROTOCOL);
296 goto finish;
297 }
298
299 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
300 goto finish;
301
302 if (s->state == PA_STREAM_READY) {
303
304 if (command == PA_COMMAND_OVERFLOW) {
305 if (s->overflow_callback)
306 s->overflow_callback(s, s->overflow_userdata);
307 } else if (command == PA_COMMAND_UNDERFLOW) {
308 if (s->underflow_callback)
309 s->underflow_callback(s, s->underflow_userdata);
310 }
311 }
312
313 finish:
314 pa_context_unref(c);
315 }
316
317 static void request_auto_timing_update(pa_stream *s, int force) {
318 struct timeval next;
319 assert(s);
320
321 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
322 return;
323
324 if (s->state == PA_STREAM_READY &&
325 (force || !s->auto_timing_update_requested)) {
326 pa_operation *o;
327
328 /* pa_log("automatically requesting new timing data"); */
329
330 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
331 pa_operation_unref(o);
332 s->auto_timing_update_requested = 1;
333 }
334 }
335
336 pa_gettimeofday(&next);
337 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
338 s->mainloop->time_restart(s->auto_timing_update_event, &next);
339 }
340
341 static void invalidate_indexes(pa_stream *s, int r, int w) {
342 assert(s);
343
344 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
345
346 if (s->state != PA_STREAM_READY)
347 return;
348
349 if (w) {
350 s->write_index_not_before = s->context->ctag;
351
352 if (s->timing_info_valid)
353 s->timing_info.write_index_corrupt = 1;
354
355 /* pa_log("write_index invalidated"); */
356 }
357
358 if (r) {
359 s->read_index_not_before = s->context->ctag;
360
361 if (s->timing_info_valid)
362 s->timing_info.read_index_corrupt = 1;
363
364 /* pa_log("read_index invalidated"); */
365 }
366
367 if ((s->direction == PA_STREAM_PLAYBACK && r) ||
368 (s->direction == PA_STREAM_RECORD && w))
369 s->ipol_usec_valid = 0;
370
371 request_auto_timing_update(s, 1);
372 }
373
374 static void auto_timing_update_callback(PA_GCC_UNUSED pa_mainloop_api *m, PA_GCC_UNUSED pa_time_event *e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
375 pa_stream *s = userdata;
376
377 /* pa_log("time event"); */
378
379 pa_stream_ref(s);
380 request_auto_timing_update(s, 0);
381 pa_stream_unref(s);
382 }
383
384 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
385 pa_stream *s = userdata;
386
387 assert(pd);
388 assert(s);
389 assert(s->state == PA_STREAM_CREATING);
390
391 pa_stream_ref(s);
392
393 if (command != PA_COMMAND_REPLY) {
394 if (pa_context_handle_error(s->context, command, t) < 0)
395 goto finish;
396
397 pa_stream_set_state(s, PA_STREAM_FAILED);
398 goto finish;
399 }
400
401 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
402 ((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
403 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) ||
404 !pa_tagstruct_eof(t)) {
405 pa_context_fail(s->context, PA_ERR_PROTOCOL);
406 goto finish;
407 }
408
409 if (s->direction == PA_STREAM_RECORD) {
410 assert(!s->record_memblockq);
411
412 s->record_memblockq = pa_memblockq_new(
413 0,
414 s->buffer_attr.maxlength,
415 0,
416 pa_frame_size(&s->sample_spec),
417 1,
418 0,
419 NULL,
420 s->context->memblock_stat);
421 }
422
423 s->channel_valid = 1;
424 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
425
426 pa_stream_set_state(s, PA_STREAM_READY);
427
428 if (s->direction != PA_STREAM_UPLOAD &&
429 s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
430 struct timeval tv;
431
432 pa_gettimeofday(&tv);
433 tv.tv_usec += LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
434
435 assert(!s->auto_timing_update_event);
436 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
437
438 request_auto_timing_update(s, 1);
439 }
440
441 if (s->requested_bytes > 0 && s->ref > 1 && s->write_callback)
442 s->write_callback(s, s->requested_bytes, s->write_userdata);
443
444 finish:
445 pa_stream_unref(s);
446 }
447
448 static int create_stream(
449 pa_stream_direction_t direction,
450 pa_stream *s,
451 const char *dev,
452 const pa_buffer_attr *attr,
453 pa_stream_flags_t flags,
454 const pa_cvolume *volume,
455 pa_stream *sync_stream) {
456
457 pa_tagstruct *t;
458 uint32_t tag;
459
460 assert(s);
461 assert(s->ref >= 1);
462
463 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
464 PA_CHECK_VALIDITY(s->context, !(flags & ~((direction != PA_STREAM_UPLOAD ?
465 PA_STREAM_START_CORKED|
466 PA_STREAM_INTERPOLATE_TIMING|
467 PA_STREAM_NOT_MONOTONOUS|
468 PA_STREAM_AUTO_TIMING_UPDATE : 0))), PA_ERR_INVALID);
469 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
470 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
471
472 pa_stream_ref(s);
473
474 s->direction = direction;
475 s->flags = flags;
476
477 if (sync_stream)
478 s->syncid = sync_stream->syncid;
479
480 if (attr)
481 s->buffer_attr = *attr;
482 else {
483 /* half a second */
484 s->buffer_attr.tlength = pa_bytes_per_second(&s->sample_spec)/2;
485 s->buffer_attr.maxlength = (s->buffer_attr.tlength*3)/2;
486 s->buffer_attr.minreq = s->buffer_attr.tlength/100;
487 s->buffer_attr.prebuf = s->buffer_attr.tlength - s->buffer_attr.minreq;
488 s->buffer_attr.fragsize = s->buffer_attr.tlength/100;
489 }
490
491 if (!dev)
492 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
493
494 t = pa_tagstruct_command(
495 s->context,
496 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
497 &tag);
498
499 pa_tagstruct_put(
500 t,
501 PA_TAG_STRING, s->name,
502 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
503 PA_TAG_CHANNEL_MAP, &s->channel_map,
504 PA_TAG_U32, PA_INVALID_INDEX,
505 PA_TAG_STRING, dev,
506 PA_TAG_U32, s->buffer_attr.maxlength,
507 PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
508 PA_TAG_INVALID);
509
510 if (s->direction == PA_STREAM_PLAYBACK) {
511 pa_cvolume cv;
512
513 pa_tagstruct_put(
514 t,
515 PA_TAG_U32, s->buffer_attr.tlength,
516 PA_TAG_U32, s->buffer_attr.prebuf,
517 PA_TAG_U32, s->buffer_attr.minreq,
518 PA_TAG_U32, s->syncid,
519 PA_TAG_INVALID);
520
521 if (!volume)
522 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
523
524 pa_tagstruct_put_cvolume(t, volume);
525 } else
526 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
527
528 pa_pstream_send_tagstruct(s->context->pstream, t);
529 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
530
531 pa_stream_set_state(s, PA_STREAM_CREATING);
532
533 pa_stream_unref(s);
534 return 0;
535 }
536
537 int pa_stream_connect_playback(
538 pa_stream *s,
539 const char *dev,
540 const pa_buffer_attr *attr,
541 pa_stream_flags_t flags,
542 pa_cvolume *volume,
543 pa_stream *sync_stream) {
544
545 assert(s);
546 assert(s->ref >= 1);
547
548 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
549 }
550
551 int pa_stream_connect_record(
552 pa_stream *s,
553 const char *dev,
554 const pa_buffer_attr *attr,
555 pa_stream_flags_t flags) {
556
557 assert(s);
558 assert(s->ref >= 1);
559
560 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
561 }
562
563 int pa_stream_write(
564 pa_stream *s,
565 const void *data,
566 size_t length,
567 void (*free_cb)(void *p),
568 int64_t offset,
569 pa_seek_mode_t seek) {
570
571 pa_memchunk chunk;
572
573 assert(s);
574 assert(s->ref >= 1);
575 assert(data);
576
577 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
578 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
579 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
580 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
581
582 if (length <= 0)
583 return 0;
584
585 if (free_cb)
586 chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
587 else {
588 chunk.memblock = pa_memblock_new(length, s->context->memblock_stat);
589 memcpy(chunk.memblock->data, data, length);
590 }
591
592 chunk.index = 0;
593 chunk.length = length;
594
595 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
596 pa_memblock_unref(chunk.memblock);
597
598 if (length < s->requested_bytes)
599 s->requested_bytes -= length;
600 else
601 s->requested_bytes = 0;
602
603 if (s->direction == PA_STREAM_PLAYBACK) {
604
605 /* Update latency request correction */
606 if (s->write_index_corrections[s->current_write_index_correction].valid) {
607
608 if (seek == PA_SEEK_ABSOLUTE) {
609 s->write_index_corrections[s->current_write_index_correction].corrupt = 0;
610 s->write_index_corrections[s->current_write_index_correction].absolute = 1;
611 s->write_index_corrections[s->current_write_index_correction].value = offset + length;
612 } else if (seek == PA_SEEK_RELATIVE) {
613 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
614 s->write_index_corrections[s->current_write_index_correction].value += offset + length;
615 } else
616 s->write_index_corrections[s->current_write_index_correction].corrupt = 1;
617 }
618
619 /* Update the write index in the already available latency data */
620 if (s->timing_info_valid) {
621
622 if (seek == PA_SEEK_ABSOLUTE) {
623 s->timing_info.write_index_corrupt = 0;
624 s->timing_info.write_index = offset + length;
625 } else if (seek == PA_SEEK_RELATIVE) {
626 if (!s->timing_info.write_index_corrupt)
627 s->timing_info.write_index += offset + length;
628 } else
629 s->timing_info.write_index_corrupt = 1;
630 }
631
632 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
633 request_auto_timing_update(s, 1);
634 }
635
636 return 0;
637 }
638
639 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
640 assert(s);
641 assert(s->ref >= 1);
642 assert(data);
643 assert(length);
644
645 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
646 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
647
648 if (!s->peek_memchunk.memblock) {
649
650 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
651 *data = NULL;
652 *length = 0;
653 return 0;
654 }
655 }
656
657 *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index;
658 *length = s->peek_memchunk.length;
659 return 0;
660 }
661
662 int pa_stream_drop(pa_stream *s) {
663 assert(s);
664 assert(s->ref >= 1);
665
666 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
667 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
668 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
669
670 pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
671
672 /* Fix the simulated local read index */
673 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
674 s->timing_info.read_index += s->peek_memchunk.length;
675
676 pa_memblock_unref(s->peek_memchunk.memblock);
677 s->peek_memchunk.length = 0;
678 s->peek_memchunk.index = 0;
679 s->peek_memchunk.memblock = NULL;
680
681 return 0;
682 }
683
684 size_t pa_stream_writable_size(pa_stream *s) {
685 assert(s);
686 assert(s->ref >= 1);
687
688 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
689 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
690
691 return s->requested_bytes;
692 }
693
694 size_t pa_stream_readable_size(pa_stream *s) {
695 assert(s);
696 assert(s->ref >= 1);
697
698 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
699 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
700
701 return pa_memblockq_get_length(s->record_memblockq);
702 }
703
704 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
705 pa_operation *o;
706 pa_tagstruct *t;
707 uint32_t tag;
708
709 assert(s);
710 assert(s->ref >= 1);
711
712 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
713 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
714
715 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
716
717 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
718 pa_tagstruct_putu32(t, s->channel);
719 pa_pstream_send_tagstruct(s->context->pstream, t);
720 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
721
722 return o;
723 }
724
725 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
726 pa_operation *o = userdata;
727 struct timeval local, remote, now;
728 pa_timing_info *i;
729
730 assert(pd);
731 assert(o);
732
733 if (!o->context || !o->stream)
734 goto finish;
735
736 i = &o->stream->timing_info;
737
738 /* pa_log("pre corrupt w:%u r:%u\n", !o->stream->timing_info_valid || i->write_index_corrupt,!o->stream->timing_info_valid || i->read_index_corrupt); */
739
740 o->stream->timing_info_valid = 0;
741 i->write_index_corrupt = 0;
742 i->read_index_corrupt = 0;
743
744 /* pa_log("timing update %u\n", tag); */
745
746 if (command != PA_COMMAND_REPLY) {
747 if (pa_context_handle_error(o->context, command, t) < 0)
748 goto finish;
749
750 } else if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
751 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
752 pa_tagstruct_get_boolean(t, &i->playing) < 0 ||
753 pa_tagstruct_get_timeval(t, &local) < 0 ||
754 pa_tagstruct_get_timeval(t, &remote) < 0 ||
755 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
756 pa_tagstruct_gets64(t, &i->read_index) < 0 ||
757 !pa_tagstruct_eof(t)) {
758 pa_context_fail(o->context, PA_ERR_PROTOCOL);
759 goto finish;
760
761 } else {
762 o->stream->timing_info_valid = 1;
763
764 pa_gettimeofday(&now);
765
766 /* Calculcate timestamps */
767 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
768 /* local and remote seem to have synchronized clocks */
769
770 if (o->stream->direction == PA_STREAM_PLAYBACK)
771 i->transport_usec = pa_timeval_diff(&remote, &local);
772 else
773 i->transport_usec = pa_timeval_diff(&now, &remote);
774
775 i->synchronized_clocks = 1;
776 i->timestamp = remote;
777 } else {
778 /* clocks are not synchronized, let's estimate latency then */
779 i->transport_usec = pa_timeval_diff(&now, &local)/2;
780 i->synchronized_clocks = 0;
781 i->timestamp = local;
782 pa_timeval_add(&i->timestamp, i->transport_usec);
783 }
784
785 /* Invalidate read and write indexes if necessary */
786 if (tag < o->stream->read_index_not_before)
787 i->read_index_corrupt = 1;
788
789 if (tag < o->stream->write_index_not_before)
790 i->write_index_corrupt = 1;
791
792 if (o->stream->direction == PA_STREAM_PLAYBACK) {
793 /* Write index correction */
794
795 int n, j;
796 uint32_t ctag = tag;
797
798 /* Go through the saved correction values and add up the total correction.*/
799
800 for (n = 0, j = o->stream->current_write_index_correction+1;
801 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
802 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
803
804 /* Step over invalid data or out-of-date data */
805 if (!o->stream->write_index_corrections[j].valid ||
806 o->stream->write_index_corrections[j].tag < ctag)
807 continue;
808
809 /* Make sure that everything is in order */
810 ctag = o->stream->write_index_corrections[j].tag+1;
811
812 /* Now fix the write index */
813 if (o->stream->write_index_corrections[j].corrupt) {
814 /* A corrupting seek was made */
815 i->write_index = 0;
816 i->write_index_corrupt = 1;
817 } else if (o->stream->write_index_corrections[j].absolute) {
818 /* An absolute seek was made */
819 i->write_index = o->stream->write_index_corrections[j].value;
820 i->write_index_corrupt = 0;
821 } else if (!i->write_index_corrupt) {
822 /* A relative seek was made */
823 i->write_index += o->stream->write_index_corrections[j].value;
824 }
825 }
826 }
827
828 if (o->stream->direction == PA_STREAM_RECORD) {
829 /* Read index correction */
830
831 if (!i->read_index_corrupt)
832 i->read_index -= pa_memblockq_get_length(o->stream->record_memblockq);
833 }
834
835 o->stream->ipol_timestamp = now;
836 o->stream->ipol_usec_valid = 0;
837 }
838
839 o->stream->auto_timing_update_requested = 0;
840 /* pa_log("post corrupt w:%u r:%u\n", i->write_index_corrupt || !o->stream->timing_info_valid, i->read_index_corrupt || !o->stream->timing_info_valid); */
841
842 /* Clear old correction entries */
843 if (o->stream->direction == PA_STREAM_PLAYBACK) {
844 int n;
845
846 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
847 if (!o->stream->write_index_corrections[n].valid)
848 continue;
849
850 if (o->stream->write_index_corrections[n].tag <= tag)
851 o->stream->write_index_corrections[n].valid = 0;
852 }
853 }
854
855 if (o->stream->latency_update_callback)
856 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
857
858 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
859 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
860 cb(o->stream, o->stream->timing_info_valid, o->userdata);
861 }
862
863 finish:
864
865 pa_operation_done(o);
866 pa_operation_unref(o);
867 }
868
869 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
870 uint32_t tag;
871 pa_operation *o;
872 pa_tagstruct *t;
873 struct timeval now;
874 int cidx;
875
876 assert(s);
877 assert(s->ref >= 1);
878
879 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
880 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
881
882 if (s->direction == PA_STREAM_PLAYBACK) {
883 /* Find a place to store the write_index correction data for this entry */
884 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
885
886 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
887 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
888 }
889 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
890
891 t = pa_tagstruct_command(
892 s->context,
893 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY,
894 &tag);
895 pa_tagstruct_putu32(t, s->channel);
896 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
897
898 pa_pstream_send_tagstruct(s->context->pstream, t);
899 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
900
901 if (s->direction == PA_STREAM_PLAYBACK) {
902 /* Fill in initial correction data */
903 o->stream->current_write_index_correction = cidx;
904 o->stream->write_index_corrections[cidx].valid = 1;
905 o->stream->write_index_corrections[cidx].tag = tag;
906 o->stream->write_index_corrections[cidx].absolute = 0;
907 o->stream->write_index_corrections[cidx].value = 0;
908 o->stream->write_index_corrections[cidx].corrupt = 0;
909 }
910
911 /* pa_log("requesting update %u\n", tag); */
912
913 return o;
914 }
915
916 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
917 pa_stream *s = userdata;
918
919 assert(pd);
920 assert(s);
921 assert(s->ref >= 1);
922
923 pa_stream_ref(s);
924
925 if (command != PA_COMMAND_REPLY) {
926 if (pa_context_handle_error(s->context, command, t) < 0)
927 goto finish;
928
929 pa_stream_set_state(s, PA_STREAM_FAILED);
930 goto finish;
931 } else if (!pa_tagstruct_eof(t)) {
932 pa_context_fail(s->context, PA_ERR_PROTOCOL);
933 goto finish;
934 }
935
936 pa_stream_set_state(s, PA_STREAM_TERMINATED);
937
938 finish:
939 pa_stream_unref(s);
940 }
941
942 int pa_stream_disconnect(pa_stream *s) {
943 pa_tagstruct *t;
944 uint32_t tag;
945
946 assert(s);
947 assert(s->ref >= 1);
948
949 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
950 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
951
952 pa_stream_ref(s);
953
954 t = pa_tagstruct_command(
955 s->context,
956 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
957 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM),
958 &tag);
959 pa_tagstruct_putu32(t, s->channel);
960 pa_pstream_send_tagstruct(s->context->pstream, t);
961 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
962
963 pa_stream_unref(s);
964 return 0;
965 }
966
967 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
968 assert(s);
969 assert(s->ref >= 1);
970
971 s->read_callback = cb;
972 s->read_userdata = userdata;
973 }
974
975 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
976 assert(s);
977 assert(s->ref >= 1);
978
979 s->write_callback = cb;
980 s->write_userdata = userdata;
981 }
982
983 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
984 assert(s);
985 assert(s->ref >= 1);
986
987 s->state_callback = cb;
988 s->state_userdata = userdata;
989 }
990
991 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
992 assert(s);
993 assert(s->ref >= 1);
994
995 s->overflow_callback = cb;
996 s->overflow_userdata = userdata;
997 }
998
999 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1000 assert(s);
1001 assert(s->ref >= 1);
1002
1003 s->underflow_callback = cb;
1004 s->underflow_userdata = userdata;
1005 }
1006
1007 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1008 assert(s);
1009 assert(s->ref >= 1);
1010
1011 s->latency_update_callback = cb;
1012 s->latency_update_userdata = userdata;
1013 }
1014
1015 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
1016 pa_operation *o = userdata;
1017 int success = 1;
1018
1019 assert(pd);
1020 assert(o);
1021 assert(o->ref >= 1);
1022
1023 if (!o->context)
1024 goto finish;
1025
1026 if (command != PA_COMMAND_REPLY) {
1027 if (pa_context_handle_error(o->context, command, t) < 0)
1028 goto finish;
1029
1030 success = 0;
1031 } else if (!pa_tagstruct_eof(t)) {
1032 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1033 goto finish;
1034 }
1035
1036 if (o->callback) {
1037 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1038 cb(o->stream, success, o->userdata);
1039 }
1040
1041 finish:
1042 pa_operation_done(o);
1043 pa_operation_unref(o);
1044 }
1045
1046 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1047 pa_operation *o;
1048 pa_tagstruct *t;
1049 uint32_t tag;
1050
1051 assert(s);
1052 assert(s->ref >= 1);
1053
1054 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1055 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1056
1057 s->corked = b;
1058
1059 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1060
1061 t = pa_tagstruct_command(
1062 s->context,
1063 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM,
1064 &tag);
1065 pa_tagstruct_putu32(t, s->channel);
1066 pa_tagstruct_put_boolean(t, !!b);
1067 pa_pstream_send_tagstruct(s->context->pstream, t);
1068 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1069
1070 if (s->direction == PA_STREAM_PLAYBACK)
1071 invalidate_indexes(s, 1, 0);
1072
1073 return o;
1074 }
1075
1076 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1077 pa_tagstruct *t;
1078 pa_operation *o;
1079 uint32_t tag;
1080
1081 assert(s);
1082 assert(s->ref >= 1);
1083
1084 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1085
1086 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1087
1088 t = pa_tagstruct_command(s->context, command, &tag);
1089 pa_tagstruct_putu32(t, s->channel);
1090 pa_pstream_send_tagstruct(s->context->pstream, t);
1091 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1092
1093 return o;
1094 }
1095
1096 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1097 pa_operation *o;
1098
1099 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1100
1101 if ((o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata))) {
1102
1103 if (s->direction == PA_STREAM_PLAYBACK) {
1104 if (s->write_index_corrections[s->current_write_index_correction].valid)
1105 s->write_index_corrections[s->current_write_index_correction].corrupt = 1;
1106
1107 if (s->timing_info_valid)
1108 s->timing_info.write_index_corrupt = 1;
1109
1110 if (s->buffer_attr.prebuf > 0)
1111 invalidate_indexes(s, 1, 0);
1112 else
1113 request_auto_timing_update(s, 1);
1114 } else
1115 invalidate_indexes(s, 0, 1);
1116 }
1117
1118 return o;
1119 }
1120
1121 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1122 pa_operation *o;
1123
1124 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1125 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1126
1127 if ((o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1128 invalidate_indexes(s, 1, 0);
1129
1130 return o;
1131 }
1132
1133 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1134 pa_operation *o;
1135
1136 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1137 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1138
1139 if ((o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1140 invalidate_indexes(s, 1, 0);
1141
1142 return o;
1143 }
1144
1145 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1146 pa_operation *o;
1147 pa_tagstruct *t;
1148 uint32_t tag;
1149
1150 assert(s);
1151 assert(s->ref >= 1);
1152 assert(name);
1153
1154 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1155 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1156
1157 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1158
1159 t = pa_tagstruct_command(
1160 s->context,
1161 s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME,
1162 &tag);
1163 pa_tagstruct_putu32(t, s->channel);
1164 pa_tagstruct_puts(t, name);
1165 pa_pstream_send_tagstruct(s->context->pstream, t);
1166 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1167
1168 return o;
1169 }
1170
1171 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1172 pa_usec_t usec = 0;
1173
1174 assert(s);
1175 assert(s->ref >= 1);
1176
1177 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1178 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1179 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
1180 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
1181 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
1182
1183 if (s->flags & PA_STREAM_INTERPOLATE_TIMING && s->ipol_usec_valid)
1184 usec = s->ipol_usec;
1185 else {
1186 if (s->direction == PA_STREAM_PLAYBACK) {
1187 /* The last byte that was written into the output device
1188 * had this time value associated */
1189 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1190
1191 if (!s->corked) {
1192 /* Because the latency info took a little time to come
1193 * to us, we assume that the real output time is actually
1194 * a little ahead */
1195 usec += s->timing_info.transport_usec;
1196
1197 /* However, the output device usually maintains a buffer
1198 too, hence the real sample currently played is a little
1199 back */
1200 if (s->timing_info.sink_usec >= usec)
1201 usec = 0;
1202 else
1203 usec -= s->timing_info.sink_usec;
1204 }
1205
1206 } else if (s->direction == PA_STREAM_RECORD) {
1207 /* The last byte written into the server side queue had
1208 * this time value associated */
1209 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1210
1211 if (!s->corked) {
1212 /* Add transport latency */
1213 usec += s->timing_info.transport_usec;
1214
1215 /* Add latency of data in device buffer */
1216 usec += s->timing_info.source_usec;
1217
1218 /* If this is a monitor source, we need to correct the
1219 * time by the playback device buffer */
1220 if (s->timing_info.sink_usec >= usec)
1221 usec = 0;
1222 else
1223 usec -= s->timing_info.sink_usec;
1224 }
1225 }
1226
1227 if (s->flags & PA_STREAM_INTERPOLATE_TIMING) {
1228 s->ipol_usec = usec;
1229 s->ipol_usec_valid = 1;
1230 }
1231 }
1232
1233 /* Interpolate if requested */
1234 if (s->flags & PA_STREAM_INTERPOLATE_TIMING) {
1235
1236 /* We just add the time that passed since the latency info was
1237 * current */
1238 if (!s->corked) {
1239 struct timeval now;
1240
1241 usec += pa_timeval_diff(pa_gettimeofday(&now), &s->ipol_timestamp);
1242 s->ipol_timestamp = now;
1243 }
1244 }
1245
1246 /* Make sure the time runs monotonically */
1247 if (!(s->flags & PA_STREAM_NOT_MONOTONOUS)) {
1248 if (usec < s->previous_time)
1249 usec = s->previous_time;
1250 else
1251 s->previous_time = usec;
1252 }
1253
1254 if (r_usec)
1255 *r_usec = usec;
1256
1257 return 0;
1258 }
1259
1260 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
1261 assert(s);
1262 assert(s->ref >= 1);
1263
1264 if (negative)
1265 *negative = 0;
1266
1267 if (a >= b)
1268 return a-b;
1269 else {
1270 if (negative && s->direction == PA_STREAM_RECORD) {
1271 *negative = 1;
1272 return b-a;
1273 } else
1274 return 0;
1275 }
1276 }
1277
1278 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
1279 pa_usec_t t, c;
1280 int r;
1281 int64_t cindex;
1282
1283 assert(s);
1284 assert(s->ref >= 1);
1285 assert(r_usec);
1286
1287 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1288 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1289 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
1290 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
1291 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
1292
1293 if ((r = pa_stream_get_time(s, &t)) < 0)
1294 return r;
1295
1296 if (s->direction == PA_STREAM_PLAYBACK)
1297 cindex = s->timing_info.write_index;
1298 else
1299 cindex = s->timing_info.read_index;
1300
1301 if (cindex < 0)
1302 cindex = 0;
1303
1304 c = pa_bytes_to_usec(cindex, &s->sample_spec);
1305
1306 if (s->direction == PA_STREAM_PLAYBACK)
1307 *r_usec = time_counter_diff(s, c, t, negative);
1308 else
1309 *r_usec = time_counter_diff(s, t, c, negative);
1310
1311 return 0;
1312 }
1313
1314 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
1315 assert(s);
1316 assert(s->ref >= 1);
1317
1318 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1319 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1320 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_BADSTATE);
1321
1322 return &s->timing_info;
1323 }
1324
1325 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
1326 assert(s);
1327 assert(s->ref >= 1);
1328
1329 return &s->sample_spec;
1330 }
1331
1332 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
1333 assert(s);
1334 assert(s->ref >= 1);
1335
1336 return &s->channel_map;
1337 }