]> code.delx.au - pulseaudio/blob - src/polyp.c
fix recording for simple and esound protocols
[pulseaudio] / src / polyp.c
1 #include <stdio.h>
2 #include <assert.h>
3 #include <stdlib.h>
4 #include <string.h>
5
6 #include "polyp.h"
7 #include "protocol-native-spec.h"
8 #include "pdispatch.h"
9 #include "pstream.h"
10 #include "dynarray.h"
11 #include "socket-client.h"
12 #include "pstream-util.h"
13 #include "authkey.h"
14 #include "util.h"
15
/* Buffer attributes that pa_stream_new() falls back to when the caller
 * passes attr == NULL. */
#define DEFAULT_MAXLENGTH 20480
#define DEFAULT_TLENGTH   10240
#define DEFAULT_PREBUF    4096
#define DEFAULT_MINREQ    1024

/* Timeout handed to pa_pdispatch_register_reply() for every request sent
 * from this file (presumably seconds, i.e. five minutes -- TODO confirm). */
#define DEFAULT_TIMEOUT (5 * 60)

/* Socket path used by pa_context_connect() when no server is specified. */
#define DEFAULT_SERVER "/tmp/polypaudio/native"
23
24 struct pa_context {
25 char *name;
26 struct pa_mainloop_api* mainloop;
27 struct pa_socket_client *client;
28 struct pa_pstream *pstream;
29 struct pa_pdispatch *pdispatch;
30 struct pa_dynarray *streams;
31 struct pa_stream *first_stream;
32 uint32_t ctag;
33 uint32_t error;
34 enum {
35 CONTEXT_UNCONNECTED,
36 CONTEXT_CONNECTING,
37 CONTEXT_AUTHORIZING,
38 CONTEXT_SETTING_NAME,
39 CONTEXT_READY,
40 CONTEXT_DEAD
41 } state;
42
43 void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
44 void *connect_complete_userdata;
45
46 void (*drain_complete_callback)(struct pa_context*c, void *userdata);
47 void *drain_complete_userdata;
48
49 void (*die_callback)(struct pa_context*c, void *userdata);
50 void *die_userdata;
51
52 uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
53 };
54
55 struct pa_stream {
56 struct pa_context *context;
57 struct pa_stream *next, *previous;
58
59 char *name;
60 struct pa_buffer_attr buffer_attr;
61 struct pa_sample_spec sample_spec;
62 uint32_t device_index;
63 uint32_t channel;
64 int channel_valid;
65 enum pa_stream_direction direction;
66
67 enum { STREAM_LOOKING_UP, STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
68 uint32_t requested_bytes;
69
70 void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
71 void *read_userdata;
72
73 void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
74 void *write_userdata;
75
76 void (*create_complete_callback)(struct pa_stream *s, int success, void *userdata);
77 void *create_complete_userdata;
78
79 void (*drain_complete_callback)(struct pa_stream *s, void *userdata);
80 void *drain_complete_userdata;
81
82 void (*die_callback)(struct pa_stream*c, void *userdata);
83 void *die_userdata;
84 };
85
86 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
87
88 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
89 [PA_COMMAND_ERROR] = { NULL },
90 [PA_COMMAND_REPLY] = { NULL },
91 [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
92 [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
93 [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
94 [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
95 [PA_COMMAND_EXIT] = { NULL },
96 [PA_COMMAND_REQUEST] = { command_request },
97 };
98
99 struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
100 struct pa_context *c;
101 assert(mainloop && name);
102
103 c = malloc(sizeof(struct pa_context));
104 assert(c);
105 c->name = strdup(name);
106 c->mainloop = mainloop;
107 c->client = NULL;
108 c->pstream = NULL;
109 c->pdispatch = NULL;
110 c->streams = pa_dynarray_new();
111 assert(c->streams);
112 c->first_stream = NULL;
113 c->error = PA_ERROR_OK;
114 c->state = CONTEXT_UNCONNECTED;
115 c->ctag = 0;
116
117 c->connect_complete_callback = NULL;
118 c->connect_complete_userdata = NULL;
119
120 c->drain_complete_callback = NULL;
121 c->drain_complete_userdata = NULL;
122
123 c->die_callback = NULL;
124 c->die_userdata = NULL;
125
126 pa_check_for_sigpipe();
127 return c;
128 }
129
130 void pa_context_free(struct pa_context *c) {
131 assert(c);
132
133 while (c->first_stream)
134 pa_stream_free(c->first_stream);
135
136 if (c->client)
137 pa_socket_client_free(c->client);
138 if (c->pdispatch)
139 pa_pdispatch_free(c->pdispatch);
140 if (c->pstream)
141 pa_pstream_free(c->pstream);
142 if (c->streams)
143 pa_dynarray_free(c->streams, NULL, NULL);
144
145 free(c->name);
146 free(c);
147 }
148
149 static void stream_dead(struct pa_stream *s) {
150 assert(s);
151
152 if (s->state == STREAM_DEAD)
153 return;
154
155 if (s->state == STREAM_READY) {
156 s->state = STREAM_DEAD;
157 if (s->die_callback)
158 s->die_callback(s, s->die_userdata);
159 } else
160 s->state = STREAM_DEAD;
161 }
162
163 static void context_dead(struct pa_context *c) {
164 struct pa_stream *s;
165 assert(c);
166
167 if (c->state == CONTEXT_DEAD)
168 return;
169
170 if (c->pdispatch)
171 pa_pdispatch_free(c->pdispatch);
172 c->pdispatch = NULL;
173
174 if (c->pstream)
175 pa_pstream_free(c->pstream);
176 c->pstream = NULL;
177
178 if (c->client)
179 pa_socket_client_free(c->client);
180 c->client = NULL;
181
182 for (s = c->first_stream; s; s = s->next)
183 stream_dead(s);
184
185 if (c->state == CONTEXT_READY) {
186 c->state = CONTEXT_DEAD;
187 if (c->die_callback)
188 c->die_callback(c, c->die_userdata);
189 } else
190 s->state = CONTEXT_DEAD;
191 }
192
/* Die callback registered on the packet stream: kills the owning context. */
static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
    struct pa_context *ctx = userdata;

    assert(p && ctx);

    context_dead(ctx);
}
198
199 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
200 struct pa_context *c = userdata;
201 assert(p && packet && c);
202
203 if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
204 fprintf(stderr, "polyp.c: invalid packet.\n");
205 context_dead(c);
206 }
207 }
208
209 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) {
210 struct pa_context *c = userdata;
211 struct pa_stream *s;
212 assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
213
214 if (!(s = pa_dynarray_get(c->streams, channel)))
215 return;
216
217 if (s->read_callback)
218 s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
219 }
220
221 static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) {
222 assert(c && t);
223
224 if (command == PA_COMMAND_ERROR) {
225 if (pa_tagstruct_getu32(t, &c->error) < 0) {
226 c->error = PA_ERROR_PROTOCOL;
227 return -1;
228 }
229
230 return 0;
231 }
232
233 c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL;
234 return -1;
235 }
236
237 static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
238 struct pa_context *c = userdata;
239 assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME));
240
241 if (command != PA_COMMAND_REPLY) {
242 handle_error(c, command, t);
243 context_dead(c);
244
245 if (c->connect_complete_callback)
246 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
247
248 return;
249 }
250
251 if (c->state == CONTEXT_AUTHORIZING) {
252 struct pa_tagstruct *t;
253 c->state = CONTEXT_SETTING_NAME;
254 t = pa_tagstruct_new(NULL, 0);
255 assert(t);
256 pa_tagstruct_putu32(t, PA_COMMAND_SET_NAME);
257 pa_tagstruct_putu32(t, tag = c->ctag++);
258 pa_tagstruct_puts(t, c->name);
259 pa_pstream_send_tagstruct(c->pstream, t);
260 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
261 } else {
262 assert(c->state == CONTEXT_SETTING_NAME);
263
264 c->state = CONTEXT_READY;
265
266 if (c->connect_complete_callback)
267 c->connect_complete_callback(c, 1, c->connect_complete_userdata);
268 }
269
270 return;
271 }
272
273 static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) {
274 struct pa_context *c = userdata;
275 struct pa_tagstruct *t;
276 uint32_t tag;
277 assert(client && io && c && c->state == CONTEXT_CONNECTING);
278
279 pa_socket_client_free(client);
280 c->client = NULL;
281
282 if (!io) {
283 c->error = PA_ERROR_CONNECTIONREFUSED;
284 context_dead(c);
285
286 if (c->connect_complete_callback)
287 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
288
289 return;
290 }
291
292 c->pstream = pa_pstream_new(c->mainloop, io);
293 assert(c->pstream);
294 pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
295 pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
296 pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
297
298 c->pdispatch = pa_pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
299 assert(c->pdispatch);
300
301 t = pa_tagstruct_new(NULL, 0);
302 assert(t);
303 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
304 pa_tagstruct_putu32(t, tag = c->ctag++);
305 pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie));
306 pa_pstream_send_tagstruct(c->pstream, t);
307 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
308 c->state = CONTEXT_AUTHORIZING;
309 }
310
311 int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
312 assert(c && c->state == CONTEXT_UNCONNECTED);
313
314 if (pa_authkey_load_from_home(PA_NATIVE_COOKIE_FILE, c->auth_cookie, sizeof(c->auth_cookie)) < 0) {
315 c->error = PA_ERROR_AUTHKEY;
316 return -1;
317 }
318
319 assert(!c->client);
320 if (!(c->client = pa_socket_client_new_unix(c->mainloop, server ? server : DEFAULT_SERVER))) {
321 c->error = PA_ERROR_CONNECTIONREFUSED;
322 return -1;
323 }
324
325 c->connect_complete_callback = complete;
326 c->connect_complete_userdata = userdata;
327
328 pa_socket_client_set_callback(c->client, on_connection, c);
329 c->state = CONTEXT_CONNECTING;
330
331 return 0;
332 }
333
334 int pa_context_is_dead(struct pa_context *c) {
335 assert(c);
336 return c->state == CONTEXT_DEAD;
337 }
338
339 int pa_context_is_ready(struct pa_context *c) {
340 assert(c);
341 return c->state == CONTEXT_READY;
342 }
343
344 int pa_context_errno(struct pa_context *c) {
345 assert(c);
346 return c->error;
347 }
348
349 void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
350 assert(c);
351 c->die_callback = cb;
352 c->die_userdata = userdata;
353 }
354
355 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
356 struct pa_stream *s;
357 struct pa_context *c = userdata;
358 uint32_t bytes, channel;
359 assert(pd && command == PA_COMMAND_REQUEST && t && c);
360
361 if (pa_tagstruct_getu32(t, &channel) < 0 ||
362 pa_tagstruct_getu32(t, &bytes) < 0 ||
363 !pa_tagstruct_eof(t)) {
364 c->error = PA_ERROR_PROTOCOL;
365 context_dead(c);
366 return;
367 }
368
369 if (!(s = pa_dynarray_get(c->streams, channel)))
370 return;
371
372 if (s->state != STREAM_READY)
373 return;
374
375 s->requested_bytes += bytes;
376
377 if (s->requested_bytes && s->write_callback)
378 s->write_callback(s, s->requested_bytes, s->write_userdata);
379 }
380
381 static void create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
382 struct pa_stream *s = userdata;
383 assert(pd && s && s->state == STREAM_CREATING);
384
385 if (command != PA_COMMAND_REPLY) {
386 if (handle_error(s->context, command, t) < 0) {
387 context_dead(s->context);
388 return;
389 }
390
391 stream_dead(s);
392 if (s->create_complete_callback)
393 s->create_complete_callback(s, 0, s->create_complete_userdata);
394
395 return;
396 }
397
398 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
399 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
400 !pa_tagstruct_eof(t)) {
401 s->context->error = PA_ERROR_PROTOCOL;
402 context_dead(s->context);
403 return;
404 }
405
406 s->channel_valid = 1;
407 pa_dynarray_put(s->context->streams, s->channel, s);
408
409 s->state = STREAM_READY;
410 if (s->create_complete_callback)
411 s->create_complete_callback(s, 1, s->create_complete_userdata);
412 }
413
414 static void create_stream(struct pa_stream *s, uint32_t tdev_index) {
415 struct pa_tagstruct *t;
416 uint32_t tag;
417 assert(s);
418
419 s->state = STREAM_CREATING;
420
421 t = pa_tagstruct_new(NULL, 0);
422 assert(t);
423
424 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
425 pa_tagstruct_putu32(t, tag = s->context->ctag++);
426 pa_tagstruct_puts(t, s->name);
427 pa_tagstruct_put_sample_spec(t, &s->sample_spec);
428 pa_tagstruct_putu32(t, tdev_index);
429 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
430 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
431 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
432 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
433
434 pa_pstream_send_tagstruct(s->context->pstream, t);
435 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
436 }
437
438 static void lookup_device_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
439 struct pa_stream *s = userdata;
440 uint32_t tdev;
441 assert(pd && s && s->state == STREAM_LOOKING_UP);
442
443 if (command != PA_COMMAND_REPLY) {
444 if (handle_error(s->context, command, t) < 0) {
445 context_dead(s->context);
446 return;
447 }
448
449 stream_dead(s);
450 if (s->create_complete_callback)
451 s->create_complete_callback(s, 0, s->create_complete_userdata);
452 return;
453 }
454
455 if (pa_tagstruct_getu32(t, &tdev) < 0 ||
456 !pa_tagstruct_eof(t)) {
457 s->context->error = PA_ERROR_PROTOCOL;
458 context_dead(s->context);
459 return;
460 }
461
462 create_stream(s, tdev);
463 }
464
465 static void lookup_device(struct pa_stream *s, const char *tdev) {
466 struct pa_tagstruct *t;
467 uint32_t tag;
468 assert(s);
469
470 s->state = STREAM_LOOKING_UP;
471
472 t = pa_tagstruct_new(NULL, 0);
473 assert(t);
474
475 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_LOOKUP_SINK : PA_COMMAND_LOOKUP_SOURCE);
476 pa_tagstruct_putu32(t, tag = s->context->ctag++);
477 pa_tagstruct_puts(t, tdev);
478
479 pa_pstream_send_tagstruct(s->context->pstream, t);
480 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, lookup_device_callback, s);
481 }
482
483 struct pa_stream* pa_stream_new(
484 struct pa_context *c,
485 enum pa_stream_direction dir,
486 const char *dev,
487 const char *name,
488 const struct pa_sample_spec *ss,
489 const struct pa_buffer_attr *attr,
490 void (*complete) (struct pa_stream*s, int success, void *userdata),
491 void *userdata) {
492
493 struct pa_stream *s;
494
495 assert(c && name && ss && c->state == CONTEXT_READY);
496
497 s = malloc(sizeof(struct pa_stream));
498 assert(s);
499 s->context = c;
500
501 s->read_callback = NULL;
502 s->read_userdata = NULL;
503 s->write_callback = NULL;
504 s->write_userdata = NULL;
505 s->die_callback = NULL;
506 s->die_userdata = NULL;
507 s->create_complete_callback = complete;
508 s->create_complete_userdata = NULL;
509
510 s->name = strdup(name);
511 s->state = STREAM_CREATING;
512 s->requested_bytes = 0;
513 s->channel = 0;
514 s->channel_valid = 0;
515 s->device_index = (uint32_t) -1;
516 s->direction = dir;
517 s->sample_spec = *ss;
518 if (attr)
519 s->buffer_attr = *attr;
520 else {
521 s->buffer_attr.maxlength = DEFAULT_MAXLENGTH;
522 s->buffer_attr.tlength = DEFAULT_TLENGTH;
523 s->buffer_attr.prebuf = DEFAULT_PREBUF;
524 s->buffer_attr.minreq = DEFAULT_MINREQ;
525 }
526
527 s->next = c->first_stream;
528 if (s->next)
529 s->next->previous = s;
530 s->previous = NULL;
531 c->first_stream = s;
532
533 if (dev)
534 lookup_device(s, dev);
535 else
536 create_stream(s, (uint32_t) -1);
537
538 return s;
539 }
540
541 void pa_stream_free(struct pa_stream *s) {
542 assert(s && s->context);
543
544 if (s->context->pdispatch)
545 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
546
547 free(s->name);
548
549 if (s->channel_valid && s->context->state == CONTEXT_READY) {
550 struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0);
551 assert(t);
552
553 pa_tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM);
554 pa_tagstruct_putu32(t, s->context->ctag++);
555 pa_tagstruct_putu32(t, s->channel);
556 pa_pstream_send_tagstruct(s->context->pstream, t);
557 }
558
559 if (s->channel_valid)
560 pa_dynarray_put(s->context->streams, s->channel, NULL);
561
562 if (s->next)
563 s->next->previous = s->previous;
564 if (s->previous)
565 s->previous->next = s->next;
566 else
567 s->context->first_stream = s->next;
568
569 free(s);
570 }
571
572 void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
573 assert(s && cb);
574 s->write_callback = cb;
575 s->write_userdata = userdata;
576 }
577
578 void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
579 struct pa_memchunk chunk;
580 assert(s && s->context && data && length && s->state == STREAM_READY);
581
582 chunk.memblock = pa_memblock_new(length);
583 assert(chunk.memblock && chunk.memblock->data);
584 memcpy(chunk.memblock->data, data, length);
585 chunk.index = 0;
586 chunk.length = length;
587
588 pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
589 pa_memblock_unref(chunk.memblock);
590
591 /*fprintf(stderr, "Sent %u bytes\n", length);*/
592
593 if (length < s->requested_bytes)
594 s->requested_bytes -= length;
595 else
596 s->requested_bytes = 0;
597 }
598
599 size_t pa_stream_writable_size(struct pa_stream *s) {
600 assert(s && s->state == STREAM_READY);
601 return s->requested_bytes;
602 }
603
604 void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
605 assert(s && cb);
606 s->read_callback = cb;
607 s->read_userdata = userdata;
608 }
609
610 int pa_stream_is_dead(struct pa_stream *s) {
611 return s->state == STREAM_DEAD;
612 }
613
614 int pa_stream_is_ready(struct pa_stream*s) {
615 return s->state == STREAM_READY;
616 }
617
618 void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
619 assert(s);
620 s->die_callback = cb;
621 s->die_userdata = userdata;
622 }
623
624 int pa_context_is_pending(struct pa_context *c) {
625 assert(c);
626
627 if (c->state != CONTEXT_READY)
628 return 0;
629
630 return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch);
631 }
632
633 struct pa_context* pa_stream_get_context(struct pa_stream *p) {
634 assert(p);
635 return p->context;
636 }
637
/* Defined below; declared here because the drain callbacks re-enter it. */
static void set_dispatch_callbacks(struct pa_context *c);

/* Drain callback of the dispatcher: re-evaluates what is still pending. */
static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) {
    struct pa_context *ctx = userdata;

    set_dispatch_callbacks(ctx);
}
643
/* Drain callback of the packet stream: re-evaluates what is still pending. */
static void pstream_drain_callback(struct pa_pstream *s, void *userdata) {
    struct pa_context *ctx = userdata;

    set_dispatch_callbacks(ctx);
}
647
648 static void set_dispatch_callbacks(struct pa_context *c) {
649 assert(c && c->state == CONTEXT_READY);
650
651 pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
652 pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
653
654 if (pa_pdispatch_is_pending(c->pdispatch)) {
655 pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c);
656 return;
657 }
658
659 if (pa_pstream_is_pending(c->pstream)) {
660 pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
661 return;
662 }
663
664 assert(c->drain_complete_callback);
665 c->drain_complete_callback(c, c->drain_complete_userdata);
666 }
667
668 int pa_context_drain(
669 struct pa_context *c,
670 void (*complete) (struct pa_context*c, void *userdata),
671 void *userdata) {
672
673 assert(c && c->state == CONTEXT_READY);
674
675 if (complete == NULL) {
676 c->drain_complete_callback = NULL;
677 pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
678 pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
679 return 0;
680 }
681
682 if (!pa_context_is_pending(c))
683 return -1;
684
685 c->drain_complete_callback = complete;
686 c->drain_complete_userdata = userdata;
687
688 set_dispatch_callbacks(c);
689
690 return 0;
691 }
692
693 static void stream_drain_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
694 struct pa_stream *s = userdata;
695 assert(pd && s);
696
697 if (command != PA_COMMAND_REPLY) {
698 if (handle_error(s->context, command, t) < 0) {
699 context_dead(s->context);
700 return;
701 }
702
703 stream_dead(s);
704 return;
705 }
706
707 if (s->state != STREAM_READY)
708 return;
709
710 if (!pa_tagstruct_eof(t)) {
711 s->context->error = PA_ERROR_PROTOCOL;
712 context_dead(s->context);
713 return;
714 }
715
716 if (s->drain_complete_callback) {
717 void (*temp) (struct pa_stream*s, void *userdata) = s->drain_complete_callback;
718 s->drain_complete_callback = NULL;
719 temp(s, s->drain_complete_userdata);
720 }
721 }
722
723
724 void pa_stream_drain(struct pa_stream *s, void (*complete) (struct pa_stream*s, void *userdata), void *userdata) {
725 struct pa_tagstruct *t;
726 uint32_t tag;
727 assert(s && s->state == STREAM_READY);
728
729 if (!complete) {
730 s->drain_complete_callback = NULL;
731 return;
732 }
733
734 s->drain_complete_callback = complete;
735 s->drain_complete_userdata = userdata;
736
737 t = pa_tagstruct_new(NULL, 0);
738 assert(t);
739
740 pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
741 pa_tagstruct_putu32(t, tag = s->context->ctag++);
742 pa_tagstruct_putu32(t, s->channel);
743 pa_pstream_send_tagstruct(s->context->pstream, t);
744 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_drain_callback, s);
745 }