]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
Merge dead branch 'liboil-test'
[pulseaudio] / src / pulsecore / pstream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40
41
42 #include <pulse/xmalloc.h>
43
44 #include <pulsecore/winsock.h>
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49 #include <pulsecore/refcnt.h>
50 #include <pulsecore/flist.h>
51 #include <pulsecore/macro.h>
52
53 #include "pstream.h"
54
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
61
62 /* The sequence descriptor header consists of 5 32bit integers: */
63 enum {
64 PA_PSTREAM_DESCRIPTOR_LENGTH,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
68 PA_PSTREAM_DESCRIPTOR_FLAGS,
69 PA_PSTREAM_DESCRIPTOR_MAX
70 };
71
72 /* If we have an SHM block, this info follows the descriptor */
73 enum {
74 PA_PSTREAM_SHM_BLOCKID,
75 PA_PSTREAM_SHM_SHMID,
76 PA_PSTREAM_SHM_INDEX,
77 PA_PSTREAM_SHM_LENGTH,
78 PA_PSTREAM_SHM_MAX
79 };
80
81 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
82
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85
86 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
87
88 struct item_info {
89 enum {
90 PA_PSTREAM_ITEM_PACKET,
91 PA_PSTREAM_ITEM_MEMBLOCK,
92 PA_PSTREAM_ITEM_SHMRELEASE,
93 PA_PSTREAM_ITEM_SHMREVOKE
94 } type;
95
96 /* packet info */
97 pa_packet *packet;
98 #ifdef HAVE_CREDS
99 pa_bool_t with_creds;
100 pa_creds creds;
101 #endif
102
103 /* memblock info */
104 pa_memchunk chunk;
105 uint32_t channel;
106 int64_t offset;
107 pa_seek_mode_t seek_mode;
108
109 /* release/revoke info */
110 uint32_t block_id;
111 };
112
113 struct pa_pstream {
114 PA_REFCNT_DECLARE;
115
116 pa_mainloop_api *mainloop;
117 pa_defer_event *defer_event;
118 pa_iochannel *io;
119
120 pa_queue *send_queue;
121
122 pa_bool_t dead;
123
124 struct {
125 pa_pstream_descriptor descriptor;
126 struct item_info* current;
127 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
128 void *data;
129 size_t index;
130 pa_memchunk memchunk;
131 } write;
132
133 struct {
134 pa_pstream_descriptor descriptor;
135 pa_memblock *memblock;
136 pa_packet *packet;
137 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
138 void *data;
139 size_t index;
140 } read;
141
142 pa_bool_t use_shm;
143 pa_memimport *import;
144 pa_memexport *export;
145
146 pa_pstream_packet_cb_t recieve_packet_callback;
147 void *recieve_packet_callback_userdata;
148
149 pa_pstream_memblock_cb_t recieve_memblock_callback;
150 void *recieve_memblock_callback_userdata;
151
152 pa_pstream_notify_cb_t drain_callback;
153 void *drain_callback_userdata;
154
155 pa_pstream_notify_cb_t die_callback;
156 void *die_callback_userdata;
157
158 pa_pstream_block_id_cb_t revoke_callback;
159 void *revoke_callback_userdata;
160
161 pa_pstream_block_id_cb_t release_callback;
162 void *release_callback_userdata;
163
164 pa_mempool *mempool;
165
166 #ifdef HAVE_CREDS
167 pa_creds read_creds, write_creds;
168 pa_bool_t read_creds_valid, send_creds_now;
169 #endif
170 };
171
172 static int do_write(pa_pstream *p);
173 static int do_read(pa_pstream *p);
174
175 static void do_something(pa_pstream *p) {
176 pa_assert(p);
177 pa_assert(PA_REFCNT_VALUE(p) > 0);
178
179 pa_pstream_ref(p);
180
181 p->mainloop->defer_enable(p->defer_event, 0);
182
183 if (!p->dead && pa_iochannel_is_readable(p->io)) {
184 if (do_read(p) < 0)
185 goto fail;
186 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
187 goto fail;
188
189 if (!p->dead && pa_iochannel_is_writable(p->io)) {
190 if (do_write(p) < 0)
191 goto fail;
192 }
193
194 pa_pstream_unref(p);
195 return;
196
197 fail:
198
199 if (p->die_callback)
200 p->die_callback(p, p->die_callback_userdata);
201
202 pa_pstream_unlink(p);
203 pa_pstream_unref(p);
204 }
205
206 static void io_callback(pa_iochannel*io, void *userdata) {
207 pa_pstream *p = userdata;
208
209 pa_assert(p);
210 pa_assert(PA_REFCNT_VALUE(p) > 0);
211 pa_assert(p->io == io);
212
213 do_something(p);
214 }
215
216 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
217 pa_pstream *p = userdata;
218
219 pa_assert(p);
220 pa_assert(PA_REFCNT_VALUE(p) > 0);
221 pa_assert(p->defer_event == e);
222 pa_assert(p->mainloop == m);
223
224 do_something(p);
225 }
226
227 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
228
229 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
230 pa_pstream *p;
231
232 pa_assert(m);
233 pa_assert(io);
234 pa_assert(pool);
235
236 p = pa_xnew(pa_pstream, 1);
237 PA_REFCNT_INIT(p);
238 p->io = io;
239 pa_iochannel_set_callback(io, io_callback, p);
240 p->dead = FALSE;
241
242 p->mainloop = m;
243 p->defer_event = m->defer_new(m, defer_callback, p);
244 m->defer_enable(p->defer_event, 0);
245
246 p->send_queue = pa_queue_new();
247
248 p->write.current = NULL;
249 p->write.index = 0;
250 pa_memchunk_reset(&p->write.memchunk);
251 p->read.memblock = NULL;
252 p->read.packet = NULL;
253 p->read.index = 0;
254
255 p->recieve_packet_callback = NULL;
256 p->recieve_packet_callback_userdata = NULL;
257 p->recieve_memblock_callback = NULL;
258 p->recieve_memblock_callback_userdata = NULL;
259 p->drain_callback = NULL;
260 p->drain_callback_userdata = NULL;
261 p->die_callback = NULL;
262 p->die_callback_userdata = NULL;
263 p->revoke_callback = NULL;
264 p->revoke_callback_userdata = NULL;
265 p->release_callback = NULL;
266 p->release_callback_userdata = NULL;
267
268 p->mempool = pool;
269
270 p->use_shm = FALSE;
271 p->export = NULL;
272
273 /* We do importing unconditionally */
274 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
275
276 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
277 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
278
279 #ifdef HAVE_CREDS
280 p->send_creds_now = FALSE;
281 p->read_creds_valid = FALSE;
282 #endif
283 return p;
284 }
285
286 static void item_free(void *item, PA_GCC_UNUSED void *q) {
287 struct item_info *i = item;
288 pa_assert(i);
289
290 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
291 pa_assert(i->chunk.memblock);
292 pa_memblock_unref(i->chunk.memblock);
293 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
294 pa_assert(i->packet);
295 pa_packet_unref(i->packet);
296 }
297
298 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
299 pa_xfree(i);
300 }
301
302 static void pstream_free(pa_pstream *p) {
303 pa_assert(p);
304
305 pa_pstream_unlink(p);
306
307 pa_queue_free(p->send_queue, item_free, NULL);
308
309 if (p->write.current)
310 item_free(p->write.current, NULL);
311
312 if (p->write.memchunk.memblock)
313 pa_memblock_unref(p->write.memchunk.memblock);
314
315 if (p->read.memblock)
316 pa_memblock_unref(p->read.memblock);
317
318 if (p->read.packet)
319 pa_packet_unref(p->read.packet);
320
321 pa_xfree(p);
322 }
323
324 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
325 struct item_info *i;
326
327 pa_assert(p);
328 pa_assert(PA_REFCNT_VALUE(p) > 0);
329 pa_assert(packet);
330
331 if (p->dead)
332 return;
333
334 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
335 i = pa_xnew(struct item_info, 1);
336
337 i->type = PA_PSTREAM_ITEM_PACKET;
338 i->packet = pa_packet_ref(packet);
339
340 #ifdef HAVE_CREDS
341 if ((i->with_creds = !!creds))
342 i->creds = *creds;
343 #endif
344
345 pa_queue_push(p->send_queue, i);
346
347 p->mainloop->defer_enable(p->defer_event, 1);
348 }
349
350 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
351 size_t length, idx;
352 size_t bsm;
353
354 pa_assert(p);
355 pa_assert(PA_REFCNT_VALUE(p) > 0);
356 pa_assert(channel != (uint32_t) -1);
357 pa_assert(chunk);
358
359 if (p->dead)
360 return;
361
362 idx = 0;
363 length = chunk->length;
364
365 bsm = pa_mempool_block_size_max(p->mempool);
366
367 while (length > 0) {
368 struct item_info *i;
369 size_t n;
370
371 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
372 i = pa_xnew(struct item_info, 1);
373 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
374
375 n = PA_MIN(length, bsm);
376 i->chunk.index = chunk->index + idx;
377 i->chunk.length = n;
378 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
379
380 i->channel = channel;
381 i->offset = offset;
382 i->seek_mode = seek_mode;
383 #ifdef HAVE_CREDS
384 i->with_creds = FALSE;
385 #endif
386
387 pa_queue_push(p->send_queue, i);
388
389 idx += n;
390 length -= n;
391 }
392
393 p->mainloop->defer_enable(p->defer_event, 1);
394 }
395
396 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
397 struct item_info *item;
398 pa_assert(p);
399 pa_assert(PA_REFCNT_VALUE(p) > 0);
400
401 if (p->dead)
402 return;
403
404 /* pa_log("Releasing block %u", block_id); */
405
406 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
407 item = pa_xnew(struct item_info, 1);
408 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
409 item->block_id = block_id;
410 #ifdef HAVE_CREDS
411 item->with_creds = FALSE;
412 #endif
413
414 pa_queue_push(p->send_queue, item);
415 p->mainloop->defer_enable(p->defer_event, 1);
416 }
417
418 /* might be called from thread context */
419 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
420 pa_pstream *p = userdata;
421
422 pa_assert(p);
423 pa_assert(PA_REFCNT_VALUE(p) > 0);
424
425 if (p->dead)
426 return;
427
428 if (p->release_callback)
429 p->release_callback(p, block_id, p->release_callback_userdata);
430 else
431 pa_pstream_send_release(p, block_id);
432 }
433
434 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
435 struct item_info *item;
436 pa_assert(p);
437 pa_assert(PA_REFCNT_VALUE(p) > 0);
438
439 if (p->dead)
440 return;
441 /* pa_log("Revoking block %u", block_id); */
442
443 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
444 item = pa_xnew(struct item_info, 1);
445 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
446 item->block_id = block_id;
447 #ifdef HAVE_CREDS
448 item->with_creds = FALSE;
449 #endif
450
451 pa_queue_push(p->send_queue, item);
452 p->mainloop->defer_enable(p->defer_event, 1);
453 }
454
455 /* might be called from thread context */
456 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
457 pa_pstream *p = userdata;
458
459 pa_assert(p);
460 pa_assert(PA_REFCNT_VALUE(p) > 0);
461
462 if (p->revoke_callback)
463 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
464 else
465 pa_pstream_send_revoke(p, block_id);
466 }
467
468 static void prepare_next_write_item(pa_pstream *p) {
469 pa_assert(p);
470 pa_assert(PA_REFCNT_VALUE(p) > 0);
471
472 p->write.current = pa_queue_pop(p->send_queue);
473
474 if (!p->write.current)
475 return;
476
477 p->write.index = 0;
478 p->write.data = NULL;
479 pa_memchunk_reset(&p->write.memchunk);
480
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
482 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
483 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
484 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
486
487 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
488
489 pa_assert(p->write.current->packet);
490 p->write.data = p->write.current->packet->data;
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
492
493 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
494
495 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
496 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
497
498 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
499
500 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
501 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
502
503 } else {
504 uint32_t flags;
505 pa_bool_t send_payload = TRUE;
506
507 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
508 pa_assert(p->write.current->chunk.memblock);
509
510 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
511 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
512 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
513
514 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
515
516 if (p->use_shm) {
517 uint32_t block_id, shm_id;
518 size_t offset, length;
519
520 pa_assert(p->export);
521
522 if (pa_memexport_put(p->export,
523 p->write.current->chunk.memblock,
524 &block_id,
525 &shm_id,
526 &offset,
527 &length) >= 0) {
528
529 flags |= PA_FLAG_SHMDATA;
530 send_payload = FALSE;
531
532 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
533 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
534 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
535 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
536
537 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
538 p->write.data = p->write.shm_info;
539 }
540 /* else */
541 /* pa_log_warn("Failed to export memory block."); */
542 }
543
544 if (send_payload) {
545 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
546 p->write.memchunk = p->write.current->chunk;
547 pa_memblock_ref(p->write.memchunk.memblock);
548 p->write.data = NULL;
549 }
550
551 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
552 }
553
554 #ifdef HAVE_CREDS
555 if ((p->send_creds_now = p->write.current->with_creds))
556 p->write_creds = p->write.current->creds;
557 #endif
558 }
559
560 static int do_write(pa_pstream *p) {
561 void *d;
562 size_t l;
563 ssize_t r;
564 pa_memblock *release_memblock = NULL;
565
566 pa_assert(p);
567 pa_assert(PA_REFCNT_VALUE(p) > 0);
568
569 if (!p->write.current)
570 prepare_next_write_item(p);
571
572 if (!p->write.current)
573 return 0;
574
575 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
576 d = (uint8_t*) p->write.descriptor + p->write.index;
577 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
578 } else {
579 pa_assert(p->write.data || p->write.memchunk.memblock);
580
581 if (p->write.data)
582 d = p->write.data;
583 else {
584 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
585 release_memblock = p->write.memchunk.memblock;
586 }
587
588 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
589 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
590 }
591
592 pa_assert(l > 0);
593
594 #ifdef HAVE_CREDS
595 if (p->send_creds_now) {
596
597 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
598 goto fail;
599
600 p->send_creds_now = FALSE;
601 } else
602 #endif
603
604 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
605 goto fail;
606
607 if (release_memblock)
608 pa_memblock_release(release_memblock);
609
610 p->write.index += r;
611
612 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
613 pa_assert(p->write.current);
614 item_free(p->write.current, NULL);
615 p->write.current = NULL;
616
617 if (p->write.memchunk.memblock)
618 pa_memblock_unref(p->write.memchunk.memblock);
619
620 pa_memchunk_reset(&p->write.memchunk);
621
622 if (p->drain_callback && !pa_pstream_is_pending(p))
623 p->drain_callback(p, p->drain_callback_userdata);
624 }
625
626 return 0;
627
628 fail:
629
630 if (release_memblock)
631 pa_memblock_release(release_memblock);
632
633 return -1;
634 }
635
636 static int do_read(pa_pstream *p) {
637 void *d;
638 size_t l;
639 ssize_t r;
640 pa_memblock *release_memblock = NULL;
641 pa_assert(p);
642 pa_assert(PA_REFCNT_VALUE(p) > 0);
643
644 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
645 d = (uint8_t*) p->read.descriptor + p->read.index;
646 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
647 } else {
648 pa_assert(p->read.data || p->read.memblock);
649
650 if (p->read.data)
651 d = p->read.data;
652 else {
653 d = pa_memblock_acquire(p->read.memblock);
654 release_memblock = p->read.memblock;
655 }
656
657 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
658 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
659 }
660
661 #ifdef HAVE_CREDS
662 {
663 pa_bool_t b = 0;
664
665 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
666 goto fail;
667
668 p->read_creds_valid = p->read_creds_valid || b;
669 }
670 #else
671 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
672 goto fail;
673 #endif
674
675 if (release_memblock)
676 pa_memblock_release(release_memblock);
677
678 p->read.index += r;
679
680 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
681 uint32_t flags, length, channel;
682 /* Reading of frame descriptor complete */
683
684 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
685
686 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
687 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
688 return -1;
689 }
690
691 if (flags == PA_FLAG_SHMRELEASE) {
692
693 /* This is a SHM memblock release frame with no payload */
694
695 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
696
697 pa_assert(p->export);
698 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
699
700 goto frame_done;
701
702 } else if (flags == PA_FLAG_SHMREVOKE) {
703
704 /* This is a SHM memblock revoke frame with no payload */
705
706 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
707
708 pa_assert(p->import);
709 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
710
711 goto frame_done;
712 }
713
714 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
715
716 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
717 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
718 return -1;
719 }
720
721 pa_assert(!p->read.packet && !p->read.memblock);
722
723 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
724
725 if (channel == (uint32_t) -1) {
726
727 if (flags != 0) {
728 pa_log_warn("Received packet frame with invalid flags value.");
729 return -1;
730 }
731
732 /* Frame is a packet frame */
733 p->read.packet = pa_packet_new(length);
734 p->read.data = p->read.packet->data;
735
736 } else {
737
738 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
739 pa_log_warn("Received memblock frame with invalid seek mode.");
740 return -1;
741 }
742
743 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
744
745 if (length != sizeof(p->read.shm_info)) {
746 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
747 return -1;
748 }
749
750 /* Frame is a memblock frame referencing an SHM memblock */
751 p->read.data = p->read.shm_info;
752
753 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
754
755 /* Frame is a memblock frame */
756
757 p->read.memblock = pa_memblock_new(p->mempool, length);
758 p->read.data = NULL;
759 } else {
760
761 pa_log_warn("Recieved memblock frame with invalid flags value.");
762 return -1;
763 }
764 }
765
766 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
767 /* Frame payload available */
768
769 if (p->read.memblock && p->recieve_memblock_callback) {
770
771 /* Is this memblock data? Than pass it to the user */
772 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
773
774 if (l > 0) {
775 pa_memchunk chunk;
776
777 chunk.memblock = p->read.memblock;
778 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
779 chunk.length = l;
780
781 if (p->recieve_memblock_callback) {
782 int64_t offset;
783
784 offset = (int64_t) (
785 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
786 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
787
788 p->recieve_memblock_callback(
789 p,
790 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
791 offset,
792 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
793 &chunk,
794 p->recieve_memblock_callback_userdata);
795 }
796
797 /* Drop seek info for following callbacks */
798 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
799 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
800 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
801 }
802 }
803
804 /* Frame complete */
805 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
806
807 if (p->read.memblock) {
808
809 /* This was a memblock frame. We can unref the memblock now */
810 pa_memblock_unref(p->read.memblock);
811
812 } else if (p->read.packet) {
813
814 if (p->recieve_packet_callback)
815 #ifdef HAVE_CREDS
816 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
817 #else
818 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
819 #endif
820
821 pa_packet_unref(p->read.packet);
822 } else {
823 pa_memblock *b;
824
825 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
826
827 pa_assert(p->import);
828
829 if (!(b = pa_memimport_get(p->import,
830 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
831 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
832 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
833 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
834
835 pa_log_warn("Failed to import memory block.");
836 return -1;
837 }
838
839 if (p->recieve_memblock_callback) {
840 int64_t offset;
841 pa_memchunk chunk;
842
843 chunk.memblock = b;
844 chunk.index = 0;
845 chunk.length = pa_memblock_get_length(b);
846
847 offset = (int64_t) (
848 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
849 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
850
851 p->recieve_memblock_callback(
852 p,
853 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
854 offset,
855 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
856 &chunk,
857 p->recieve_memblock_callback_userdata);
858 }
859
860 pa_memblock_unref(b);
861 }
862
863 goto frame_done;
864 }
865 }
866
867 return 0;
868
869 frame_done:
870 p->read.memblock = NULL;
871 p->read.packet = NULL;
872 p->read.index = 0;
873 p->read.data = NULL;
874
875 #ifdef HAVE_CREDS
876 p->read_creds_valid = FALSE;
877 #endif
878
879 return 0;
880
881 fail:
882 if (release_memblock)
883 pa_memblock_release(release_memblock);
884
885 return -1;
886 }
887
888 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
889 pa_assert(p);
890 pa_assert(PA_REFCNT_VALUE(p) > 0);
891
892 p->die_callback = cb;
893 p->die_callback_userdata = userdata;
894 }
895
896 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
897 pa_assert(p);
898 pa_assert(PA_REFCNT_VALUE(p) > 0);
899
900 p->drain_callback = cb;
901 p->drain_callback_userdata = userdata;
902 }
903
904 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
905 pa_assert(p);
906 pa_assert(PA_REFCNT_VALUE(p) > 0);
907
908 p->recieve_packet_callback = cb;
909 p->recieve_packet_callback_userdata = userdata;
910 }
911
912 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
913 pa_assert(p);
914 pa_assert(PA_REFCNT_VALUE(p) > 0);
915
916 p->recieve_memblock_callback = cb;
917 p->recieve_memblock_callback_userdata = userdata;
918 }
919
920 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
921 pa_assert(p);
922 pa_assert(PA_REFCNT_VALUE(p) > 0);
923
924 p->release_callback = cb;
925 p->release_callback_userdata = userdata;
926 }
927
928 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
929 pa_assert(p);
930 pa_assert(PA_REFCNT_VALUE(p) > 0);
931
932 p->release_callback = cb;
933 p->release_callback_userdata = userdata;
934 }
935
936 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
937 pa_bool_t b;
938
939 pa_assert(p);
940 pa_assert(PA_REFCNT_VALUE(p) > 0);
941
942 if (p->dead)
943 b = FALSE;
944 else
945 b = p->write.current || !pa_queue_is_empty(p->send_queue);
946
947 return b;
948 }
949
950 void pa_pstream_unref(pa_pstream*p) {
951 pa_assert(p);
952 pa_assert(PA_REFCNT_VALUE(p) > 0);
953
954 if (PA_REFCNT_DEC(p) <= 0)
955 pstream_free(p);
956 }
957
958 pa_pstream* pa_pstream_ref(pa_pstream*p) {
959 pa_assert(p);
960 pa_assert(PA_REFCNT_VALUE(p) > 0);
961
962 PA_REFCNT_INC(p);
963 return p;
964 }
965
966 void pa_pstream_unlink(pa_pstream *p) {
967 pa_assert(p);
968
969 if (p->dead)
970 return;
971
972 p->dead = TRUE;
973
974 if (p->import) {
975 pa_memimport_free(p->import);
976 p->import = NULL;
977 }
978
979 if (p->export) {
980 pa_memexport_free(p->export);
981 p->export = NULL;
982 }
983
984 if (p->io) {
985 pa_iochannel_free(p->io);
986 p->io = NULL;
987 }
988
989 if (p->defer_event) {
990 p->mainloop->defer_free(p->defer_event);
991 p->defer_event = NULL;
992 }
993
994 p->die_callback = NULL;
995 p->drain_callback = NULL;
996 p->recieve_packet_callback = NULL;
997 p->recieve_memblock_callback = NULL;
998 }
999
1000 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
1001 pa_assert(p);
1002 pa_assert(PA_REFCNT_VALUE(p) > 0);
1003
1004 p->use_shm = enable;
1005
1006 if (enable) {
1007
1008 if (!p->export)
1009 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1010
1011 } else {
1012
1013 if (p->export) {
1014 pa_memexport_free(p->export);
1015 p->export = NULL;
1016 }
1017 }
1018 }
1019
1020 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1021 pa_assert(p);
1022 pa_assert(PA_REFCNT_VALUE(p) > 0);
1023
1024 return p->use_shm;
1025 }