]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
* add new function pa_check_in_group()
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30 #include <sys/socket.h>
31 #include <sys/un.h>
32
33 #ifdef HAVE_NETINET_IN_H
34 #include <netinet/in.h>
35 #endif
36
37 #include "winsock.h"
38
39 #include <pulse/xmalloc.h>
40
41 #include <pulsecore/queue.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-scache.h>
44 #include <pulsecore/creds.h>
45
46 #include "pstream.h"
47
/* Word indices inside a frame descriptor. Every frame on the wire starts
 * with PA_PSTREAM_DESCRIPTOR_MAX 32-bit words; this file converts them
 * with htonl()/ntohl() when writing/reading. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,     /* payload length in bytes */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,    /* channel index; (uint32_t) -1 marks a packet frame */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,  /* upper 32 bits of the 64-bit offset */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,  /* lower 32 bits of the 64-bit offset */
    PA_PSTREAM_DESCRIPTOR_SEEK,       /* seek mode of a memblock frame */
    PA_PSTREAM_DESCRIPTOR_MAX         /* number of descriptor words */
};

/* Storage for one complete frame descriptor. */
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size of a frame descriptor in bytes. */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

/* Upper bound for the payload of a single frame: large enough to upload
 * one complete sample cache entry in a single frame. */
#define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX
61
62 struct item_info {
63 enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
64
65 /* memblock info */
66 pa_memchunk chunk;
67 uint32_t channel;
68 int64_t offset;
69 pa_seek_mode_t seek_mode;
70
71 /* packet info */
72 pa_packet *packet;
73 #ifdef HAVE_CREDS
74 int with_creds;
75 pa_creds creds;
76 #endif
77 };
78
79 struct pa_pstream {
80 int ref;
81
82 pa_mainloop_api *mainloop;
83 pa_defer_event *defer_event;
84 pa_iochannel *io;
85 pa_queue *send_queue;
86
87 int dead;
88
89 struct {
90 struct item_info* current;
91 pa_pstream_descriptor descriptor;
92 void *data;
93 size_t index;
94 } write;
95
96 struct {
97 pa_memblock *memblock;
98 pa_packet *packet;
99 pa_pstream_descriptor descriptor;
100 void *data;
101 size_t index;
102 } read;
103
104 pa_pstream_packet_cb_t recieve_packet_callback;
105 void *recieve_packet_callback_userdata;
106
107 pa_pstream_memblock_cb_t recieve_memblock_callback;
108 void *recieve_memblock_callback_userdata;
109
110 pa_pstream_notify_cb_t drain_callback;
111 void *drain_callback_userdata;
112
113 pa_pstream_notify_cb_t die_callback;
114 void *die_callback_userdata;
115
116 pa_memblock_stat *memblock_stat;
117
118 #ifdef HAVE_CREDS
119 pa_creds read_creds, write_creds;
120 int read_creds_valid, send_creds_now;
121 #endif
122 };
123
124 static int do_write(pa_pstream *p);
125 static int do_read(pa_pstream *p);
126
127 static void do_something(pa_pstream *p) {
128 assert(p);
129
130 p->mainloop->defer_enable(p->defer_event, 0);
131
132 pa_pstream_ref(p);
133
134 if (!p->dead && pa_iochannel_is_readable(p->io)) {
135 if (do_read(p) < 0)
136 goto fail;
137 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
138 goto fail;
139
140 if (!p->dead && pa_iochannel_is_writable(p->io)) {
141 if (do_write(p) < 0)
142 goto fail;
143 }
144
145 pa_pstream_unref(p);
146 return;
147
148 fail:
149
150 p->dead = 1;
151
152 if (p->die_callback)
153 p->die_callback(p, p->die_callback_userdata);
154
155 pa_pstream_unref(p);
156 }
157
158 static void io_callback(pa_iochannel*io, void *userdata) {
159 pa_pstream *p = userdata;
160
161 assert(p);
162 assert(p->io == io);
163
164 do_something(p);
165 }
166
167 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
168 pa_pstream *p = userdata;
169
170 assert(p);
171 assert(p->defer_event == e);
172 assert(p->mainloop == m);
173
174 do_something(p);
175 }
176
177 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) {
178 pa_pstream *p;
179 assert(io);
180
181 p = pa_xnew(pa_pstream, 1);
182
183 p->ref = 1;
184 p->io = io;
185 pa_iochannel_set_callback(io, io_callback, p);
186
187 p->dead = 0;
188
189 p->mainloop = m;
190 p->defer_event = m->defer_new(m, defer_callback, p);
191 m->defer_enable(p->defer_event, 0);
192
193 p->send_queue = pa_queue_new();
194 assert(p->send_queue);
195
196 p->write.current = NULL;
197 p->write.index = 0;
198
199 p->read.memblock = NULL;
200 p->read.packet = NULL;
201 p->read.index = 0;
202
203 p->recieve_packet_callback = NULL;
204 p->recieve_packet_callback_userdata = NULL;
205
206 p->recieve_memblock_callback = NULL;
207 p->recieve_memblock_callback_userdata = NULL;
208
209 p->drain_callback = NULL;
210 p->drain_callback_userdata = NULL;
211
212 p->die_callback = NULL;
213 p->die_callback_userdata = NULL;
214
215 p->memblock_stat = s;
216
217 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
218 pa_iochannel_socket_set_sndbuf(io, 1024*8);
219
220 #ifdef HAVE_CREDS
221 p->send_creds_now = 0;
222 p->read_creds_valid = 0;
223 #endif
224 return p;
225 }
226
227 static void item_free(void *item, PA_GCC_UNUSED void *p) {
228 struct item_info *i = item;
229 assert(i);
230
231 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
232 assert(i->chunk.memblock);
233 pa_memblock_unref(i->chunk.memblock);
234 } else {
235 assert(i->type == PA_PSTREAM_ITEM_PACKET);
236 assert(i->packet);
237 pa_packet_unref(i->packet);
238 }
239
240 pa_xfree(i);
241 }
242
243 static void pstream_free(pa_pstream *p) {
244 assert(p);
245
246 pa_pstream_close(p);
247
248 pa_queue_free(p->send_queue, item_free, NULL);
249
250 if (p->write.current)
251 item_free(p->write.current, NULL);
252
253 if (p->read.memblock)
254 pa_memblock_unref(p->read.memblock);
255
256 if (p->read.packet)
257 pa_packet_unref(p->read.packet);
258
259 pa_xfree(p);
260 }
261
262 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
263 struct item_info *i;
264 assert(p && packet && p->ref >= 1);
265
266 if (p->dead)
267 return;
268
269 /* pa_log(__FILE__": push-packet %p", packet); */
270
271 i = pa_xnew(struct item_info, 1);
272 i->type = PA_PSTREAM_ITEM_PACKET;
273 i->packet = pa_packet_ref(packet);
274 #ifdef HAVE_CREDS
275 if ((i->with_creds = !!creds))
276 i->creds = *creds;
277 #endif
278
279 pa_queue_push(p->send_queue, i);
280 p->mainloop->defer_enable(p->defer_event, 1);
281 }
282
283 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
284 struct item_info *i;
285 assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
286
287 if (p->dead)
288 return;
289
290 /* pa_log(__FILE__": push-memblock %p", chunk); */
291
292 i = pa_xnew(struct item_info, 1);
293 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
294 i->chunk = *chunk;
295 i->channel = channel;
296 i->offset = offset;
297 i->seek_mode = seek_mode;
298 #ifdef HAVE_CREDS
299 i->with_creds = 0;
300 #endif
301
302 pa_memblock_ref(i->chunk.memblock);
303
304 pa_queue_push(p->send_queue, i);
305 p->mainloop->defer_enable(p->defer_event, 1);
306 }
307
308 static void prepare_next_write_item(pa_pstream *p) {
309 assert(p);
310
311 if (!(p->write.current = pa_queue_pop(p->send_queue)))
312 return;
313
314 p->write.index = 0;
315
316 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
317 /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
318
319 assert(p->write.current->packet);
320 p->write.data = p->write.current->packet->data;
321 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
322 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
323 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
324 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
325 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
326
327
328 } else {
329 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
330 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
331 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
332 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
333 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
334 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
335 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
336 }
337
338 #ifdef HAVE_CREDS
339 if ((p->send_creds_now = p->write.current->with_creds))
340 p->write_creds = p->write.current->creds;
341
342 #endif
343
344 }
345
346 static int do_write(pa_pstream *p) {
347 void *d;
348 size_t l;
349 ssize_t r;
350 assert(p);
351
352 if (!p->write.current)
353 prepare_next_write_item(p);
354
355 if (!p->write.current)
356 return 0;
357
358 assert(p->write.data);
359
360 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
361 d = (uint8_t*) p->write.descriptor + p->write.index;
362 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
363 } else {
364 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
365 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
366 }
367
368 #ifdef HAVE_CREDS
369 if (p->send_creds_now) {
370
371 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
372 return -1;
373
374 p->send_creds_now = 0;
375 } else
376 #endif
377
378 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
379 return -1;
380
381 p->write.index += r;
382
383 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
384 assert(p->write.current);
385 item_free(p->write.current, (void *) 1);
386 p->write.current = NULL;
387
388 if (p->drain_callback && !pa_pstream_is_pending(p))
389 p->drain_callback(p, p->drain_callback_userdata);
390 }
391
392 return 0;
393 }
394
395 static int do_read(pa_pstream *p) {
396 void *d;
397 size_t l;
398 ssize_t r;
399 assert(p);
400
401 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
402 d = (uint8_t*) p->read.descriptor + p->read.index;
403 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
404 } else {
405 assert(p->read.data);
406 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
407 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
408 }
409
410 #ifdef HAVE_CREDS
411 {
412 int b = 0;
413
414 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
415 return -1;
416
417 p->read_creds_valid = p->read_creds_valid || b;
418 }
419 #else
420 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
421 return -1;
422 #endif
423
424 p->read.index += r;
425
426 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
427 /* Reading of frame descriptor complete */
428
429 /* Frame size too large */
430 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
431 pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX);
432 return -1;
433 }
434
435 assert(!p->read.packet && !p->read.memblock);
436
437 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
438 /* Frame is a packet frame */
439 p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
440 p->read.data = p->read.packet->data;
441 } else {
442 /* Frame is a memblock frame */
443 p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
444 p->read.data = p->read.memblock->data;
445
446 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
447 pa_log_warn(__FILE__": Invalid seek mode");
448 return -1;
449 }
450 }
451
452 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
453 /* Frame payload available */
454
455 if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
456 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
457
458 if (l > 0) {
459 pa_memchunk chunk;
460
461 chunk.memblock = p->read.memblock;
462 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
463 chunk.length = l;
464
465 if (p->recieve_memblock_callback) {
466 int64_t offset;
467
468 offset = (int64_t) (
469 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
470 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
471
472 p->recieve_memblock_callback(
473 p,
474 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
475 offset,
476 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
477 &chunk,
478 p->recieve_memblock_callback_userdata);
479 }
480
481 /* Drop seek info for following callbacks */
482 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
483 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
484 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
485 }
486 }
487
488 /* Frame complete */
489 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
490 if (p->read.memblock) {
491 assert(!p->read.packet);
492
493 pa_memblock_unref(p->read.memblock);
494 p->read.memblock = NULL;
495 } else {
496 assert(p->read.packet);
497
498 if (p->recieve_packet_callback)
499 #ifdef HAVE_CREDS
500 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
501 #else
502 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
503 #endif
504
505 pa_packet_unref(p->read.packet);
506 p->read.packet = NULL;
507 }
508
509 p->read.index = 0;
510 #ifdef HAVE_CREDS
511 p->read_creds_valid = 0;
512 #endif
513 }
514 }
515
516 return 0;
517 }
518
519 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
520 assert(p);
521 assert(p->ref >= 1);
522
523 p->die_callback = cb;
524 p->die_callback_userdata = userdata;
525 }
526
527
528 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
529 assert(p);
530 assert(p->ref >= 1);
531
532 p->drain_callback = cb;
533 p->drain_callback_userdata = userdata;
534 }
535
536 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
537 assert(p);
538 assert(p->ref >= 1);
539
540 p->recieve_packet_callback = cb;
541 p->recieve_packet_callback_userdata = userdata;
542 }
543
544 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
545 assert(p);
546 assert(p->ref >= 1);
547
548 p->recieve_memblock_callback = cb;
549 p->recieve_memblock_callback_userdata = userdata;
550 }
551
552 int pa_pstream_is_pending(pa_pstream *p) {
553 assert(p);
554
555 if (p->dead)
556 return 0;
557
558 return p->write.current || !pa_queue_is_empty(p->send_queue);
559 }
560
561 void pa_pstream_unref(pa_pstream*p) {
562 assert(p);
563 assert(p->ref >= 1);
564
565 if (--p->ref == 0)
566 pstream_free(p);
567 }
568
569 pa_pstream* pa_pstream_ref(pa_pstream*p) {
570 assert(p);
571 assert(p->ref >= 1);
572
573 p->ref++;
574 return p;
575 }
576
577 void pa_pstream_close(pa_pstream *p) {
578 assert(p);
579
580 p->dead = 1;
581
582 if (p->io) {
583 pa_iochannel_free(p->io);
584 p->io = NULL;
585 }
586
587 if (p->defer_event) {
588 p->mainloop->defer_free(p->defer_event);
589 p->defer_event = NULL;
590 }
591
592 p->die_callback = NULL;
593 p->drain_callback = NULL;
594 p->recieve_packet_callback = NULL;
595 p->recieve_memblock_callback = NULL;
596 }