]> code.delx.au - pulseaudio/blob - polyp/polyplib.c
b1052a8d383fa782fc10c8fddda44b12e1ae140d
[pulseaudio] / polyp / polyplib.c
1 /* $Id$ */
2
3 /***
4 This file is part of polypaudio.
5
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
10
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <assert.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netdb.h>
33
34 #include "polyplib.h"
35 #include "native-common.h"
36 #include "pdispatch.h"
37 #include "pstream.h"
38 #include "dynarray.h"
39 #include "socket-client.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "util.h"
43
44 #define DEFAULT_MAXLENGTH 204800
45 #define DEFAULT_TLENGTH 10240
46 #define DEFAULT_PREBUF 4096
47 #define DEFAULT_MINREQ 1024
48 #define DEFAULT_FRAGSIZE 1024
49
50 #define DEFAULT_TIMEOUT (5*60)
51 #define DEFAULT_SERVER "/tmp/polypaudio/native"
52 #define DEFAULT_PORT "4713"
53
54 struct pa_context {
55 char *name;
56 struct pa_mainloop_api* mainloop;
57 struct pa_socket_client *client;
58 struct pa_pstream *pstream;
59 struct pa_pdispatch *pdispatch;
60 struct pa_dynarray *record_streams, *playback_streams;
61 struct pa_stream *first_stream;
62 uint32_t ctag;
63 uint32_t error;
64 enum {
65 CONTEXT_UNCONNECTED,
66 CONTEXT_CONNECTING,
67 CONTEXT_AUTHORIZING,
68 CONTEXT_SETTING_NAME,
69 CONTEXT_READY,
70 CONTEXT_DEAD
71 } state;
72
73 void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
74 void *connect_complete_userdata;
75
76 void (*drain_complete_callback)(struct pa_context*c, void *userdata);
77 void *drain_complete_userdata;
78
79 void (*die_callback)(struct pa_context*c, void *userdata);
80 void *die_userdata;
81
82 void (*stat_callback)(struct pa_context*c, uint32_t count, uint32_t total, void *userdata);
83 void *stat_userdata;
84
85 void (*play_sample_callback)(struct pa_context*c, int success, void *userdata);
86 void *play_sample_userdata;
87
88 void (*remove_sample_callback)(struct pa_context*c, int success, void *userdata);
89 void *remove_sample_userdata;
90
91 uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
92 };
93
94 struct pa_stream {
95 struct pa_context *context;
96 struct pa_stream *next, *previous;
97
98 char *name;
99 struct pa_buffer_attr buffer_attr;
100 struct pa_sample_spec sample_spec;
101 uint32_t channel;
102 int channel_valid;
103 uint32_t device_index;
104 enum pa_stream_direction direction;
105
106 enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
107 uint32_t requested_bytes;
108
109 void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
110 void *read_userdata;
111
112 void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
113 void *write_userdata;
114
115 void (*create_complete_callback)(struct pa_stream *s, int success, void *userdata);
116 void *create_complete_userdata;
117
118 void (*drain_complete_callback)(struct pa_stream *s, void *userdata);
119 void *drain_complete_userdata;
120
121 void (*die_callback)(struct pa_stream*c, void *userdata);
122 void *die_userdata;
123
124 void (*get_latency_callback)(struct pa_stream*c, uint32_t latency, void *userdata);
125 void *get_latency_userdata;
126
127 void (*finish_sample_callback)(struct pa_stream*c, int success, void *userdata);
128 void *finish_sample_userdata;
129 };
130
131 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
132 static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
133
134 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
135 [PA_COMMAND_ERROR] = { NULL },
136 [PA_COMMAND_REPLY] = { NULL },
137 [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
138 [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
139 [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
140 [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
141 [PA_COMMAND_EXIT] = { NULL },
142 [PA_COMMAND_REQUEST] = { command_request },
143 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = { command_playback_stream_killed },
144 [PA_COMMAND_RECORD_STREAM_KILLED] = { command_playback_stream_killed },
145 };
146
147 struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
148 struct pa_context *c;
149 assert(mainloop && name);
150
151 c = malloc(sizeof(struct pa_context));
152 assert(c);
153 c->name = strdup(name);
154 c->mainloop = mainloop;
155 c->client = NULL;
156 c->pstream = NULL;
157 c->pdispatch = NULL;
158 c->playback_streams = pa_dynarray_new();
159 assert(c->playback_streams);
160 c->record_streams = pa_dynarray_new();
161 assert(c->record_streams);
162 c->first_stream = NULL;
163 c->error = PA_ERROR_OK;
164 c->state = CONTEXT_UNCONNECTED;
165 c->ctag = 0;
166
167 c->connect_complete_callback = NULL;
168 c->connect_complete_userdata = NULL;
169
170 c->drain_complete_callback = NULL;
171 c->drain_complete_userdata = NULL;
172
173 c->die_callback = NULL;
174 c->die_userdata = NULL;
175
176 c->stat_callback = NULL;
177 c->stat_userdata = NULL;
178
179 c->play_sample_callback = NULL;
180 c->play_sample_userdata = NULL;
181
182 c->remove_sample_callback = NULL;
183 c->remove_sample_userdata = NULL;
184
185 pa_check_for_sigpipe();
186 return c;
187 }
188
189 void pa_context_free(struct pa_context *c) {
190 assert(c);
191
192 while (c->first_stream)
193 pa_stream_free(c->first_stream);
194
195 if (c->client)
196 pa_socket_client_free(c->client);
197 if (c->pdispatch)
198 pa_pdispatch_free(c->pdispatch);
199 if (c->pstream)
200 pa_pstream_free(c->pstream);
201 if (c->record_streams)
202 pa_dynarray_free(c->record_streams, NULL, NULL);
203 if (c->playback_streams)
204 pa_dynarray_free(c->playback_streams, NULL, NULL);
205
206 free(c->name);
207 free(c);
208 }
209
210 static void stream_dead(struct pa_stream *s) {
211 assert(s);
212
213 if (s->state == STREAM_DEAD)
214 return;
215
216 if (s->state == STREAM_READY) {
217 s->state = STREAM_DEAD;
218 if (s->die_callback)
219 s->die_callback(s, s->die_userdata);
220 } else
221 s->state = STREAM_DEAD;
222 }
223
224 static void context_dead(struct pa_context *c) {
225 struct pa_stream *s;
226 assert(c);
227
228 if (c->state == CONTEXT_DEAD)
229 return;
230
231 if (c->pdispatch)
232 pa_pdispatch_free(c->pdispatch);
233 c->pdispatch = NULL;
234
235 if (c->pstream)
236 pa_pstream_free(c->pstream);
237 c->pstream = NULL;
238
239 if (c->client)
240 pa_socket_client_free(c->client);
241 c->client = NULL;
242
243 for (s = c->first_stream; s; s = s->next)
244 stream_dead(s);
245
246 if (c->state == CONTEXT_READY) {
247 c->state = CONTEXT_DEAD;
248 if (c->die_callback)
249 c->die_callback(c, c->die_userdata);
250 } else
251 c->state = CONTEXT_DEAD;
252 }
253
254 static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
255 struct pa_context *c = userdata;
256 assert(p && c);
257 c->error = PA_ERROR_CONNECTIONTERMINATED;
258 context_dead(c);
259 }
260
261 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
262 struct pa_context *c = userdata;
263 assert(p && packet && c);
264
265 if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
266 fprintf(stderr, "polyp.c: invalid packet.\n");
267 c->error = PA_ERROR_PROTOCOL;
268 context_dead(c);
269 }
270 }
271
272 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
273 struct pa_context *c = userdata;
274 struct pa_stream *s;
275 assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
276
277 if (!(s = pa_dynarray_get(c->record_streams, channel)))
278 return;
279
280 if (s->read_callback)
281 s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
282 }
283
284 static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) {
285 assert(c && t);
286
287 if (command == PA_COMMAND_ERROR) {
288 if (pa_tagstruct_getu32(t, &c->error) < 0) {
289 c->error = PA_ERROR_PROTOCOL;
290 return -1;
291 }
292
293 return 0;
294 }
295
296 c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL;
297 return -1;
298 }
299
300 static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
301 struct pa_context *c = userdata;
302 assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME));
303
304 if (command != PA_COMMAND_REPLY) {
305 handle_error(c, command, t);
306 context_dead(c);
307
308 if (c->connect_complete_callback)
309 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
310
311 return;
312 }
313
314 if (c->state == CONTEXT_AUTHORIZING) {
315 struct pa_tagstruct *t;
316 c->state = CONTEXT_SETTING_NAME;
317 t = pa_tagstruct_new(NULL, 0);
318 assert(t);
319 pa_tagstruct_putu32(t, PA_COMMAND_SET_NAME);
320 pa_tagstruct_putu32(t, tag = c->ctag++);
321 pa_tagstruct_puts(t, c->name);
322 pa_pstream_send_tagstruct(c->pstream, t);
323 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
324 } else {
325 assert(c->state == CONTEXT_SETTING_NAME);
326
327 c->state = CONTEXT_READY;
328
329 if (c->connect_complete_callback)
330 c->connect_complete_callback(c, 1, c->connect_complete_userdata);
331 }
332
333 return;
334 }
335
336 static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) {
337 struct pa_context *c = userdata;
338 struct pa_tagstruct *t;
339 uint32_t tag;
340 assert(client && c && c->state == CONTEXT_CONNECTING);
341
342 pa_socket_client_free(client);
343 c->client = NULL;
344
345 if (!io) {
346 c->error = PA_ERROR_CONNECTIONREFUSED;
347 context_dead(c);
348
349 if (c->connect_complete_callback)
350 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
351
352 return;
353 }
354
355 c->pstream = pa_pstream_new(c->mainloop, io);
356 assert(c->pstream);
357 pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
358 pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
359 pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
360
361 c->pdispatch = pa_pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
362 assert(c->pdispatch);
363
364 t = pa_tagstruct_new(NULL, 0);
365 assert(t);
366 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
367 pa_tagstruct_putu32(t, tag = c->ctag++);
368 pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie));
369 pa_pstream_send_tagstruct(c->pstream, t);
370 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
371 c->state = CONTEXT_AUTHORIZING;
372 }
373
374 static struct sockaddr *resolve_server(const char *server, size_t *len) {
375 struct sockaddr *sa;
376 struct addrinfo hints, *result = NULL;
377 char *port;
378 assert(server && len);
379
380 if ((port = strrchr(server, ':')))
381 port++;
382 if (!port)
383 port = DEFAULT_PORT;
384
385 memset(&hints, 0, sizeof(hints));
386 hints.ai_family = PF_UNSPEC;
387 hints.ai_socktype = SOCK_STREAM;
388 hints.ai_protocol = 0;
389
390 if (getaddrinfo(server, port, &hints, &result) != 0)
391 return NULL;
392 assert(result);
393
394 sa = malloc(*len = result->ai_addrlen);
395 assert(sa);
396 memcpy(sa, result->ai_addr, *len);
397
398 freeaddrinfo(result);
399
400 return sa;
401
402 }
403
404 int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
405 assert(c && c->state == CONTEXT_UNCONNECTED);
406
407 if (pa_authkey_load_from_home(PA_NATIVE_COOKIE_FILE, c->auth_cookie, sizeof(c->auth_cookie)) < 0) {
408 c->error = PA_ERROR_AUTHKEY;
409 return -1;
410 }
411
412 if (!server)
413 if (!(server = getenv("POLYP_SERVER")))
414 server = DEFAULT_SERVER;
415
416 assert(!c->client);
417
418 if (*server == '/') {
419 if (!(c->client = pa_socket_client_new_unix(c->mainloop, server))) {
420 c->error = PA_ERROR_CONNECTIONREFUSED;
421 return -1;
422 }
423 } else {
424 struct sockaddr* sa;
425 size_t sa_len;
426
427 if (!(sa = resolve_server(server, &sa_len))) {
428 c->error = PA_ERROR_INVALIDSERVER;
429 return -1;
430 }
431
432 c->client = pa_socket_client_new_sockaddr(c->mainloop, sa, sa_len);
433 free(sa);
434
435 if (!c->client) {
436 c->error = PA_ERROR_CONNECTIONREFUSED;
437 return -1;
438 }
439 }
440
441 c->connect_complete_callback = complete;
442 c->connect_complete_userdata = userdata;
443
444 pa_socket_client_set_callback(c->client, on_connection, c);
445 c->state = CONTEXT_CONNECTING;
446
447 return 0;
448 }
449
450 int pa_context_is_dead(struct pa_context *c) {
451 assert(c);
452 return c->state == CONTEXT_DEAD;
453 }
454
455 int pa_context_is_ready(struct pa_context *c) {
456 assert(c);
457 return c->state == CONTEXT_READY;
458 }
459
460 int pa_context_errno(struct pa_context *c) {
461 assert(c);
462 return c->error;
463 }
464
465 void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
466 assert(c);
467 c->die_callback = cb;
468 c->die_userdata = userdata;
469 }
470
471 static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
472 struct pa_context *c = userdata;
473 struct pa_stream *s;
474 uint32_t channel;
475 assert(pd && (command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED) && t && c);
476
477 if (pa_tagstruct_getu32(t, &channel) < 0 ||
478 !pa_tagstruct_eof(t)) {
479 c->error = PA_ERROR_PROTOCOL;
480 context_dead(c);
481 return;
482 }
483
484 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
485 return;
486
487 c->error = PA_ERROR_KILLED;
488 stream_dead(s);
489 }
490
491 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
492 struct pa_stream *s;
493 struct pa_context *c = userdata;
494 uint32_t bytes, channel;
495 assert(pd && command == PA_COMMAND_REQUEST && t && c);
496
497 if (pa_tagstruct_getu32(t, &channel) < 0 ||
498 pa_tagstruct_getu32(t, &bytes) < 0 ||
499 !pa_tagstruct_eof(t)) {
500 c->error = PA_ERROR_PROTOCOL;
501 context_dead(c);
502 return;
503 }
504
505 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
506 return;
507
508 if (s->state != STREAM_READY)
509 return;
510
511 s->requested_bytes += bytes;
512
513 if (s->requested_bytes && s->write_callback)
514 s->write_callback(s, s->requested_bytes, s->write_userdata);
515 }
516
517 static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
518 struct pa_stream *s = userdata;
519 assert(pd && s && s->state == STREAM_CREATING);
520
521 if (command != PA_COMMAND_REPLY) {
522 if (handle_error(s->context, command, t) < 0) {
523 context_dead(s->context);
524 return;
525 }
526
527 stream_dead(s);
528 if (s->create_complete_callback)
529 s->create_complete_callback(s, 0, s->create_complete_userdata);
530
531 return;
532 }
533
534 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
535 ((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
536 !pa_tagstruct_eof(t)) {
537 s->context->error = PA_ERROR_PROTOCOL;
538 context_dead(s->context);
539 return;
540 }
541
542 s->channel_valid = 1;
543 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
544
545 s->state = STREAM_READY;
546 if (s->create_complete_callback)
547 s->create_complete_callback(s, 1, s->create_complete_userdata);
548 }
549
550 static void create_stream(struct pa_stream *s, const char *dev) {
551 struct pa_tagstruct *t;
552 uint32_t tag;
553 assert(s);
554
555 s->state = STREAM_CREATING;
556
557 t = pa_tagstruct_new(NULL, 0);
558 assert(t);
559
560 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
561 pa_tagstruct_putu32(t, tag = s->context->ctag++);
562 pa_tagstruct_puts(t, s->name);
563 pa_tagstruct_put_sample_spec(t, &s->sample_spec);
564 pa_tagstruct_putu32(t, (uint32_t) -1);
565 pa_tagstruct_puts(t, dev ? dev : "");
566 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
567 if (s->direction == PA_STREAM_PLAYBACK) {
568 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
569 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
570 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
571 } else
572 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
573
574 pa_pstream_send_tagstruct(s->context->pstream, t);
575 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, s);
576 }
577
578 static struct pa_stream *internal_stream_new(struct pa_context *c) {
579 struct pa_stream *s;
580
581 s = malloc(sizeof(struct pa_stream));
582 assert(s);
583 s->context = c;
584
585 s->read_callback = NULL;
586 s->read_userdata = NULL;
587 s->write_callback = NULL;
588 s->write_userdata = NULL;
589 s->die_callback = NULL;
590 s->die_userdata = NULL;
591 s->create_complete_callback = NULL;
592 s->create_complete_userdata = NULL;
593 s->get_latency_callback = NULL;
594 s->get_latency_userdata = NULL;
595 s->finish_sample_callback = NULL;
596 s->finish_sample_userdata = NULL;
597
598 s->name = NULL;
599 s->state = STREAM_CREATING;
600 s->requested_bytes = 0;
601 s->channel = 0;
602 s->channel_valid = 0;
603 s->device_index = (uint32_t) -1;
604
605 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
606
607 s->next = c->first_stream;
608 if (s->next)
609 s->next->previous = s;
610 s->previous = NULL;
611 c->first_stream = s;
612
613 return s;
614 }
615
616 struct pa_stream* pa_stream_new(
617 struct pa_context *c,
618 enum pa_stream_direction dir,
619 const char *dev,
620 const char *name,
621 const struct pa_sample_spec *ss,
622 const struct pa_buffer_attr *attr,
623 void (*complete) (struct pa_stream*s, int success, void *userdata),
624 void *userdata) {
625
626 struct pa_stream *s;
627
628 assert(c && name && ss && c->state == CONTEXT_READY && (dir == PA_STREAM_PLAYBACK || dir == PA_STREAM_RECORD));
629
630 s = internal_stream_new(c);
631 assert(s);
632
633 s->create_complete_callback = complete;
634 s->create_complete_userdata = userdata;
635 s->name = strdup(name);
636 s->state = STREAM_CREATING;
637 s->direction = dir;
638 s->sample_spec = *ss;
639 if (attr)
640 s->buffer_attr = *attr;
641 else {
642 s->buffer_attr.maxlength = DEFAULT_MAXLENGTH;
643 s->buffer_attr.tlength = DEFAULT_TLENGTH;
644 s->buffer_attr.prebuf = DEFAULT_PREBUF;
645 s->buffer_attr.minreq = DEFAULT_MINREQ;
646 s->buffer_attr.fragsize = DEFAULT_FRAGSIZE;
647 }
648
649 create_stream(s, dev);
650
651 return s;
652 }
653
654 void pa_stream_free(struct pa_stream *s) {
655 assert(s && s->context);
656
657 if (s->context->pdispatch)
658 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
659
660 free(s->name);
661
662 if (s->channel_valid && s->context->state == CONTEXT_READY) {
663 struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0);
664 assert(t);
665
666 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
667 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM));
668 pa_tagstruct_putu32(t, s->context->ctag++);
669 pa_tagstruct_putu32(t, s->channel);
670 pa_pstream_send_tagstruct(s->context->pstream, t);
671 }
672
673 if (s->channel_valid)
674 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
675
676 if (s->next)
677 s->next->previous = s->previous;
678 if (s->previous)
679 s->previous->next = s->next;
680 else
681 s->context->first_stream = s->next;
682
683 free(s);
684 }
685
686 void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
687 s->write_callback = cb;
688 s->write_userdata = userdata;
689 }
690
691 void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
692 struct pa_memchunk chunk;
693 assert(s && s->context && data && length && s->state == STREAM_READY);
694
695 chunk.memblock = pa_memblock_new(length);
696 assert(chunk.memblock && chunk.memblock->data);
697 memcpy(chunk.memblock->data, data, length);
698 chunk.index = 0;
699 chunk.length = length;
700
701 pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
702 pa_memblock_unref(chunk.memblock);
703
704 /*fprintf(stderr, "Sent %u bytes\n", length);*/
705
706 if (length < s->requested_bytes)
707 s->requested_bytes -= length;
708 else
709 s->requested_bytes = 0;
710 }
711
712 size_t pa_stream_writable_size(struct pa_stream *s) {
713 assert(s && s->state == STREAM_READY);
714 return s->requested_bytes;
715 }
716
717 void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
718 assert(s && cb);
719 s->read_callback = cb;
720 s->read_userdata = userdata;
721 }
722
723 int pa_stream_is_dead(struct pa_stream *s) {
724 return s->state == STREAM_DEAD;
725 }
726
727 int pa_stream_is_ready(struct pa_stream*s) {
728 return s->state == STREAM_READY;
729 }
730
731 void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
732 assert(s);
733 s->die_callback = cb;
734 s->die_userdata = userdata;
735 }
736
737 int pa_context_is_pending(struct pa_context *c) {
738 assert(c);
739
740 if (c->state != CONTEXT_READY)
741 return 0;
742
743 return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch);
744 }
745
746 struct pa_context* pa_stream_get_context(struct pa_stream *p) {
747 assert(p);
748 return p->context;
749 }
750
751 static void set_dispatch_callbacks(struct pa_context *c);
752
753 static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) {
754 set_dispatch_callbacks(userdata);
755 }
756
757 static void pstream_drain_callback(struct pa_pstream *s, void *userdata) {
758 set_dispatch_callbacks(userdata);
759 }
760
761 static void set_dispatch_callbacks(struct pa_context *c) {
762 assert(c && c->state == CONTEXT_READY);
763
764 pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
765 pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
766
767 if (pa_pdispatch_is_pending(c->pdispatch)) {
768 pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c);
769 return;
770 }
771
772 if (pa_pstream_is_pending(c->pstream)) {
773 pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
774 return;
775 }
776
777 assert(c->drain_complete_callback);
778 c->drain_complete_callback(c, c->drain_complete_userdata);
779 }
780
781 int pa_context_drain(
782 struct pa_context *c,
783 void (*complete) (struct pa_context*c, void *userdata),
784 void *userdata) {
785
786 assert(c && c->state == CONTEXT_READY);
787
788 if (complete == NULL) {
789 c->drain_complete_callback = NULL;
790 pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
791 pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
792 return 0;
793 }
794
795 if (!pa_context_is_pending(c))
796 return -1;
797
798 c->drain_complete_callback = complete;
799 c->drain_complete_userdata = userdata;
800
801 set_dispatch_callbacks(c);
802
803 return 0;
804 }
805
806 static void stream_drain_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
807 struct pa_stream *s = userdata;
808 assert(pd && s);
809
810 if (command != PA_COMMAND_REPLY) {
811 if (handle_error(s->context, command, t) < 0) {
812 context_dead(s->context);
813 return;
814 }
815
816 stream_dead(s);
817 return;
818 }
819
820 if (s->state != STREAM_READY)
821 return;
822
823 if (!pa_tagstruct_eof(t)) {
824 s->context->error = PA_ERROR_PROTOCOL;
825 context_dead(s->context);
826 return;
827 }
828
829 if (s->drain_complete_callback) {
830 void (*temp) (struct pa_stream*s, void *userdata) = s->drain_complete_callback;
831 s->drain_complete_callback = NULL;
832 temp(s, s->drain_complete_userdata);
833 }
834 }
835
836
837 void pa_stream_drain(struct pa_stream *s, void (*complete) (struct pa_stream*s, void *userdata), void *userdata) {
838 struct pa_tagstruct *t;
839 uint32_t tag;
840 assert(s && s->state == STREAM_READY);
841
842 if (!complete) {
843 s->drain_complete_callback = NULL;
844 return;
845 }
846
847 s->drain_complete_callback = complete;
848 s->drain_complete_userdata = userdata;
849
850 t = pa_tagstruct_new(NULL, 0);
851 assert(t);
852
853 pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
854 pa_tagstruct_putu32(t, tag = s->context->ctag++);
855 pa_tagstruct_putu32(t, s->channel);
856 pa_pstream_send_tagstruct(s->context->pstream, t);
857 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_drain_callback, s);
858 }
859
860 void pa_context_exit(struct pa_context *c) {
861 struct pa_tagstruct *t;
862 t = pa_tagstruct_new(NULL, 0);
863 assert(t);
864 pa_tagstruct_putu32(t, PA_COMMAND_EXIT);
865 pa_tagstruct_putu32(t, c->ctag++);
866 pa_pstream_send_tagstruct(c->pstream, t);
867 }
868
869 static void context_stat_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
870 struct pa_context *c = userdata;
871 uint32_t total, count;
872 assert(pd && c);
873
874 if (command != PA_COMMAND_REPLY) {
875 if (handle_error(c, command, t) < 0) {
876 context_dead(c);
877 return;
878 }
879
880 if (c->stat_callback)
881 c->stat_callback(c, (uint32_t) -1, (uint32_t) -1, c->stat_userdata);
882 return;
883 }
884
885 if (pa_tagstruct_getu32(t, &count) < 0 ||
886 pa_tagstruct_getu32(t, &total) < 0 ||
887 !pa_tagstruct_eof(t)) {
888 c->error = PA_ERROR_PROTOCOL;
889 context_dead(c);
890 return;
891 }
892
893 if (c->stat_callback)
894 c->stat_callback(c, count, total, c->stat_userdata);
895 }
896
897 void pa_context_stat(struct pa_context *c, void (*cb)(struct pa_context *c, uint32_t count, uint32_t total, void *userdata), void *userdata) {
898 uint32_t tag;
899 struct pa_tagstruct *t;
900
901 c->stat_callback = cb;
902 c->stat_userdata = userdata;
903
904 if (cb == NULL)
905 return;
906
907 t = pa_tagstruct_new(NULL, 0);
908 assert(t);
909 pa_tagstruct_putu32(t, PA_COMMAND_STAT);
910 pa_tagstruct_putu32(t, tag = c->ctag++);
911 pa_pstream_send_tagstruct(c->pstream, t);
912 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, context_stat_callback, c);
913 }
914
915 static void stream_get_latency_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
916 struct pa_stream *s = userdata;
917 uint32_t latency;
918 assert(pd && s);
919
920 if (command != PA_COMMAND_REPLY) {
921 if (handle_error(s->context, command, t) < 0) {
922 context_dead(s->context);
923 return;
924 }
925
926 if (s->get_latency_callback)
927 s->get_latency_callback(s, (uint32_t) -1, s->get_latency_userdata);
928 return;
929 }
930
931 if (pa_tagstruct_getu32(t, &latency) < 0 ||
932 !pa_tagstruct_eof(t)) {
933 s->context->error = PA_ERROR_PROTOCOL;
934 context_dead(s->context);
935 return;
936 }
937
938 if (s->get_latency_callback)
939 s->get_latency_callback(s, latency, s->get_latency_userdata);
940 }
941
942 void pa_stream_get_latency(struct pa_stream *p, void (*cb)(struct pa_stream *p, uint32_t latency, void *userdata), void *userdata) {
943 uint32_t tag;
944 struct pa_tagstruct *t;
945
946 p->get_latency_callback = cb;
947 p->get_latency_userdata = userdata;
948
949 if (cb == NULL)
950 return;
951
952 t = pa_tagstruct_new(NULL, 0);
953 assert(t);
954 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
955 pa_tagstruct_putu32(t, tag = p->context->ctag++);
956 pa_tagstruct_putu32(t, p->channel);
957 pa_pstream_send_tagstruct(p->context->pstream, t);
958 pa_pdispatch_register_reply(p->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, p);
959 }
960
961 struct pa_stream* pa_context_upload_sample(struct pa_context *c, const char *name, const struct pa_sample_spec *ss, size_t length, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata) {
962 struct pa_stream *s;
963 struct pa_tagstruct *t;
964 uint32_t tag;
965
966 s = internal_stream_new(c);
967 assert(s);
968
969 s->create_complete_callback = cb;
970 s->create_complete_userdata = userdata;
971 s->name = strdup(name);
972 s->state = STREAM_CREATING;
973 s->direction = PA_STREAM_UPLOAD;
974 s->sample_spec = *ss;
975
976 t = pa_tagstruct_new(NULL, 0);
977 assert(t);
978 pa_tagstruct_putu32(t, PA_COMMAND_CREATE_UPLOAD_STREAM);
979 pa_tagstruct_putu32(t, tag = c->ctag++);
980 pa_tagstruct_puts(t, name);
981 pa_tagstruct_put_sample_spec(t, ss);
982 pa_tagstruct_putu32(t, length);
983 pa_pstream_send_tagstruct(c->pstream, t);
984 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, s);
985
986 return s;
987 }
988
989 static void stream_finish_sample_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
990 struct pa_stream *s = userdata;
991 assert(pd && s);
992
993 if (command != PA_COMMAND_REPLY) {
994 if (handle_error(s->context, command, t) < 0) {
995 context_dead(s->context);
996 return;
997 }
998
999 if (s->finish_sample_callback)
1000 s->finish_sample_callback(s, 0, s->finish_sample_userdata);
1001 return;
1002 }
1003
1004 if (!pa_tagstruct_eof(t)) {
1005 s->context->error = PA_ERROR_PROTOCOL;
1006 context_dead(s->context);
1007 return;
1008 }
1009
1010 if (s->finish_sample_callback)
1011 s->finish_sample_callback(s, 1, s->finish_sample_userdata);
1012 }
1013
1014 void pa_stream_finish_sample(struct pa_stream *p, void (*cb)(struct pa_stream*s, int success, void *userdata), void *userdata) {
1015 struct pa_tagstruct *t;
1016 uint32_t tag;
1017 assert(p);
1018
1019 p->finish_sample_callback = cb;
1020 p->finish_sample_userdata = userdata;
1021
1022 t = pa_tagstruct_new(NULL, 0);
1023 assert(t);
1024 pa_tagstruct_putu32(t, PA_COMMAND_FINISH_UPLOAD_STREAM);
1025 pa_tagstruct_putu32(t, tag = p->context->ctag++);
1026 pa_tagstruct_putu32(t, p->channel);
1027 pa_pstream_send_tagstruct(p->context->pstream, t);
1028 pa_pdispatch_register_reply(p->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_finish_sample_callback, p);
1029 }
1030
1031 static void context_play_sample_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1032 struct pa_context *c = userdata;
1033 assert(pd && c);
1034
1035 if (command != PA_COMMAND_REPLY) {
1036 if (handle_error(c, command, t) < 0) {
1037 context_dead(c);
1038 return;
1039 }
1040
1041 if (c->play_sample_callback)
1042 c->play_sample_callback(c, 0, c->play_sample_userdata);
1043 return;
1044 }
1045
1046 if (!pa_tagstruct_eof(t)) {
1047 c->error = PA_ERROR_PROTOCOL;
1048 context_dead(c);
1049 return;
1050 }
1051
1052 if (c->play_sample_callback)
1053 c->play_sample_callback(c, 1, c->play_sample_userdata);
1054 }
1055
1056 void pa_context_play_sample(struct pa_context *c, const char *name, const char *dev, uint32_t volume, void (*cb)(struct pa_context *c, int success, void *userdata), void *userdata) {
1057 struct pa_tagstruct *t;
1058 uint32_t tag;
1059 assert(c && name && *name && (!dev || *dev));
1060
1061 if (!volume)
1062 return;
1063
1064 c->play_sample_callback = cb;
1065 c->play_sample_userdata = userdata;
1066
1067 t = pa_tagstruct_new(NULL, 0);
1068 assert(t);
1069 pa_tagstruct_putu32(t, PA_COMMAND_PLAY_SAMPLE);
1070 pa_tagstruct_putu32(t, tag = c->ctag++);
1071 pa_tagstruct_putu32(t, (uint32_t) -1);
1072 pa_tagstruct_puts(t, dev ? dev : "");
1073 pa_tagstruct_putu32(t, volume);
1074 pa_tagstruct_puts(t, name);
1075 pa_pstream_send_tagstruct(c->pstream, t);
1076 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, context_play_sample_callback, c);
1077 }
1078
1079 static void context_remove_sample_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1080 struct pa_context *c = userdata;
1081 assert(pd && c);
1082
1083 if (command != PA_COMMAND_REPLY) {
1084 if (handle_error(c, command, t) < 0) {
1085 context_dead(c);
1086 return;
1087 }
1088
1089 if (c->remove_sample_callback)
1090 c->remove_sample_callback(c, 0, c->remove_sample_userdata);
1091 return;
1092 }
1093
1094 if (!pa_tagstruct_eof(t)) {
1095 c->error = PA_ERROR_PROTOCOL;
1096 context_dead(c);
1097 return;
1098 }
1099
1100 if (c->remove_sample_callback)
1101 c->remove_sample_callback(c, 1, c->remove_sample_userdata);
1102 }
1103
1104 void pa_context_remove_sample(struct pa_context *c, const char *name, void (*cb)(struct pa_context *c, int success, void *userdata), void *userdata) {
1105 struct pa_tagstruct *t;
1106 uint32_t tag;
1107 assert(c && name);
1108
1109 c->remove_sample_callback = cb;
1110 c->remove_sample_userdata = userdata;
1111
1112 t = pa_tagstruct_new(NULL, 0);
1113 assert(t);
1114 pa_tagstruct_putu32(t, PA_COMMAND_REMOVE_SAMPLE);
1115 pa_tagstruct_putu32(t, tag = c->ctag++);
1116 pa_tagstruct_puts(t, name);
1117 pa_pstream_send_tagstruct(c->pstream, t);
1118 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, context_remove_sample_callback, c);
1119 }