]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
190fc9a33a4045cf15520054658fe7ae14000358
[pulseaudio] / src / pulsecore / pstream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
33 #endif
34
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
44
45 #include "pstream.h"
46
47 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
48 #define PA_FLAG_SHMDATA 0x80000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 #define PA_FLAG_SHMMASK 0xFF000000LU
52 #define PA_FLAG_SEEKMASK 0x000000FFLU
53
54 /* The sequence descriptor header consists of 5 32bit integers: */
55 enum {
56 PA_PSTREAM_DESCRIPTOR_LENGTH,
57 PA_PSTREAM_DESCRIPTOR_CHANNEL,
58 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
59 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
60 PA_PSTREAM_DESCRIPTOR_FLAGS,
61 PA_PSTREAM_DESCRIPTOR_MAX
62 };
63
64 /* If we have an SHM block, this info follows the descriptor */
65 enum {
66 PA_PSTREAM_SHM_BLOCKID,
67 PA_PSTREAM_SHM_SHMID,
68 PA_PSTREAM_SHM_INDEX,
69 PA_PSTREAM_SHM_LENGTH,
70 PA_PSTREAM_SHM_MAX
71 };
72
73 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
74
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
76
77 #define MINIBUF_SIZE (256)
78
79 /* To allow uploading a single sample in one frame, this value should be the
80 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
81 */
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
83
84 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
85
86 struct item_info {
87 enum {
88 PA_PSTREAM_ITEM_PACKET,
89 PA_PSTREAM_ITEM_MEMBLOCK,
90 PA_PSTREAM_ITEM_SHMRELEASE,
91 PA_PSTREAM_ITEM_SHMREVOKE
92 } type;
93
94 /* packet info */
95 pa_packet *packet;
96 #ifdef HAVE_CREDS
97 bool with_creds;
98 pa_creds creds;
99 #endif
100
101 /* memblock info */
102 pa_memchunk chunk;
103 uint32_t channel;
104 int64_t offset;
105 pa_seek_mode_t seek_mode;
106
107 /* release/revoke info */
108 uint32_t block_id;
109 };
110
111 struct pa_pstream {
112 PA_REFCNT_DECLARE;
113
114 pa_mainloop_api *mainloop;
115 pa_defer_event *defer_event;
116 pa_iochannel *io;
117
118 pa_queue *send_queue;
119
120 bool dead;
121
122 struct {
123 union {
124 uint8_t minibuf[MINIBUF_SIZE];
125 pa_pstream_descriptor descriptor;
126 };
127 struct item_info* current;
128 void *data;
129 size_t index;
130 int minibuf_validsize;
131 pa_memchunk memchunk;
132 } write;
133
134 struct {
135 pa_pstream_descriptor descriptor;
136 pa_memblock *memblock;
137 pa_packet *packet;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139 void *data;
140 size_t index;
141 } read;
142
143 bool use_shm;
144 pa_memimport *import;
145 pa_memexport *export;
146
147 pa_pstream_packet_cb_t receive_packet_callback;
148 void *receive_packet_callback_userdata;
149
150 pa_pstream_memblock_cb_t receive_memblock_callback;
151 void *receive_memblock_callback_userdata;
152
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
155
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
158
159 pa_pstream_block_id_cb_t revoke_callback;
160 void *revoke_callback_userdata;
161
162 pa_pstream_block_id_cb_t release_callback;
163 void *release_callback_userdata;
164
165 pa_mempool *mempool;
166
167 #ifdef HAVE_CREDS
168 pa_creds read_creds, write_creds;
169 bool read_creds_valid, send_creds_now;
170 #endif
171 };
172
173 static int do_write(pa_pstream *p);
174 static int do_read(pa_pstream *p);
175
176 static void do_pstream_read_write(pa_pstream *p) {
177 pa_assert(p);
178 pa_assert(PA_REFCNT_VALUE(p) > 0);
179
180 pa_pstream_ref(p);
181
182 p->mainloop->defer_enable(p->defer_event, 0);
183
184 if (!p->dead && pa_iochannel_is_readable(p->io)) {
185 if (do_read(p) < 0)
186 goto fail;
187 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
188 goto fail;
189
190 while (!p->dead && pa_iochannel_is_writable(p->io)) {
191 int r = do_write(p);
192 if (r < 0)
193 goto fail;
194 if (r == 0)
195 break;
196 }
197
198 pa_pstream_unref(p);
199 return;
200
201 fail:
202
203 if (p->die_callback)
204 p->die_callback(p, p->die_callback_userdata);
205
206 pa_pstream_unlink(p);
207 pa_pstream_unref(p);
208 }
209
210 static void io_callback(pa_iochannel*io, void *userdata) {
211 pa_pstream *p = userdata;
212
213 pa_assert(p);
214 pa_assert(PA_REFCNT_VALUE(p) > 0);
215 pa_assert(p->io == io);
216
217 do_pstream_read_write(p);
218 }
219
220 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
221 pa_pstream *p = userdata;
222
223 pa_assert(p);
224 pa_assert(PA_REFCNT_VALUE(p) > 0);
225 pa_assert(p->defer_event == e);
226 pa_assert(p->mainloop == m);
227
228 do_pstream_read_write(p);
229 }
230
231 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
232
233 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
234 pa_pstream *p;
235
236 pa_assert(m);
237 pa_assert(io);
238 pa_assert(pool);
239
240 p = pa_xnew(pa_pstream, 1);
241 PA_REFCNT_INIT(p);
242 p->io = io;
243 pa_iochannel_set_callback(io, io_callback, p);
244 p->dead = false;
245
246 p->mainloop = m;
247 p->defer_event = m->defer_new(m, defer_callback, p);
248 m->defer_enable(p->defer_event, 0);
249
250 p->send_queue = pa_queue_new();
251
252 p->write.current = NULL;
253 p->write.index = 0;
254 pa_memchunk_reset(&p->write.memchunk);
255 p->read.memblock = NULL;
256 p->read.packet = NULL;
257 p->read.index = 0;
258
259 p->receive_packet_callback = NULL;
260 p->receive_packet_callback_userdata = NULL;
261 p->receive_memblock_callback = NULL;
262 p->receive_memblock_callback_userdata = NULL;
263 p->drain_callback = NULL;
264 p->drain_callback_userdata = NULL;
265 p->die_callback = NULL;
266 p->die_callback_userdata = NULL;
267 p->revoke_callback = NULL;
268 p->revoke_callback_userdata = NULL;
269 p->release_callback = NULL;
270 p->release_callback_userdata = NULL;
271
272 p->mempool = pool;
273
274 p->use_shm = false;
275 p->export = NULL;
276
277 /* We do importing unconditionally */
278 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
279
280 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
281 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
282
283 #ifdef HAVE_CREDS
284 p->send_creds_now = false;
285 p->read_creds_valid = false;
286 #endif
287 return p;
288 }
289
290 static void item_free(void *item) {
291 struct item_info *i = item;
292 pa_assert(i);
293
294 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
295 pa_assert(i->chunk.memblock);
296 pa_memblock_unref(i->chunk.memblock);
297 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
298 pa_assert(i->packet);
299 pa_packet_unref(i->packet);
300 }
301
302 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
303 pa_xfree(i);
304 }
305
306 static void pstream_free(pa_pstream *p) {
307 pa_assert(p);
308
309 pa_pstream_unlink(p);
310
311 pa_queue_free(p->send_queue, item_free);
312
313 if (p->write.current)
314 item_free(p->write.current);
315
316 if (p->write.memchunk.memblock)
317 pa_memblock_unref(p->write.memchunk.memblock);
318
319 if (p->read.memblock)
320 pa_memblock_unref(p->read.memblock);
321
322 if (p->read.packet)
323 pa_packet_unref(p->read.packet);
324
325 pa_xfree(p);
326 }
327
328 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
329 struct item_info *i;
330
331 pa_assert(p);
332 pa_assert(PA_REFCNT_VALUE(p) > 0);
333 pa_assert(packet);
334
335 if (p->dead)
336 return;
337
338 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
339 i = pa_xnew(struct item_info, 1);
340
341 i->type = PA_PSTREAM_ITEM_PACKET;
342 i->packet = pa_packet_ref(packet);
343
344 #ifdef HAVE_CREDS
345 if ((i->with_creds = !!creds))
346 i->creds = *creds;
347 #endif
348
349 pa_queue_push(p->send_queue, i);
350
351 p->mainloop->defer_enable(p->defer_event, 1);
352 }
353
354 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
355 size_t length, idx;
356 size_t bsm;
357
358 pa_assert(p);
359 pa_assert(PA_REFCNT_VALUE(p) > 0);
360 pa_assert(channel != (uint32_t) -1);
361 pa_assert(chunk);
362
363 if (p->dead)
364 return;
365
366 idx = 0;
367 length = chunk->length;
368
369 bsm = pa_mempool_block_size_max(p->mempool);
370
371 while (length > 0) {
372 struct item_info *i;
373 size_t n;
374
375 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
376 i = pa_xnew(struct item_info, 1);
377 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
378
379 n = PA_MIN(length, bsm);
380 i->chunk.index = chunk->index + idx;
381 i->chunk.length = n;
382 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
383
384 i->channel = channel;
385 i->offset = offset;
386 i->seek_mode = seek_mode;
387 #ifdef HAVE_CREDS
388 i->with_creds = false;
389 #endif
390
391 pa_queue_push(p->send_queue, i);
392
393 idx += n;
394 length -= n;
395 }
396
397 p->mainloop->defer_enable(p->defer_event, 1);
398 }
399
400 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
401 struct item_info *item;
402 pa_assert(p);
403 pa_assert(PA_REFCNT_VALUE(p) > 0);
404
405 if (p->dead)
406 return;
407
408 /* pa_log("Releasing block %u", block_id); */
409
410 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
411 item = pa_xnew(struct item_info, 1);
412 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
413 item->block_id = block_id;
414 #ifdef HAVE_CREDS
415 item->with_creds = false;
416 #endif
417
418 pa_queue_push(p->send_queue, item);
419 p->mainloop->defer_enable(p->defer_event, 1);
420 }
421
422 /* might be called from thread context */
423 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
424 pa_pstream *p = userdata;
425
426 pa_assert(p);
427 pa_assert(PA_REFCNT_VALUE(p) > 0);
428
429 if (p->dead)
430 return;
431
432 if (p->release_callback)
433 p->release_callback(p, block_id, p->release_callback_userdata);
434 else
435 pa_pstream_send_release(p, block_id);
436 }
437
438 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
439 struct item_info *item;
440 pa_assert(p);
441 pa_assert(PA_REFCNT_VALUE(p) > 0);
442
443 if (p->dead)
444 return;
445 /* pa_log("Revoking block %u", block_id); */
446
447 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
448 item = pa_xnew(struct item_info, 1);
449 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
450 item->block_id = block_id;
451 #ifdef HAVE_CREDS
452 item->with_creds = false;
453 #endif
454
455 pa_queue_push(p->send_queue, item);
456 p->mainloop->defer_enable(p->defer_event, 1);
457 }
458
459 /* might be called from thread context */
460 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
461 pa_pstream *p = userdata;
462
463 pa_assert(p);
464 pa_assert(PA_REFCNT_VALUE(p) > 0);
465
466 if (p->revoke_callback)
467 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
468 else
469 pa_pstream_send_revoke(p, block_id);
470 }
471
472 static void prepare_next_write_item(pa_pstream *p) {
473 pa_assert(p);
474 pa_assert(PA_REFCNT_VALUE(p) > 0);
475
476 p->write.current = pa_queue_pop(p->send_queue);
477
478 if (!p->write.current)
479 return;
480 p->write.index = 0;
481 p->write.data = NULL;
482 p->write.minibuf_validsize = 0;
483 pa_memchunk_reset(&p->write.memchunk);
484
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
486 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
487 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
488 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
489 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
490
491 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
492
493 pa_assert(p->write.current->packet);
494 p->write.data = p->write.current->packet->data;
495 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
496
497 if (p->write.current->packet->length <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
498 memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, p->write.current->packet->length);
499 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + p->write.current->packet->length;
500 }
501
502 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
503
504 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
505 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
506
507 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
508
509 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
510 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
511
512 } else {
513 uint32_t flags;
514 bool send_payload = true;
515
516 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
517 pa_assert(p->write.current->chunk.memblock);
518
519 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
520 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
521 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
522
523 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
524
525 if (p->use_shm) {
526 uint32_t block_id, shm_id;
527 size_t offset, length;
528 uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
529 size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
530
531 pa_assert(p->export);
532
533 if (pa_memexport_put(p->export,
534 p->write.current->chunk.memblock,
535 &block_id,
536 &shm_id,
537 &offset,
538 &length) >= 0) {
539
540 flags |= PA_FLAG_SHMDATA;
541 send_payload = false;
542
543 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
544 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
545 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
546 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
547
548 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
549 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
550 }
551 /* else */
552 /* pa_log_warn("Failed to export memory block."); */
553 }
554
555 if (send_payload) {
556 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
557 p->write.memchunk = p->write.current->chunk;
558 pa_memblock_ref(p->write.memchunk.memblock);
559 p->write.data = NULL;
560 }
561
562 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
563 }
564
565 #ifdef HAVE_CREDS
566 if ((p->send_creds_now = p->write.current->with_creds))
567 p->write_creds = p->write.current->creds;
568 #endif
569 }
570
571 static int do_write(pa_pstream *p) {
572 void *d;
573 size_t l;
574 ssize_t r;
575 pa_memblock *release_memblock = NULL;
576
577 pa_assert(p);
578 pa_assert(PA_REFCNT_VALUE(p) > 0);
579
580 if (!p->write.current)
581 prepare_next_write_item(p);
582
583 if (!p->write.current)
584 return 0;
585
586 if (p->write.minibuf_validsize > 0) {
587 d = p->write.minibuf + p->write.index;
588 l = p->write.minibuf_validsize - p->write.index;
589 } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
590 d = (uint8_t*) p->write.descriptor + p->write.index;
591 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
592 } else {
593 pa_assert(p->write.data || p->write.memchunk.memblock);
594
595 if (p->write.data)
596 d = p->write.data;
597 else {
598 d = pa_memblock_acquire_chunk(&p->write.memchunk);
599 release_memblock = p->write.memchunk.memblock;
600 }
601
602 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
603 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
604 }
605
606 pa_assert(l > 0);
607
608 #ifdef HAVE_CREDS
609 if (p->send_creds_now) {
610
611 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
612 goto fail;
613
614 p->send_creds_now = false;
615 } else
616 #endif
617
618 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
619 goto fail;
620
621 if (release_memblock)
622 pa_memblock_release(release_memblock);
623
624 p->write.index += (size_t) r;
625
626 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
627 pa_assert(p->write.current);
628 item_free(p->write.current);
629 p->write.current = NULL;
630
631 if (p->write.memchunk.memblock)
632 pa_memblock_unref(p->write.memchunk.memblock);
633
634 pa_memchunk_reset(&p->write.memchunk);
635
636 if (p->drain_callback && !pa_pstream_is_pending(p))
637 p->drain_callback(p, p->drain_callback_userdata);
638 }
639
640 return (size_t) r == l ? 1 : 0;
641
642 fail:
643
644 if (release_memblock)
645 pa_memblock_release(release_memblock);
646
647 return -1;
648 }
649
650 static int do_read(pa_pstream *p) {
651 void *d;
652 size_t l;
653 ssize_t r;
654 pa_memblock *release_memblock = NULL;
655 pa_assert(p);
656 pa_assert(PA_REFCNT_VALUE(p) > 0);
657
658 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
659 d = (uint8_t*) p->read.descriptor + p->read.index;
660 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
661 } else {
662 pa_assert(p->read.data || p->read.memblock);
663
664 if (p->read.data)
665 d = p->read.data;
666 else {
667 d = pa_memblock_acquire(p->read.memblock);
668 release_memblock = p->read.memblock;
669 }
670
671 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
672 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
673 }
674
675 #ifdef HAVE_CREDS
676 {
677 bool b = 0;
678
679 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
680 goto fail;
681
682 p->read_creds_valid = p->read_creds_valid || b;
683 }
684 #else
685 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
686 goto fail;
687 #endif
688
689 if (release_memblock)
690 pa_memblock_release(release_memblock);
691
692 p->read.index += (size_t) r;
693
694 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
695 uint32_t flags, length, channel;
696 /* Reading of frame descriptor complete */
697
698 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
699
700 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
701 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
702 return -1;
703 }
704
705 if (flags == PA_FLAG_SHMRELEASE) {
706
707 /* This is a SHM memblock release frame with no payload */
708
709 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
710
711 pa_assert(p->export);
712 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
713
714 goto frame_done;
715
716 } else if (flags == PA_FLAG_SHMREVOKE) {
717
718 /* This is a SHM memblock revoke frame with no payload */
719
720 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
721
722 pa_assert(p->import);
723 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
724
725 goto frame_done;
726 }
727
728 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
729
730 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
731 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
732 return -1;
733 }
734
735 pa_assert(!p->read.packet && !p->read.memblock);
736
737 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
738
739 if (channel == (uint32_t) -1) {
740
741 if (flags != 0) {
742 pa_log_warn("Received packet frame with invalid flags value.");
743 return -1;
744 }
745
746 /* Frame is a packet frame */
747 p->read.packet = pa_packet_new(length);
748 p->read.data = p->read.packet->data;
749
750 } else {
751
752 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
753 pa_log_warn("Received memblock frame with invalid seek mode.");
754 return -1;
755 }
756
757 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
758
759 if (length != sizeof(p->read.shm_info)) {
760 pa_log_warn("Received SHM memblock frame with invalid frame length.");
761 return -1;
762 }
763
764 /* Frame is a memblock frame referencing an SHM memblock */
765 p->read.data = p->read.shm_info;
766
767 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
768
769 /* Frame is a memblock frame */
770
771 p->read.memblock = pa_memblock_new(p->mempool, length);
772 p->read.data = NULL;
773 } else {
774
775 pa_log_warn("Received memblock frame with invalid flags value.");
776 return -1;
777 }
778 }
779
780 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
781 /* Frame payload available */
782
783 if (p->read.memblock && p->receive_memblock_callback) {
784
785 /* Is this memblock data? Than pass it to the user */
786 l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
787
788 if (l > 0) {
789 pa_memchunk chunk;
790
791 chunk.memblock = p->read.memblock;
792 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
793 chunk.length = l;
794
795 if (p->receive_memblock_callback) {
796 int64_t offset;
797
798 offset = (int64_t) (
799 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
800 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
801
802 p->receive_memblock_callback(
803 p,
804 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
805 offset,
806 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
807 &chunk,
808 p->receive_memblock_callback_userdata);
809 }
810
811 /* Drop seek info for following callbacks */
812 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
813 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
814 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
815 }
816 }
817
818 /* Frame complete */
819 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
820
821 if (p->read.memblock) {
822
823 /* This was a memblock frame. We can unref the memblock now */
824 pa_memblock_unref(p->read.memblock);
825
826 } else if (p->read.packet) {
827
828 if (p->receive_packet_callback)
829 #ifdef HAVE_CREDS
830 p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata);
831 #else
832 p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
833 #endif
834
835 pa_packet_unref(p->read.packet);
836 } else {
837 pa_memblock *b;
838
839 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
840
841 pa_assert(p->import);
842
843 if (!(b = pa_memimport_get(p->import,
844 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
845 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
846 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
847 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
848
849 if (pa_log_ratelimit(PA_LOG_DEBUG))
850 pa_log_debug("Failed to import memory block.");
851 }
852
853 if (p->receive_memblock_callback) {
854 int64_t offset;
855 pa_memchunk chunk;
856
857 chunk.memblock = b;
858 chunk.index = 0;
859 chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
860
861 offset = (int64_t) (
862 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
863 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
864
865 p->receive_memblock_callback(
866 p,
867 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
868 offset,
869 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
870 &chunk,
871 p->receive_memblock_callback_userdata);
872 }
873
874 if (b)
875 pa_memblock_unref(b);
876 }
877
878 goto frame_done;
879 }
880 }
881
882 return 0;
883
884 frame_done:
885 p->read.memblock = NULL;
886 p->read.packet = NULL;
887 p->read.index = 0;
888 p->read.data = NULL;
889
890 #ifdef HAVE_CREDS
891 p->read_creds_valid = false;
892 #endif
893
894 return 0;
895
896 fail:
897 if (release_memblock)
898 pa_memblock_release(release_memblock);
899
900 return -1;
901 }
902
903 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
904 pa_assert(p);
905 pa_assert(PA_REFCNT_VALUE(p) > 0);
906
907 p->die_callback = cb;
908 p->die_callback_userdata = userdata;
909 }
910
911 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
912 pa_assert(p);
913 pa_assert(PA_REFCNT_VALUE(p) > 0);
914
915 p->drain_callback = cb;
916 p->drain_callback_userdata = userdata;
917 }
918
919 void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
920 pa_assert(p);
921 pa_assert(PA_REFCNT_VALUE(p) > 0);
922
923 p->receive_packet_callback = cb;
924 p->receive_packet_callback_userdata = userdata;
925 }
926
927 void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
928 pa_assert(p);
929 pa_assert(PA_REFCNT_VALUE(p) > 0);
930
931 p->receive_memblock_callback = cb;
932 p->receive_memblock_callback_userdata = userdata;
933 }
934
935 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
936 pa_assert(p);
937 pa_assert(PA_REFCNT_VALUE(p) > 0);
938
939 p->release_callback = cb;
940 p->release_callback_userdata = userdata;
941 }
942
943 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
944 pa_assert(p);
945 pa_assert(PA_REFCNT_VALUE(p) > 0);
946
947 p->release_callback = cb;
948 p->release_callback_userdata = userdata;
949 }
950
951 bool pa_pstream_is_pending(pa_pstream *p) {
952 bool b;
953
954 pa_assert(p);
955 pa_assert(PA_REFCNT_VALUE(p) > 0);
956
957 if (p->dead)
958 b = false;
959 else
960 b = p->write.current || !pa_queue_isempty(p->send_queue);
961
962 return b;
963 }
964
965 void pa_pstream_unref(pa_pstream*p) {
966 pa_assert(p);
967 pa_assert(PA_REFCNT_VALUE(p) > 0);
968
969 if (PA_REFCNT_DEC(p) <= 0)
970 pstream_free(p);
971 }
972
973 pa_pstream* pa_pstream_ref(pa_pstream*p) {
974 pa_assert(p);
975 pa_assert(PA_REFCNT_VALUE(p) > 0);
976
977 PA_REFCNT_INC(p);
978 return p;
979 }
980
981 void pa_pstream_unlink(pa_pstream *p) {
982 pa_assert(p);
983
984 if (p->dead)
985 return;
986
987 p->dead = true;
988
989 if (p->import) {
990 pa_memimport_free(p->import);
991 p->import = NULL;
992 }
993
994 if (p->export) {
995 pa_memexport_free(p->export);
996 p->export = NULL;
997 }
998
999 if (p->io) {
1000 pa_iochannel_free(p->io);
1001 p->io = NULL;
1002 }
1003
1004 if (p->defer_event) {
1005 p->mainloop->defer_free(p->defer_event);
1006 p->defer_event = NULL;
1007 }
1008
1009 p->die_callback = NULL;
1010 p->drain_callback = NULL;
1011 p->receive_packet_callback = NULL;
1012 p->receive_memblock_callback = NULL;
1013 }
1014
1015 void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
1016 pa_assert(p);
1017 pa_assert(PA_REFCNT_VALUE(p) > 0);
1018
1019 p->use_shm = enable;
1020
1021 if (enable) {
1022
1023 if (!p->export)
1024 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1025
1026 } else {
1027
1028 if (p->export) {
1029 pa_memexport_free(p->export);
1030 p->export = NULL;
1031 }
1032 }
1033 }
1034
1035 bool pa_pstream_get_shm(pa_pstream *p) {
1036 pa_assert(p);
1037 pa_assert(PA_REFCNT_VALUE(p) > 0);
1038
1039 return p->use_shm;
1040 }