]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
pstream: Use pa_xnew0 in initialization
[pulseaudio] / src / pulsecore / pstream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
33 #endif
34
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
44
45 #include "pstream.h"
46
47 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
48 #define PA_FLAG_SHMDATA 0x80000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 #define PA_FLAG_SHMMASK 0xFF000000LU
52 #define PA_FLAG_SEEKMASK 0x000000FFLU
53
54 /* The sequence descriptor header consists of 5 32bit integers: */
55 enum {
56 PA_PSTREAM_DESCRIPTOR_LENGTH,
57 PA_PSTREAM_DESCRIPTOR_CHANNEL,
58 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
59 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
60 PA_PSTREAM_DESCRIPTOR_FLAGS,
61 PA_PSTREAM_DESCRIPTOR_MAX
62 };
63
64 /* If we have an SHM block, this info follows the descriptor */
65 enum {
66 PA_PSTREAM_SHM_BLOCKID,
67 PA_PSTREAM_SHM_SHMID,
68 PA_PSTREAM_SHM_INDEX,
69 PA_PSTREAM_SHM_LENGTH,
70 PA_PSTREAM_SHM_MAX
71 };
72
73 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
74
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
76
77 #define MINIBUF_SIZE (256)
78
79 /* To allow uploading a single sample in one frame, this value should be the
80 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
81 */
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
83
84 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
85
86 struct item_info {
87 enum {
88 PA_PSTREAM_ITEM_PACKET,
89 PA_PSTREAM_ITEM_MEMBLOCK,
90 PA_PSTREAM_ITEM_SHMRELEASE,
91 PA_PSTREAM_ITEM_SHMREVOKE
92 } type;
93
94 /* packet info */
95 pa_packet *packet;
96 #ifdef HAVE_CREDS
97 bool with_creds;
98 pa_creds creds;
99 #endif
100
101 /* memblock info */
102 pa_memchunk chunk;
103 uint32_t channel;
104 int64_t offset;
105 pa_seek_mode_t seek_mode;
106
107 /* release/revoke info */
108 uint32_t block_id;
109 };
110
111 struct pa_pstream {
112 PA_REFCNT_DECLARE;
113
114 pa_mainloop_api *mainloop;
115 pa_defer_event *defer_event;
116 pa_iochannel *io;
117
118 pa_queue *send_queue;
119
120 bool dead;
121
122 struct {
123 union {
124 uint8_t minibuf[MINIBUF_SIZE];
125 pa_pstream_descriptor descriptor;
126 };
127 struct item_info* current;
128 void *data;
129 size_t index;
130 int minibuf_validsize;
131 pa_memchunk memchunk;
132 } write;
133
134 struct {
135 pa_pstream_descriptor descriptor;
136 pa_memblock *memblock;
137 pa_packet *packet;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139 void *data;
140 size_t index;
141 } read;
142
143 bool use_shm;
144 pa_memimport *import;
145 pa_memexport *export;
146
147 pa_pstream_packet_cb_t receive_packet_callback;
148 void *receive_packet_callback_userdata;
149
150 pa_pstream_memblock_cb_t receive_memblock_callback;
151 void *receive_memblock_callback_userdata;
152
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
155
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
158
159 pa_pstream_block_id_cb_t revoke_callback;
160 void *revoke_callback_userdata;
161
162 pa_pstream_block_id_cb_t release_callback;
163 void *release_callback_userdata;
164
165 pa_mempool *mempool;
166
167 #ifdef HAVE_CREDS
168 pa_creds read_creds, write_creds;
169 bool read_creds_valid, send_creds_now;
170 #endif
171 };
172
173 static int do_write(pa_pstream *p);
174 static int do_read(pa_pstream *p);
175
176 static void do_pstream_read_write(pa_pstream *p) {
177 pa_assert(p);
178 pa_assert(PA_REFCNT_VALUE(p) > 0);
179
180 pa_pstream_ref(p);
181
182 p->mainloop->defer_enable(p->defer_event, 0);
183
184 if (!p->dead && pa_iochannel_is_readable(p->io)) {
185 if (do_read(p) < 0)
186 goto fail;
187 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
188 goto fail;
189
190 while (!p->dead && pa_iochannel_is_writable(p->io)) {
191 int r = do_write(p);
192 if (r < 0)
193 goto fail;
194 if (r == 0)
195 break;
196 }
197
198 pa_pstream_unref(p);
199 return;
200
201 fail:
202
203 if (p->die_callback)
204 p->die_callback(p, p->die_callback_userdata);
205
206 pa_pstream_unlink(p);
207 pa_pstream_unref(p);
208 }
209
210 static void io_callback(pa_iochannel*io, void *userdata) {
211 pa_pstream *p = userdata;
212
213 pa_assert(p);
214 pa_assert(PA_REFCNT_VALUE(p) > 0);
215 pa_assert(p->io == io);
216
217 do_pstream_read_write(p);
218 }
219
220 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
221 pa_pstream *p = userdata;
222
223 pa_assert(p);
224 pa_assert(PA_REFCNT_VALUE(p) > 0);
225 pa_assert(p->defer_event == e);
226 pa_assert(p->mainloop == m);
227
228 do_pstream_read_write(p);
229 }
230
231 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
232
233 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
234 pa_pstream *p;
235
236 pa_assert(m);
237 pa_assert(io);
238 pa_assert(pool);
239
240 p = pa_xnew0(pa_pstream, 1);
241 PA_REFCNT_INIT(p);
242 p->io = io;
243 pa_iochannel_set_callback(io, io_callback, p);
244
245 p->mainloop = m;
246 p->defer_event = m->defer_new(m, defer_callback, p);
247 m->defer_enable(p->defer_event, 0);
248
249 p->send_queue = pa_queue_new();
250
251 p->mempool = pool;
252
253 /* We do importing unconditionally */
254 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
255
256 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
257 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
258
259 return p;
260 }
261
262 static void item_free(void *item) {
263 struct item_info *i = item;
264 pa_assert(i);
265
266 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
267 pa_assert(i->chunk.memblock);
268 pa_memblock_unref(i->chunk.memblock);
269 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
270 pa_assert(i->packet);
271 pa_packet_unref(i->packet);
272 }
273
274 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
275 pa_xfree(i);
276 }
277
278 static void pstream_free(pa_pstream *p) {
279 pa_assert(p);
280
281 pa_pstream_unlink(p);
282
283 pa_queue_free(p->send_queue, item_free);
284
285 if (p->write.current)
286 item_free(p->write.current);
287
288 if (p->write.memchunk.memblock)
289 pa_memblock_unref(p->write.memchunk.memblock);
290
291 if (p->read.memblock)
292 pa_memblock_unref(p->read.memblock);
293
294 if (p->read.packet)
295 pa_packet_unref(p->read.packet);
296
297 pa_xfree(p);
298 }
299
300 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
301 struct item_info *i;
302
303 pa_assert(p);
304 pa_assert(PA_REFCNT_VALUE(p) > 0);
305 pa_assert(packet);
306
307 if (p->dead)
308 return;
309
310 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
311 i = pa_xnew(struct item_info, 1);
312
313 i->type = PA_PSTREAM_ITEM_PACKET;
314 i->packet = pa_packet_ref(packet);
315
316 #ifdef HAVE_CREDS
317 if ((i->with_creds = !!creds))
318 i->creds = *creds;
319 #endif
320
321 pa_queue_push(p->send_queue, i);
322
323 p->mainloop->defer_enable(p->defer_event, 1);
324 }
325
326 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
327 size_t length, idx;
328 size_t bsm;
329
330 pa_assert(p);
331 pa_assert(PA_REFCNT_VALUE(p) > 0);
332 pa_assert(channel != (uint32_t) -1);
333 pa_assert(chunk);
334
335 if (p->dead)
336 return;
337
338 idx = 0;
339 length = chunk->length;
340
341 bsm = pa_mempool_block_size_max(p->mempool);
342
343 while (length > 0) {
344 struct item_info *i;
345 size_t n;
346
347 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
348 i = pa_xnew(struct item_info, 1);
349 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
350
351 n = PA_MIN(length, bsm);
352 i->chunk.index = chunk->index + idx;
353 i->chunk.length = n;
354 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
355
356 i->channel = channel;
357 i->offset = offset;
358 i->seek_mode = seek_mode;
359 #ifdef HAVE_CREDS
360 i->with_creds = false;
361 #endif
362
363 pa_queue_push(p->send_queue, i);
364
365 idx += n;
366 length -= n;
367 }
368
369 p->mainloop->defer_enable(p->defer_event, 1);
370 }
371
372 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
373 struct item_info *item;
374 pa_assert(p);
375 pa_assert(PA_REFCNT_VALUE(p) > 0);
376
377 if (p->dead)
378 return;
379
380 /* pa_log("Releasing block %u", block_id); */
381
382 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
383 item = pa_xnew(struct item_info, 1);
384 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
385 item->block_id = block_id;
386 #ifdef HAVE_CREDS
387 item->with_creds = false;
388 #endif
389
390 pa_queue_push(p->send_queue, item);
391 p->mainloop->defer_enable(p->defer_event, 1);
392 }
393
394 /* might be called from thread context */
395 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
396 pa_pstream *p = userdata;
397
398 pa_assert(p);
399 pa_assert(PA_REFCNT_VALUE(p) > 0);
400
401 if (p->dead)
402 return;
403
404 if (p->release_callback)
405 p->release_callback(p, block_id, p->release_callback_userdata);
406 else
407 pa_pstream_send_release(p, block_id);
408 }
409
410 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
411 struct item_info *item;
412 pa_assert(p);
413 pa_assert(PA_REFCNT_VALUE(p) > 0);
414
415 if (p->dead)
416 return;
417 /* pa_log("Revoking block %u", block_id); */
418
419 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
420 item = pa_xnew(struct item_info, 1);
421 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
422 item->block_id = block_id;
423 #ifdef HAVE_CREDS
424 item->with_creds = false;
425 #endif
426
427 pa_queue_push(p->send_queue, item);
428 p->mainloop->defer_enable(p->defer_event, 1);
429 }
430
431 /* might be called from thread context */
432 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
433 pa_pstream *p = userdata;
434
435 pa_assert(p);
436 pa_assert(PA_REFCNT_VALUE(p) > 0);
437
438 if (p->revoke_callback)
439 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
440 else
441 pa_pstream_send_revoke(p, block_id);
442 }
443
444 static void prepare_next_write_item(pa_pstream *p) {
445 pa_assert(p);
446 pa_assert(PA_REFCNT_VALUE(p) > 0);
447
448 p->write.current = pa_queue_pop(p->send_queue);
449
450 if (!p->write.current)
451 return;
452 p->write.index = 0;
453 p->write.data = NULL;
454 p->write.minibuf_validsize = 0;
455 pa_memchunk_reset(&p->write.memchunk);
456
457 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
458 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
459 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
460 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
461 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
462
463 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
464
465 pa_assert(p->write.current->packet);
466 p->write.data = p->write.current->packet->data;
467 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
468
469 if (p->write.current->packet->length <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
470 memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, p->write.current->packet->length);
471 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + p->write.current->packet->length;
472 }
473
474 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
475
476 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
477 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
478
479 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
480
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
482 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
483
484 } else {
485 uint32_t flags;
486 bool send_payload = true;
487
488 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
489 pa_assert(p->write.current->chunk.memblock);
490
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
492 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
493 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
494
495 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
496
497 if (p->use_shm) {
498 uint32_t block_id, shm_id;
499 size_t offset, length;
500 uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
501 size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
502
503 pa_assert(p->export);
504
505 if (pa_memexport_put(p->export,
506 p->write.current->chunk.memblock,
507 &block_id,
508 &shm_id,
509 &offset,
510 &length) >= 0) {
511
512 flags |= PA_FLAG_SHMDATA;
513 send_payload = false;
514
515 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
516 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
517 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
518 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
519
520 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
521 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
522 }
523 /* else */
524 /* pa_log_warn("Failed to export memory block."); */
525 }
526
527 if (send_payload) {
528 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
529 p->write.memchunk = p->write.current->chunk;
530 pa_memblock_ref(p->write.memchunk.memblock);
531 p->write.data = NULL;
532 }
533
534 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
535 }
536
537 #ifdef HAVE_CREDS
538 if ((p->send_creds_now = p->write.current->with_creds))
539 p->write_creds = p->write.current->creds;
540 #endif
541 }
542
543 static int do_write(pa_pstream *p) {
544 void *d;
545 size_t l;
546 ssize_t r;
547 pa_memblock *release_memblock = NULL;
548
549 pa_assert(p);
550 pa_assert(PA_REFCNT_VALUE(p) > 0);
551
552 if (!p->write.current)
553 prepare_next_write_item(p);
554
555 if (!p->write.current)
556 return 0;
557
558 if (p->write.minibuf_validsize > 0) {
559 d = p->write.minibuf + p->write.index;
560 l = p->write.minibuf_validsize - p->write.index;
561 } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
562 d = (uint8_t*) p->write.descriptor + p->write.index;
563 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
564 } else {
565 pa_assert(p->write.data || p->write.memchunk.memblock);
566
567 if (p->write.data)
568 d = p->write.data;
569 else {
570 d = pa_memblock_acquire_chunk(&p->write.memchunk);
571 release_memblock = p->write.memchunk.memblock;
572 }
573
574 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
575 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
576 }
577
578 pa_assert(l > 0);
579
580 #ifdef HAVE_CREDS
581 if (p->send_creds_now) {
582
583 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
584 goto fail;
585
586 p->send_creds_now = false;
587 } else
588 #endif
589
590 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
591 goto fail;
592
593 if (release_memblock)
594 pa_memblock_release(release_memblock);
595
596 p->write.index += (size_t) r;
597
598 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
599 pa_assert(p->write.current);
600 item_free(p->write.current);
601 p->write.current = NULL;
602
603 if (p->write.memchunk.memblock)
604 pa_memblock_unref(p->write.memchunk.memblock);
605
606 pa_memchunk_reset(&p->write.memchunk);
607
608 if (p->drain_callback && !pa_pstream_is_pending(p))
609 p->drain_callback(p, p->drain_callback_userdata);
610 }
611
612 return (size_t) r == l ? 1 : 0;
613
614 fail:
615
616 if (release_memblock)
617 pa_memblock_release(release_memblock);
618
619 return -1;
620 }
621
622 static int do_read(pa_pstream *p) {
623 void *d;
624 size_t l;
625 ssize_t r;
626 pa_memblock *release_memblock = NULL;
627 pa_assert(p);
628 pa_assert(PA_REFCNT_VALUE(p) > 0);
629
630 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
631 d = (uint8_t*) p->read.descriptor + p->read.index;
632 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
633 } else {
634 pa_assert(p->read.data || p->read.memblock);
635
636 if (p->read.data)
637 d = p->read.data;
638 else {
639 d = pa_memblock_acquire(p->read.memblock);
640 release_memblock = p->read.memblock;
641 }
642
643 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
644 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
645 }
646
647 #ifdef HAVE_CREDS
648 {
649 bool b = 0;
650
651 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
652 goto fail;
653
654 p->read_creds_valid = p->read_creds_valid || b;
655 }
656 #else
657 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
658 goto fail;
659 #endif
660
661 if (release_memblock)
662 pa_memblock_release(release_memblock);
663
664 p->read.index += (size_t) r;
665
666 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
667 uint32_t flags, length, channel;
668 /* Reading of frame descriptor complete */
669
670 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
671
672 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
673 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
674 return -1;
675 }
676
677 if (flags == PA_FLAG_SHMRELEASE) {
678
679 /* This is a SHM memblock release frame with no payload */
680
681 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
682
683 pa_assert(p->export);
684 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
685
686 goto frame_done;
687
688 } else if (flags == PA_FLAG_SHMREVOKE) {
689
690 /* This is a SHM memblock revoke frame with no payload */
691
692 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
693
694 pa_assert(p->import);
695 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
696
697 goto frame_done;
698 }
699
700 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
701
702 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
703 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
704 return -1;
705 }
706
707 pa_assert(!p->read.packet && !p->read.memblock);
708
709 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
710
711 if (channel == (uint32_t) -1) {
712
713 if (flags != 0) {
714 pa_log_warn("Received packet frame with invalid flags value.");
715 return -1;
716 }
717
718 /* Frame is a packet frame */
719 p->read.packet = pa_packet_new(length);
720 p->read.data = p->read.packet->data;
721
722 } else {
723
724 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
725 pa_log_warn("Received memblock frame with invalid seek mode.");
726 return -1;
727 }
728
729 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
730
731 if (length != sizeof(p->read.shm_info)) {
732 pa_log_warn("Received SHM memblock frame with invalid frame length.");
733 return -1;
734 }
735
736 /* Frame is a memblock frame referencing an SHM memblock */
737 p->read.data = p->read.shm_info;
738
739 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
740
741 /* Frame is a memblock frame */
742
743 p->read.memblock = pa_memblock_new(p->mempool, length);
744 p->read.data = NULL;
745 } else {
746
747 pa_log_warn("Received memblock frame with invalid flags value.");
748 return -1;
749 }
750 }
751
752 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
753 /* Frame payload available */
754
755 if (p->read.memblock && p->receive_memblock_callback) {
756
757 /* Is this memblock data? Than pass it to the user */
758 l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
759
760 if (l > 0) {
761 pa_memchunk chunk;
762
763 chunk.memblock = p->read.memblock;
764 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
765 chunk.length = l;
766
767 if (p->receive_memblock_callback) {
768 int64_t offset;
769
770 offset = (int64_t) (
771 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
772 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
773
774 p->receive_memblock_callback(
775 p,
776 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
777 offset,
778 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
779 &chunk,
780 p->receive_memblock_callback_userdata);
781 }
782
783 /* Drop seek info for following callbacks */
784 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
785 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
786 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
787 }
788 }
789
790 /* Frame complete */
791 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
792
793 if (p->read.memblock) {
794
795 /* This was a memblock frame. We can unref the memblock now */
796 pa_memblock_unref(p->read.memblock);
797
798 } else if (p->read.packet) {
799
800 if (p->receive_packet_callback)
801 #ifdef HAVE_CREDS
802 p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata);
803 #else
804 p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
805 #endif
806
807 pa_packet_unref(p->read.packet);
808 } else {
809 pa_memblock *b;
810
811 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
812
813 pa_assert(p->import);
814
815 if (!(b = pa_memimport_get(p->import,
816 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
817 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
818 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
819 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
820
821 if (pa_log_ratelimit(PA_LOG_DEBUG))
822 pa_log_debug("Failed to import memory block.");
823 }
824
825 if (p->receive_memblock_callback) {
826 int64_t offset;
827 pa_memchunk chunk;
828
829 chunk.memblock = b;
830 chunk.index = 0;
831 chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
832
833 offset = (int64_t) (
834 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
835 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
836
837 p->receive_memblock_callback(
838 p,
839 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
840 offset,
841 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
842 &chunk,
843 p->receive_memblock_callback_userdata);
844 }
845
846 if (b)
847 pa_memblock_unref(b);
848 }
849
850 goto frame_done;
851 }
852 }
853
854 return 0;
855
856 frame_done:
857 p->read.memblock = NULL;
858 p->read.packet = NULL;
859 p->read.index = 0;
860 p->read.data = NULL;
861
862 #ifdef HAVE_CREDS
863 p->read_creds_valid = false;
864 #endif
865
866 return 0;
867
868 fail:
869 if (release_memblock)
870 pa_memblock_release(release_memblock);
871
872 return -1;
873 }
874
875 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
876 pa_assert(p);
877 pa_assert(PA_REFCNT_VALUE(p) > 0);
878
879 p->die_callback = cb;
880 p->die_callback_userdata = userdata;
881 }
882
883 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
884 pa_assert(p);
885 pa_assert(PA_REFCNT_VALUE(p) > 0);
886
887 p->drain_callback = cb;
888 p->drain_callback_userdata = userdata;
889 }
890
891 void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
892 pa_assert(p);
893 pa_assert(PA_REFCNT_VALUE(p) > 0);
894
895 p->receive_packet_callback = cb;
896 p->receive_packet_callback_userdata = userdata;
897 }
898
899 void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
900 pa_assert(p);
901 pa_assert(PA_REFCNT_VALUE(p) > 0);
902
903 p->receive_memblock_callback = cb;
904 p->receive_memblock_callback_userdata = userdata;
905 }
906
907 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
908 pa_assert(p);
909 pa_assert(PA_REFCNT_VALUE(p) > 0);
910
911 p->release_callback = cb;
912 p->release_callback_userdata = userdata;
913 }
914
915 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
916 pa_assert(p);
917 pa_assert(PA_REFCNT_VALUE(p) > 0);
918
919 p->release_callback = cb;
920 p->release_callback_userdata = userdata;
921 }
922
923 bool pa_pstream_is_pending(pa_pstream *p) {
924 bool b;
925
926 pa_assert(p);
927 pa_assert(PA_REFCNT_VALUE(p) > 0);
928
929 if (p->dead)
930 b = false;
931 else
932 b = p->write.current || !pa_queue_isempty(p->send_queue);
933
934 return b;
935 }
936
937 void pa_pstream_unref(pa_pstream*p) {
938 pa_assert(p);
939 pa_assert(PA_REFCNT_VALUE(p) > 0);
940
941 if (PA_REFCNT_DEC(p) <= 0)
942 pstream_free(p);
943 }
944
945 pa_pstream* pa_pstream_ref(pa_pstream*p) {
946 pa_assert(p);
947 pa_assert(PA_REFCNT_VALUE(p) > 0);
948
949 PA_REFCNT_INC(p);
950 return p;
951 }
952
953 void pa_pstream_unlink(pa_pstream *p) {
954 pa_assert(p);
955
956 if (p->dead)
957 return;
958
959 p->dead = true;
960
961 if (p->import) {
962 pa_memimport_free(p->import);
963 p->import = NULL;
964 }
965
966 if (p->export) {
967 pa_memexport_free(p->export);
968 p->export = NULL;
969 }
970
971 if (p->io) {
972 pa_iochannel_free(p->io);
973 p->io = NULL;
974 }
975
976 if (p->defer_event) {
977 p->mainloop->defer_free(p->defer_event);
978 p->defer_event = NULL;
979 }
980
981 p->die_callback = NULL;
982 p->drain_callback = NULL;
983 p->receive_packet_callback = NULL;
984 p->receive_memblock_callback = NULL;
985 }
986
987 void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
988 pa_assert(p);
989 pa_assert(PA_REFCNT_VALUE(p) > 0);
990
991 p->use_shm = enable;
992
993 if (enable) {
994
995 if (!p->export)
996 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
997
998 } else {
999
1000 if (p->export) {
1001 pa_memexport_free(p->export);
1002 p->export = NULL;
1003 }
1004 }
1005 }
1006
1007 bool pa_pstream_get_shm(pa_pstream *p) {
1008 pa_assert(p);
1009 pa_assert(PA_REFCNT_VALUE(p) > 0);
1010
1011 return p->use_shm;
1012 }