]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
merge 'lennart' branch back into trunk.
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42
43
44 #include <pulse/xmalloc.h>
45
46 #include <pulsecore/winsock.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52 #include <pulsecore/flist.h>
53 #include <pulsecore/macro.h>
54
55 #include "pstream.h"
56
57 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
58 #define PA_FLAG_SHMDATA 0x80000000LU
59 #define PA_FLAG_SHMRELEASE 0x40000000LU
60 #define PA_FLAG_SHMREVOKE 0xC0000000LU
61 #define PA_FLAG_SHMMASK 0xFF000000LU
62 #define PA_FLAG_SEEKMASK 0x000000FFLU
63
64 /* The sequence descriptor header consists of 5 32bit integers: */
65 enum {
66 PA_PSTREAM_DESCRIPTOR_LENGTH,
67 PA_PSTREAM_DESCRIPTOR_CHANNEL,
68 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
69 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
70 PA_PSTREAM_DESCRIPTOR_FLAGS,
71 PA_PSTREAM_DESCRIPTOR_MAX
72 };
73
74 /* If we have an SHM block, this info follows the descriptor */
75 enum {
76 PA_PSTREAM_SHM_BLOCKID,
77 PA_PSTREAM_SHM_SHMID,
78 PA_PSTREAM_SHM_INDEX,
79 PA_PSTREAM_SHM_LENGTH,
80 PA_PSTREAM_SHM_MAX
81 };
82
83 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
84
85 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
86 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
87
88 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
89
90 struct item_info {
91 enum {
92 PA_PSTREAM_ITEM_PACKET,
93 PA_PSTREAM_ITEM_MEMBLOCK,
94 PA_PSTREAM_ITEM_SHMRELEASE,
95 PA_PSTREAM_ITEM_SHMREVOKE
96 } type;
97
98 /* packet info */
99 pa_packet *packet;
100 #ifdef HAVE_CREDS
101 int with_creds;
102 pa_creds creds;
103 #endif
104
105 /* memblock info */
106 pa_memchunk chunk;
107 uint32_t channel;
108 int64_t offset;
109 pa_seek_mode_t seek_mode;
110
111 /* release/revoke info */
112 uint32_t block_id;
113 };
114
115 struct pa_pstream {
116 PA_REFCNT_DECLARE;
117
118 pa_mainloop_api *mainloop;
119 pa_defer_event *defer_event;
120 pa_iochannel *io;
121
122 pa_queue *send_queue;
123
124 int dead;
125
126 struct {
127 pa_pstream_descriptor descriptor;
128 struct item_info* current;
129 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
130 void *data;
131 size_t index;
132 pa_memchunk memchunk;
133 } write;
134
135 struct {
136 pa_pstream_descriptor descriptor;
137 pa_memblock *memblock;
138 pa_packet *packet;
139 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
140 void *data;
141 size_t index;
142 } read;
143
144 int use_shm;
145 pa_memimport *import;
146 pa_memexport *export;
147
148 pa_pstream_packet_cb_t recieve_packet_callback;
149 void *recieve_packet_callback_userdata;
150
151 pa_pstream_memblock_cb_t recieve_memblock_callback;
152 void *recieve_memblock_callback_userdata;
153
154 pa_pstream_notify_cb_t drain_callback;
155 void *drain_callback_userdata;
156
157 pa_pstream_notify_cb_t die_callback;
158 void *die_callback_userdata;
159
160 pa_pstream_block_id_cb_t revoke_callback;
161 void *revoke_callback_userdata;
162
163 pa_pstream_block_id_cb_t release_callback;
164 void *release_callback_userdata;
165
166 pa_mempool *mempool;
167
168 #ifdef HAVE_CREDS
169 pa_creds read_creds, write_creds;
170 int read_creds_valid, send_creds_now;
171 #endif
172 };
173
174 static int do_write(pa_pstream *p);
175 static int do_read(pa_pstream *p);
176
177 static void do_something(pa_pstream *p) {
178 pa_assert(p);
179 pa_assert(PA_REFCNT_VALUE(p) > 0);
180
181 pa_pstream_ref(p);
182
183 p->mainloop->defer_enable(p->defer_event, 0);
184
185 if (!p->dead && pa_iochannel_is_readable(p->io)) {
186 if (do_read(p) < 0)
187 goto fail;
188 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
189 goto fail;
190
191 if (!p->dead && pa_iochannel_is_writable(p->io)) {
192 if (do_write(p) < 0)
193 goto fail;
194 }
195
196 pa_pstream_unref(p);
197 return;
198
199 fail:
200
201 if (p->die_callback)
202 p->die_callback(p, p->die_callback_userdata);
203
204 pa_pstream_unlink(p);
205 pa_pstream_unref(p);
206 }
207
208 static void io_callback(pa_iochannel*io, void *userdata) {
209 pa_pstream *p = userdata;
210
211 pa_assert(p);
212 pa_assert(PA_REFCNT_VALUE(p) > 0);
213 pa_assert(p->io == io);
214
215 do_something(p);
216 }
217
218 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
219 pa_pstream *p = userdata;
220
221 pa_assert(p);
222 pa_assert(PA_REFCNT_VALUE(p) > 0);
223 pa_assert(p->defer_event == e);
224 pa_assert(p->mainloop == m);
225
226 do_something(p);
227 }
228
229 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
230
231 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
232 pa_pstream *p;
233
234 pa_assert(m);
235 pa_assert(io);
236 pa_assert(pool);
237
238 p = pa_xnew(pa_pstream, 1);
239 PA_REFCNT_INIT(p);
240 p->io = io;
241 pa_iochannel_set_callback(io, io_callback, p);
242 p->dead = 0;
243
244 p->mainloop = m;
245 p->defer_event = m->defer_new(m, defer_callback, p);
246 m->defer_enable(p->defer_event, 0);
247
248 p->send_queue = pa_queue_new();
249
250 p->write.current = NULL;
251 p->write.index = 0;
252 pa_memchunk_reset(&p->write.memchunk);
253 p->read.memblock = NULL;
254 p->read.packet = NULL;
255 p->read.index = 0;
256
257 p->recieve_packet_callback = NULL;
258 p->recieve_packet_callback_userdata = NULL;
259 p->recieve_memblock_callback = NULL;
260 p->recieve_memblock_callback_userdata = NULL;
261 p->drain_callback = NULL;
262 p->drain_callback_userdata = NULL;
263 p->die_callback = NULL;
264 p->die_callback_userdata = NULL;
265 p->revoke_callback = NULL;
266 p->revoke_callback_userdata = NULL;
267 p->release_callback = NULL;
268 p->release_callback_userdata = NULL;
269
270 p->mempool = pool;
271
272 p->use_shm = 0;
273 p->export = NULL;
274
275 /* We do importing unconditionally */
276 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
277
278 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
279 pa_iochannel_socket_set_sndbuf(io, 1024*8);
280
281 #ifdef HAVE_CREDS
282 p->send_creds_now = 0;
283 p->read_creds_valid = 0;
284 #endif
285 return p;
286 }
287
288 static void item_free(void *item, PA_GCC_UNUSED void *q) {
289 struct item_info *i = item;
290 pa_assert(i);
291
292 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
293 pa_assert(i->chunk.memblock);
294 pa_memblock_unref(i->chunk.memblock);
295 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
296 pa_assert(i->packet);
297 pa_packet_unref(i->packet);
298 }
299
300 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
301 pa_xfree(i);
302 }
303
304 static void pstream_free(pa_pstream *p) {
305 pa_assert(p);
306
307 pa_pstream_unlink(p);
308
309 pa_queue_free(p->send_queue, item_free, NULL);
310
311 if (p->write.current)
312 item_free(p->write.current, NULL);
313
314 if (p->write.memchunk.memblock)
315 pa_memblock_unref(p->write.memchunk.memblock);
316
317 if (p->read.memblock)
318 pa_memblock_unref(p->read.memblock);
319
320 if (p->read.packet)
321 pa_packet_unref(p->read.packet);
322
323 pa_xfree(p);
324 }
325
326 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
327 struct item_info *i;
328
329 pa_assert(p);
330 pa_assert(PA_REFCNT_VALUE(p) > 0);
331 pa_assert(packet);
332
333 if (p->dead)
334 return;
335
336 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
337 i = pa_xnew(struct item_info, 1);
338
339 i->type = PA_PSTREAM_ITEM_PACKET;
340 i->packet = pa_packet_ref(packet);
341
342 #ifdef HAVE_CREDS
343 if ((i->with_creds = !!creds))
344 i->creds = *creds;
345 #endif
346
347 pa_queue_push(p->send_queue, i);
348
349 p->mainloop->defer_enable(p->defer_event, 1);
350 }
351
352 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
353 size_t length, idx;
354 size_t bsm;
355
356 pa_assert(p);
357 pa_assert(PA_REFCNT_VALUE(p) > 0);
358 pa_assert(channel != (uint32_t) -1);
359 pa_assert(chunk);
360
361 if (p->dead)
362 return;
363
364 idx = 0;
365 length = chunk->length;
366
367 bsm = pa_mempool_block_size_max(p->mempool);
368
369 while (length > 0) {
370 struct item_info *i;
371 size_t n;
372
373 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
374 i = pa_xnew(struct item_info, 1);
375 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
376
377 n = MIN(length, bsm);
378 i->chunk.index = chunk->index + idx;
379 i->chunk.length = n;
380 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
381
382 i->channel = channel;
383 i->offset = offset;
384 i->seek_mode = seek_mode;
385 #ifdef HAVE_CREDS
386 i->with_creds = 0;
387 #endif
388
389 pa_queue_push(p->send_queue, i);
390
391 idx += n;
392 length -= n;
393 }
394
395 p->mainloop->defer_enable(p->defer_event, 1);
396 }
397
398 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
399 struct item_info *item;
400 pa_assert(p);
401 pa_assert(PA_REFCNT_VALUE(p) > 0);
402
403 if (p->dead)
404 return;
405
406 /* pa_log("Releasing block %u", block_id); */
407
408 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
409 item = pa_xnew(struct item_info, 1);
410 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
411 item->block_id = block_id;
412 #ifdef HAVE_CREDS
413 item->with_creds = 0;
414 #endif
415
416 pa_queue_push(p->send_queue, item);
417 p->mainloop->defer_enable(p->defer_event, 1);
418 }
419
420 /* might be called from thread context */
421 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
422 pa_pstream *p = userdata;
423
424 pa_assert(p);
425 pa_assert(PA_REFCNT_VALUE(p) > 0);
426
427 if (p->dead)
428 return;
429
430 if (p->release_callback)
431 p->release_callback(p, block_id, p->release_callback_userdata);
432 else
433 pa_pstream_send_release(p, block_id);
434 }
435
436 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
437 struct item_info *item;
438 pa_assert(p);
439 pa_assert(PA_REFCNT_VALUE(p) > 0);
440
441 if (p->dead)
442 return;
443 /* pa_log("Revoking block %u", block_id); */
444
445 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
446 item = pa_xnew(struct item_info, 1);
447 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
448 item->block_id = block_id;
449 #ifdef HAVE_CREDS
450 item->with_creds = 0;
451 #endif
452
453 pa_queue_push(p->send_queue, item);
454 p->mainloop->defer_enable(p->defer_event, 1);
455 }
456
457 /* might be called from thread context */
458 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
459 pa_pstream *p = userdata;
460
461 pa_assert(p);
462 pa_assert(PA_REFCNT_VALUE(p) > 0);
463
464 if (p->revoke_callback)
465 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
466 else
467 pa_pstream_send_revoke(p, block_id);
468 }
469
470 static void prepare_next_write_item(pa_pstream *p) {
471 pa_assert(p);
472 pa_assert(PA_REFCNT_VALUE(p) > 0);
473
474 p->write.current = pa_queue_pop(p->send_queue);
475
476 if (!p->write.current)
477 return;
478
479 p->write.index = 0;
480 p->write.data = NULL;
481 pa_memchunk_reset(&p->write.memchunk);
482
483 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
484 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
486 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
487 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
488
489 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
490
491 pa_assert(p->write.current->packet);
492 p->write.data = p->write.current->packet->data;
493 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
494
495 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
496
497 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
498 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
499
500 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
501
502 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
503 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
504
505 } else {
506 uint32_t flags;
507 int send_payload = 1;
508
509 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
510 pa_assert(p->write.current->chunk.memblock);
511
512 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
513 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
514 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
515
516 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
517
518 if (p->use_shm) {
519 uint32_t block_id, shm_id;
520 size_t offset, length;
521
522 pa_assert(p->export);
523
524 if (pa_memexport_put(p->export,
525 p->write.current->chunk.memblock,
526 &block_id,
527 &shm_id,
528 &offset,
529 &length) >= 0) {
530
531 flags |= PA_FLAG_SHMDATA;
532 send_payload = 0;
533
534 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
535 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
536 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
537 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
538
539 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
540 p->write.data = p->write.shm_info;
541 }
542 /* else */
543 /* pa_log_warn("Failed to export memory block."); */
544 }
545
546 if (send_payload) {
547 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
548 p->write.memchunk = p->write.current->chunk;
549 pa_memblock_ref(p->write.memchunk.memblock);
550 p->write.data = NULL;
551 }
552
553 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
554 }
555
556 #ifdef HAVE_CREDS
557 if ((p->send_creds_now = p->write.current->with_creds))
558 p->write_creds = p->write.current->creds;
559 #endif
560 }
561
562 static int do_write(pa_pstream *p) {
563 void *d;
564 size_t l;
565 ssize_t r;
566 pa_memblock *release_memblock = NULL;
567
568 pa_assert(p);
569 pa_assert(PA_REFCNT_VALUE(p) > 0);
570
571 if (!p->write.current)
572 prepare_next_write_item(p);
573
574 if (!p->write.current)
575 return 0;
576
577 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
578 d = (uint8_t*) p->write.descriptor + p->write.index;
579 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
580 } else {
581 pa_assert(p->write.data || p->write.memchunk.memblock);
582
583 if (p->write.data)
584 d = p->write.data;
585 else {
586 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
587 release_memblock = p->write.memchunk.memblock;
588 }
589
590 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
591 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
592 }
593
594 pa_assert(l > 0);
595
596 #ifdef HAVE_CREDS
597 if (p->send_creds_now) {
598
599 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
600 goto fail;
601
602 p->send_creds_now = 0;
603 } else
604 #endif
605
606 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
607 goto fail;
608
609 if (release_memblock)
610 pa_memblock_release(release_memblock);
611
612 p->write.index += r;
613
614 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
615 pa_assert(p->write.current);
616 item_free(p->write.current, NULL);
617 p->write.current = NULL;
618
619 if (p->write.memchunk.memblock)
620 pa_memblock_unref(p->write.memchunk.memblock);
621
622 pa_memchunk_reset(&p->write.memchunk);
623
624 if (p->drain_callback && !pa_pstream_is_pending(p))
625 p->drain_callback(p, p->drain_callback_userdata);
626 }
627
628 return 0;
629
630 fail:
631
632 if (release_memblock)
633 pa_memblock_release(release_memblock);
634
635 return -1;
636 }
637
638 static int do_read(pa_pstream *p) {
639 void *d;
640 size_t l;
641 ssize_t r;
642 pa_memblock *release_memblock = NULL;
643 pa_assert(p);
644 pa_assert(PA_REFCNT_VALUE(p) > 0);
645
646 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
647 d = (uint8_t*) p->read.descriptor + p->read.index;
648 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
649 } else {
650 pa_assert(p->read.data || p->read.memblock);
651
652 if (p->read.data)
653 d = p->read.data;
654 else {
655 d = pa_memblock_acquire(p->read.memblock);
656 release_memblock = p->read.memblock;
657 }
658
659 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
660 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
661 }
662
663 #ifdef HAVE_CREDS
664 {
665 pa_bool_t b = 0;
666
667 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
668 goto fail;
669
670 p->read_creds_valid = p->read_creds_valid || b;
671 }
672 #else
673 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
674 goto fail;
675 #endif
676
677 if (release_memblock)
678 pa_memblock_release(release_memblock);
679
680 p->read.index += r;
681
682 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
683 uint32_t flags, length, channel;
684 /* Reading of frame descriptor complete */
685
686 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
687
688 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
689 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
690 return -1;
691 }
692
693 if (flags == PA_FLAG_SHMRELEASE) {
694
695 /* This is a SHM memblock release frame with no payload */
696
697 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
698
699 pa_assert(p->export);
700 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
701
702 goto frame_done;
703
704 } else if (flags == PA_FLAG_SHMREVOKE) {
705
706 /* This is a SHM memblock revoke frame with no payload */
707
708 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
709
710 pa_assert(p->import);
711 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
712
713 goto frame_done;
714 }
715
716 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
717
718 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
719 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
720 return -1;
721 }
722
723 pa_assert(!p->read.packet && !p->read.memblock);
724
725 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
726
727 if (channel == (uint32_t) -1) {
728
729 if (flags != 0) {
730 pa_log_warn("Received packet frame with invalid flags value.");
731 return -1;
732 }
733
734 /* Frame is a packet frame */
735 p->read.packet = pa_packet_new(length);
736 p->read.data = p->read.packet->data;
737
738 } else {
739
740 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
741 pa_log_warn("Received memblock frame with invalid seek mode.");
742 return -1;
743 }
744
745 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
746
747 if (length != sizeof(p->read.shm_info)) {
748 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
749 return -1;
750 }
751
752 /* Frame is a memblock frame referencing an SHM memblock */
753 p->read.data = p->read.shm_info;
754
755 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
756
757 /* Frame is a memblock frame */
758
759 p->read.memblock = pa_memblock_new(p->mempool, length);
760 p->read.data = NULL;
761 } else {
762
763 pa_log_warn("Recieved memblock frame with invalid flags value.");
764 return -1;
765 }
766 }
767
768 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
769 /* Frame payload available */
770
771 if (p->read.memblock && p->recieve_memblock_callback) {
772
773 /* Is this memblock data? Than pass it to the user */
774 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
775
776 if (l > 0) {
777 pa_memchunk chunk;
778
779 chunk.memblock = p->read.memblock;
780 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
781 chunk.length = l;
782
783 if (p->recieve_memblock_callback) {
784 int64_t offset;
785
786 offset = (int64_t) (
787 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
788 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
789
790 p->recieve_memblock_callback(
791 p,
792 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
793 offset,
794 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
795 &chunk,
796 p->recieve_memblock_callback_userdata);
797 }
798
799 /* Drop seek info for following callbacks */
800 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
801 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
802 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
803 }
804 }
805
806 /* Frame complete */
807 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
808
809 if (p->read.memblock) {
810
811 /* This was a memblock frame. We can unref the memblock now */
812 pa_memblock_unref(p->read.memblock);
813
814 } else if (p->read.packet) {
815
816 if (p->recieve_packet_callback)
817 #ifdef HAVE_CREDS
818 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
819 #else
820 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
821 #endif
822
823 pa_packet_unref(p->read.packet);
824 } else {
825 pa_memblock *b;
826
827 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
828
829 pa_assert(p->import);
830
831 if (!(b = pa_memimport_get(p->import,
832 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
833 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
834 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
835 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
836
837 pa_log_warn("Failed to import memory block.");
838 return -1;
839 }
840
841 if (p->recieve_memblock_callback) {
842 int64_t offset;
843 pa_memchunk chunk;
844
845 chunk.memblock = b;
846 chunk.index = 0;
847 chunk.length = pa_memblock_get_length(b);
848
849 offset = (int64_t) (
850 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
851 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
852
853 p->recieve_memblock_callback(
854 p,
855 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
856 offset,
857 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
858 &chunk,
859 p->recieve_memblock_callback_userdata);
860 }
861
862 pa_memblock_unref(b);
863 }
864
865 goto frame_done;
866 }
867 }
868
869 return 0;
870
871 frame_done:
872 p->read.memblock = NULL;
873 p->read.packet = NULL;
874 p->read.index = 0;
875 p->read.data = NULL;
876
877 #ifdef HAVE_CREDS
878 p->read_creds_valid = 0;
879 #endif
880
881 return 0;
882
883 fail:
884 if (release_memblock)
885 pa_memblock_release(release_memblock);
886
887 return -1;
888 }
889
890 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
891 pa_assert(p);
892 pa_assert(PA_REFCNT_VALUE(p) > 0);
893
894 p->die_callback = cb;
895 p->die_callback_userdata = userdata;
896 }
897
898 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
899 pa_assert(p);
900 pa_assert(PA_REFCNT_VALUE(p) > 0);
901
902 p->drain_callback = cb;
903 p->drain_callback_userdata = userdata;
904 }
905
906 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
907 pa_assert(p);
908 pa_assert(PA_REFCNT_VALUE(p) > 0);
909
910 p->recieve_packet_callback = cb;
911 p->recieve_packet_callback_userdata = userdata;
912 }
913
914 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
915 pa_assert(p);
916 pa_assert(PA_REFCNT_VALUE(p) > 0);
917
918 p->recieve_memblock_callback = cb;
919 p->recieve_memblock_callback_userdata = userdata;
920 }
921
922 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
923 pa_assert(p);
924 pa_assert(PA_REFCNT_VALUE(p) > 0);
925
926 p->release_callback = cb;
927 p->release_callback_userdata = userdata;
928 }
929
930 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
931 pa_assert(p);
932 pa_assert(PA_REFCNT_VALUE(p) > 0);
933
934 p->release_callback = cb;
935 p->release_callback_userdata = userdata;
936 }
937
938 int pa_pstream_is_pending(pa_pstream *p) {
939 int b;
940
941 pa_assert(p);
942 pa_assert(PA_REFCNT_VALUE(p) > 0);
943
944 if (p->dead)
945 b = 0;
946 else
947 b = p->write.current || !pa_queue_is_empty(p->send_queue);
948
949 return b;
950 }
951
952 void pa_pstream_unref(pa_pstream*p) {
953 pa_assert(p);
954 pa_assert(PA_REFCNT_VALUE(p) > 0);
955
956 if (PA_REFCNT_DEC(p) <= 0)
957 pstream_free(p);
958 }
959
960 pa_pstream* pa_pstream_ref(pa_pstream*p) {
961 pa_assert(p);
962 pa_assert(PA_REFCNT_VALUE(p) > 0);
963
964 PA_REFCNT_INC(p);
965 return p;
966 }
967
968 void pa_pstream_unlink(pa_pstream *p) {
969 pa_assert(p);
970
971 if (p->dead)
972 return;
973
974 p->dead = 1;
975
976 if (p->import) {
977 pa_memimport_free(p->import);
978 p->import = NULL;
979 }
980
981 if (p->export) {
982 pa_memexport_free(p->export);
983 p->export = NULL;
984 }
985
986 if (p->io) {
987 pa_iochannel_free(p->io);
988 p->io = NULL;
989 }
990
991 if (p->defer_event) {
992 p->mainloop->defer_free(p->defer_event);
993 p->defer_event = NULL;
994 }
995
996 p->die_callback = NULL;
997 p->drain_callback = NULL;
998 p->recieve_packet_callback = NULL;
999 p->recieve_memblock_callback = NULL;
1000 }
1001
1002 void pa_pstream_use_shm(pa_pstream *p, int enable) {
1003 pa_assert(p);
1004 pa_assert(PA_REFCNT_VALUE(p) > 0);
1005
1006 p->use_shm = enable;
1007
1008 if (enable) {
1009
1010 if (!p->export)
1011 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1012
1013 } else {
1014
1015 if (p->export) {
1016 pa_memexport_free(p->export);
1017 p->export = NULL;
1018 }
1019 }
1020 }