]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
move pstream item allocation to pa_flist
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42
43 #include "winsock.h"
44
45 #include <pulse/xmalloc.h>
46
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52 #include <pulsecore/flist.h>
53
54 #include "pstream.h"
55
/* The FLAGS word of a frame descriptor is shared between two users: the low
 * byte carries the seek mode of a memblock frame, the top byte carries the
 * SHM signalling bits that are piggybacked on it. */
#define PA_FLAG_SHMDATA    0x80000000LU /* payload is an SHM block reference instead of audio data */
#define PA_FLAG_SHMRELEASE 0x40000000LU /* payload-less frame: release of an SHM block */
#define PA_FLAG_SHMREVOKE  0xC0000000LU /* payload-less frame: revocation of an SHM block */
#define PA_FLAG_SHMMASK    0xFF000000LU /* all bits reserved for SHM signalling */
#define PA_FLAG_SEEKMASK   0x000000FFLU /* bits holding the seek mode */
62
/* Word indices into the frame descriptor. Every frame starts with
 * PA_PSTREAM_DESCRIPTOR_MAX (i.e. 5) 32bit integers. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH    = 0,  /* payload length in bytes */
    PA_PSTREAM_DESCRIPTOR_CHANNEL   = 1,  /* channel, (uint32_t) -1 for packet frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI = 2,  /* upper half of the 64bit offset, or the block id of release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO = 3,  /* lower half of the 64bit offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS     = 4,  /* seek mode and SHM flags */
    PA_PSTREAM_DESCRIPTOR_MAX
};

/* Word indices into the SHM info record that follows the descriptor when
 * the frame references an SHM block. */
enum {
    PA_PSTREAM_SHM_BLOCKID = 0,
    PA_PSTREAM_SHM_SHMID   = 1,
    PA_PSTREAM_SHM_INDEX   = 2,
    PA_PSTREAM_SHM_LENGTH  = 3,
    PA_PSTREAM_SHM_MAX
};
81
82 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
83
84 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
85 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
86 #define FRAME_SIZE_MAX_USE (1024*64)
87
88 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
89
90 struct item_info {
91 enum {
92 PA_PSTREAM_ITEM_PACKET,
93 PA_PSTREAM_ITEM_MEMBLOCK,
94 PA_PSTREAM_ITEM_SHMRELEASE,
95 PA_PSTREAM_ITEM_SHMREVOKE
96 } type;
97
98 /* packet info */
99 pa_packet *packet;
100 #ifdef HAVE_CREDS
101 int with_creds;
102 pa_creds creds;
103 #endif
104
105 /* memblock info */
106 pa_memchunk chunk;
107 uint32_t channel;
108 int64_t offset;
109 pa_seek_mode_t seek_mode;
110
111 /* release/revoke info */
112 uint32_t block_id;
113 };
114
115 struct pa_pstream {
116 PA_REFCNT_DECLARE;
117
118 pa_mainloop_api *mainloop;
119 pa_defer_event *defer_event;
120 pa_iochannel *io;
121
122 pa_queue *send_queue;
123
124 int dead;
125
126 struct {
127 pa_pstream_descriptor descriptor;
128 struct item_info* current;
129 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
130 void *data;
131 size_t index;
132 pa_memchunk memchunk;
133 } write;
134
135 struct {
136 pa_pstream_descriptor descriptor;
137 pa_memblock *memblock;
138 pa_packet *packet;
139 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
140 void *data;
141 size_t index;
142 } read;
143
144 int use_shm;
145 pa_memimport *import;
146 pa_memexport *export;
147
148 pa_pstream_packet_cb_t recieve_packet_callback;
149 void *recieve_packet_callback_userdata;
150
151 pa_pstream_memblock_cb_t recieve_memblock_callback;
152 void *recieve_memblock_callback_userdata;
153
154 pa_pstream_notify_cb_t drain_callback;
155 void *drain_callback_userdata;
156
157 pa_pstream_notify_cb_t die_callback;
158 void *die_callback_userdata;
159
160 pa_pstream_block_id_cb_t revoke_callback;
161 void *revoke_callback_userdata;
162
163 pa_pstream_block_id_cb_t release_callback;
164 void *release_callback_userdata;
165
166 pa_mempool *mempool;
167
168 #ifdef HAVE_CREDS
169 pa_creds read_creds, write_creds;
170 int read_creds_valid, send_creds_now;
171 #endif
172 };
173
174 static int do_write(pa_pstream *p);
175 static int do_read(pa_pstream *p);
176
177 static void do_something(pa_pstream *p) {
178 pa_assert(p);
179 pa_assert(PA_REFCNT_VALUE(p) > 0);
180
181 pa_pstream_ref(p);
182
183 p->mainloop->defer_enable(p->defer_event, 0);
184
185 if (!p->dead && pa_iochannel_is_readable(p->io)) {
186 if (do_read(p) < 0)
187 goto fail;
188 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
189 goto fail;
190
191 if (!p->dead && pa_iochannel_is_writable(p->io)) {
192 if (do_write(p) < 0)
193 goto fail;
194 }
195
196 pa_pstream_unref(p);
197 return;
198
199 fail:
200
201 if (p->die_callback)
202 p->die_callback(p, p->die_callback_userdata);
203
204 pa_pstream_unlink(p);
205 pa_pstream_unref(p);
206 }
207
208 static void io_callback(pa_iochannel*io, void *userdata) {
209 pa_pstream *p = userdata;
210
211 pa_assert(p);
212 pa_assert(PA_REFCNT_VALUE(p) > 0);
213 pa_assert(p->io == io);
214
215 do_something(p);
216 }
217
218 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
219 pa_pstream *p = userdata;
220
221 pa_assert(p);
222 pa_assert(PA_REFCNT_VALUE(p) > 0);
223 pa_assert(p->defer_event == e);
224 pa_assert(p->mainloop == m);
225
226 do_something(p);
227 }
228
229 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
230
231 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
232 pa_pstream *p;
233
234 pa_assert(m);
235 pa_assert(io);
236 pa_assert(pool);
237
238 p = pa_xnew(pa_pstream, 1);
239 PA_REFCNT_INIT(p);
240 p->io = io;
241 pa_iochannel_set_callback(io, io_callback, p);
242 p->dead = 0;
243
244 p->mainloop = m;
245 p->defer_event = m->defer_new(m, defer_callback, p);
246 m->defer_enable(p->defer_event, 0);
247
248 p->send_queue = pa_queue_new();
249
250 p->write.current = NULL;
251 p->write.index = 0;
252 pa_memchunk_reset(&p->write.memchunk);
253 p->read.memblock = NULL;
254 p->read.packet = NULL;
255 p->read.index = 0;
256
257 p->recieve_packet_callback = NULL;
258 p->recieve_packet_callback_userdata = NULL;
259 p->recieve_memblock_callback = NULL;
260 p->recieve_memblock_callback_userdata = NULL;
261 p->drain_callback = NULL;
262 p->drain_callback_userdata = NULL;
263 p->die_callback = NULL;
264 p->die_callback_userdata = NULL;
265 p->revoke_callback = NULL;
266 p->revoke_callback_userdata = NULL;
267 p->release_callback = NULL;
268 p->release_callback_userdata = NULL;
269
270 p->mempool = pool;
271
272 p->use_shm = 0;
273 p->export = NULL;
274
275 /* We do importing unconditionally */
276 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
277
278 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
279 pa_iochannel_socket_set_sndbuf(io, 1024*8);
280
281 #ifdef HAVE_CREDS
282 p->send_creds_now = 0;
283 p->read_creds_valid = 0;
284 #endif
285 return p;
286 }
287
288 static void item_free(void *item, PA_GCC_UNUSED void *q) {
289 struct item_info *i = item;
290 pa_assert(i);
291
292 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
293 pa_assert(i->chunk.memblock);
294 pa_memblock_unref(i->chunk.memblock);
295 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
296 pa_assert(i->packet);
297 pa_packet_unref(i->packet);
298 }
299
300 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
301 pa_xfree(i);
302 }
303
304 static void pstream_free(pa_pstream *p) {
305 pa_assert(p);
306
307 pa_pstream_unlink(p);
308
309 pa_queue_free(p->send_queue, item_free, NULL);
310
311 if (p->write.current)
312 item_free(p->write.current, NULL);
313
314 if (p->write.memchunk.memblock)
315 pa_memblock_unref(p->write.memchunk.memblock);
316
317 if (p->read.memblock)
318 pa_memblock_unref(p->read.memblock);
319
320 if (p->read.packet)
321 pa_packet_unref(p->read.packet);
322
323 pa_xfree(p);
324 }
325
326 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
327 struct item_info *i;
328
329 pa_assert(p);
330 pa_assert(PA_REFCNT_VALUE(p) > 0);
331 pa_assert(packet);
332
333 if (p->dead)
334 return;
335
336 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
337 i = pa_xnew(struct item_info, 1);
338
339 i->type = PA_PSTREAM_ITEM_PACKET;
340 i->packet = pa_packet_ref(packet);
341
342 #ifdef HAVE_CREDS
343 if ((i->with_creds = !!creds))
344 i->creds = *creds;
345 #endif
346
347 pa_queue_push(p->send_queue, i);
348
349 p->mainloop->defer_enable(p->defer_event, 1);
350 }
351
352 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
353 size_t length, idx;
354
355 pa_assert(p);
356 pa_assert(PA_REFCNT_VALUE(p) > 0);
357 pa_assert(channel != (uint32_t) -1);
358 pa_assert(chunk);
359
360 if (p->dead)
361 return;
362
363 idx = 0;
364 length = chunk->length;
365
366 while (length > 0) {
367 struct item_info *i;
368 size_t n;
369
370 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
371 i = pa_xnew(struct item_info, 1);
372 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
373
374 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
375 i->chunk.index = chunk->index + idx;
376 i->chunk.length = n;
377 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
378
379 i->channel = channel;
380 i->offset = offset;
381 i->seek_mode = seek_mode;
382 #ifdef HAVE_CREDS
383 i->with_creds = 0;
384 #endif
385
386 pa_queue_push(p->send_queue, i);
387
388 idx += n;
389 length -= n;
390 }
391
392 p->mainloop->defer_enable(p->defer_event, 1);
393 }
394
395 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
396 struct item_info *item;
397 pa_assert(p);
398 pa_assert(PA_REFCNT_VALUE(p) > 0);
399
400 if (p->dead)
401 return;
402
403 /* pa_log("Releasing block %u", block_id); */
404
405 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
406 item = pa_xnew(struct item_info, 1);
407 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
408 item->block_id = block_id;
409 #ifdef HAVE_CREDS
410 item->with_creds = 0;
411 #endif
412
413 pa_queue_push(p->send_queue, item);
414 p->mainloop->defer_enable(p->defer_event, 1);
415 }
416
417 /* might be called from thread context */
418 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
419 pa_pstream *p = userdata;
420
421 pa_assert(p);
422 pa_assert(PA_REFCNT_VALUE(p) > 0);
423
424 if (p->dead)
425 return;
426
427 if (p->release_callback)
428 p->release_callback(p, block_id, p->release_callback_userdata);
429 else
430 pa_pstream_send_release(p, block_id);
431 }
432
433 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
434 struct item_info *item;
435 pa_assert(p);
436 pa_assert(PA_REFCNT_VALUE(p) > 0);
437
438 if (p->dead)
439 return;
440 /* pa_log("Revoking block %u", block_id); */
441
442 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
443 item = pa_xnew(struct item_info, 1);
444 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
445 item->block_id = block_id;
446 #ifdef HAVE_CREDS
447 item->with_creds = 0;
448 #endif
449
450 pa_queue_push(p->send_queue, item);
451 p->mainloop->defer_enable(p->defer_event, 1);
452 }
453
454 /* might be called from thread context */
455 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
456 pa_pstream *p = userdata;
457
458 pa_assert(p);
459 pa_assert(PA_REFCNT_VALUE(p) > 0);
460
461 if (p->revoke_callback)
462 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
463 else
464 pa_pstream_send_revoke(p, block_id);
465 }
466
467 static void prepare_next_write_item(pa_pstream *p) {
468 pa_assert(p);
469 pa_assert(PA_REFCNT_VALUE(p) > 0);
470
471 p->write.current = pa_queue_pop(p->send_queue);
472
473 if (!p->write.current)
474 return;
475
476 p->write.index = 0;
477 p->write.data = NULL;
478 pa_memchunk_reset(&p->write.memchunk);
479
480 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
482 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
483 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
484 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
485
486 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
487
488 pa_assert(p->write.current->packet);
489 p->write.data = p->write.current->packet->data;
490 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
491
492 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
493
494 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
495 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
496
497 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
498
499 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
500 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
501
502 } else {
503 uint32_t flags;
504 int send_payload = 1;
505
506 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
507 pa_assert(p->write.current->chunk.memblock);
508
509 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
510 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
511 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
512
513 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
514
515 if (p->use_shm) {
516 uint32_t block_id, shm_id;
517 size_t offset, length;
518
519 pa_assert(p->export);
520
521 if (pa_memexport_put(p->export,
522 p->write.current->chunk.memblock,
523 &block_id,
524 &shm_id,
525 &offset,
526 &length) >= 0) {
527
528 flags |= PA_FLAG_SHMDATA;
529 send_payload = 0;
530
531 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
532 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
533 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
534 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
535
536 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
537 p->write.data = p->write.shm_info;
538 }
539 /* else */
540 /* pa_log_warn("Failed to export memory block."); */
541 }
542
543 if (send_payload) {
544 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
545 p->write.memchunk = p->write.current->chunk;
546 pa_memblock_ref(p->write.memchunk.memblock);
547 p->write.data = NULL;
548 }
549
550 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
551 }
552
553 #ifdef HAVE_CREDS
554 if ((p->send_creds_now = p->write.current->with_creds))
555 p->write_creds = p->write.current->creds;
556 #endif
557 }
558
559 static int do_write(pa_pstream *p) {
560 void *d;
561 size_t l;
562 ssize_t r;
563 pa_memblock *release_memblock = NULL;
564
565 pa_assert(p);
566 pa_assert(PA_REFCNT_VALUE(p) > 0);
567
568 if (!p->write.current)
569 prepare_next_write_item(p);
570
571 if (!p->write.current)
572 return 0;
573
574 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
575 d = (uint8_t*) p->write.descriptor + p->write.index;
576 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
577 } else {
578 pa_assert(p->write.data || p->write.memchunk.memblock);
579
580 if (p->write.data)
581 d = p->write.data;
582 else {
583 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
584 release_memblock = p->write.memchunk.memblock;
585 }
586
587 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
588 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
589 }
590
591 pa_assert(l > 0);
592
593 #ifdef HAVE_CREDS
594 if (p->send_creds_now) {
595
596 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
597 goto fail;
598
599 p->send_creds_now = 0;
600 } else
601 #endif
602
603 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
604 goto fail;
605
606 if (release_memblock)
607 pa_memblock_release(release_memblock);
608
609 p->write.index += r;
610
611 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
612 pa_assert(p->write.current);
613 item_free(p->write.current, NULL);
614 p->write.current = NULL;
615
616 if (p->write.memchunk.memblock)
617 pa_memblock_unref(p->write.memchunk.memblock);
618
619 pa_memchunk_reset(&p->write.memchunk);
620
621 if (p->drain_callback && !pa_pstream_is_pending(p))
622 p->drain_callback(p, p->drain_callback_userdata);
623 }
624
625 return 0;
626
627 fail:
628
629 if (release_memblock)
630 pa_memblock_release(release_memblock);
631
632 return -1;
633 }
634
635 static int do_read(pa_pstream *p) {
636 void *d;
637 size_t l;
638 ssize_t r;
639 pa_memblock *release_memblock = NULL;
640 pa_assert(p);
641 pa_assert(PA_REFCNT_VALUE(p) > 0);
642
643 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
644 d = (uint8_t*) p->read.descriptor + p->read.index;
645 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
646 } else {
647 pa_assert(p->read.data || p->read.memblock);
648
649 if (p->read.data)
650 d = p->read.data;
651 else {
652 d = pa_memblock_acquire(p->read.memblock);
653 release_memblock = p->read.memblock;
654 }
655
656 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
657 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
658 }
659
660 #ifdef HAVE_CREDS
661 {
662 int b = 0;
663
664 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
665 goto fail;
666
667 p->read_creds_valid = p->read_creds_valid || b;
668 }
669 #else
670 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
671 goto fail;
672 #endif
673
674 if (release_memblock)
675 pa_memblock_release(release_memblock);
676
677 p->read.index += r;
678
679 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
680 uint32_t flags, length, channel;
681 /* Reading of frame descriptor complete */
682
683 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
684
685 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
686 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
687 return -1;
688 }
689
690 if (flags == PA_FLAG_SHMRELEASE) {
691
692 /* This is a SHM memblock release frame with no payload */
693
694 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
695
696 pa_assert(p->export);
697 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
698
699 goto frame_done;
700
701 } else if (flags == PA_FLAG_SHMREVOKE) {
702
703 /* This is a SHM memblock revoke frame with no payload */
704
705 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
706
707 pa_assert(p->import);
708 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
709
710 goto frame_done;
711 }
712
713 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
714
715 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
716 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
717 return -1;
718 }
719
720 pa_assert(!p->read.packet && !p->read.memblock);
721
722 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
723
724 if (channel == (uint32_t) -1) {
725
726 if (flags != 0) {
727 pa_log_warn("Received packet frame with invalid flags value.");
728 return -1;
729 }
730
731 /* Frame is a packet frame */
732 p->read.packet = pa_packet_new(length);
733 p->read.data = p->read.packet->data;
734
735 } else {
736
737 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
738 pa_log_warn("Received memblock frame with invalid seek mode.");
739 return -1;
740 }
741
742 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
743
744 if (length != sizeof(p->read.shm_info)) {
745 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
746 return -1;
747 }
748
749 /* Frame is a memblock frame referencing an SHM memblock */
750 p->read.data = p->read.shm_info;
751
752 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
753
754 /* Frame is a memblock frame */
755
756 p->read.memblock = pa_memblock_new(p->mempool, length);
757 p->read.data = NULL;
758 } else {
759
760 pa_log_warn("Recieved memblock frame with invalid flags value.");
761 return -1;
762 }
763 }
764
765 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
766 /* Frame payload available */
767
768 if (p->read.memblock && p->recieve_memblock_callback) {
769
770 /* Is this memblock data? Than pass it to the user */
771 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
772
773 if (l > 0) {
774 pa_memchunk chunk;
775
776 chunk.memblock = p->read.memblock;
777 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
778 chunk.length = l;
779
780 if (p->recieve_memblock_callback) {
781 int64_t offset;
782
783 offset = (int64_t) (
784 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
785 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
786
787 p->recieve_memblock_callback(
788 p,
789 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
790 offset,
791 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
792 &chunk,
793 p->recieve_memblock_callback_userdata);
794 }
795
796 /* Drop seek info for following callbacks */
797 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
798 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
799 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
800 }
801 }
802
803 /* Frame complete */
804 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
805
806 if (p->read.memblock) {
807
808 /* This was a memblock frame. We can unref the memblock now */
809 pa_memblock_unref(p->read.memblock);
810
811 } else if (p->read.packet) {
812
813 if (p->recieve_packet_callback)
814 #ifdef HAVE_CREDS
815 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
816 #else
817 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
818 #endif
819
820 pa_packet_unref(p->read.packet);
821 } else {
822 pa_memblock *b;
823
824 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
825
826 pa_assert(p->import);
827
828 if (!(b = pa_memimport_get(p->import,
829 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
830 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
831 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
832 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
833
834 pa_log_warn("Failed to import memory block.");
835 return -1;
836 }
837
838 if (p->recieve_memblock_callback) {
839 int64_t offset;
840 pa_memchunk chunk;
841
842 chunk.memblock = b;
843 chunk.index = 0;
844 chunk.length = pa_memblock_get_length(b);
845
846 offset = (int64_t) (
847 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
848 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
849
850 p->recieve_memblock_callback(
851 p,
852 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
853 offset,
854 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
855 &chunk,
856 p->recieve_memblock_callback_userdata);
857 }
858
859 pa_memblock_unref(b);
860 }
861
862 goto frame_done;
863 }
864 }
865
866 return 0;
867
868 frame_done:
869 p->read.memblock = NULL;
870 p->read.packet = NULL;
871 p->read.index = 0;
872 p->read.data = NULL;
873
874 #ifdef HAVE_CREDS
875 p->read_creds_valid = 0;
876 #endif
877
878 return 0;
879
880 fail:
881 if (release_memblock)
882 pa_memblock_release(release_memblock);
883
884 return -1;
885 }
886
887 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
888 pa_assert(p);
889 pa_assert(PA_REFCNT_VALUE(p) > 0);
890
891 p->die_callback = cb;
892 p->die_callback_userdata = userdata;
893 }
894
895 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
896 pa_assert(p);
897 pa_assert(PA_REFCNT_VALUE(p) > 0);
898
899 p->drain_callback = cb;
900 p->drain_callback_userdata = userdata;
901 }
902
903 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
904 pa_assert(p);
905 pa_assert(PA_REFCNT_VALUE(p) > 0);
906
907 p->recieve_packet_callback = cb;
908 p->recieve_packet_callback_userdata = userdata;
909 }
910
911 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
912 pa_assert(p);
913 pa_assert(PA_REFCNT_VALUE(p) > 0);
914
915 p->recieve_memblock_callback = cb;
916 p->recieve_memblock_callback_userdata = userdata;
917 }
918
919 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
920 pa_assert(p);
921 pa_assert(PA_REFCNT_VALUE(p) > 0);
922
923 p->release_callback = cb;
924 p->release_callback_userdata = userdata;
925 }
926
927 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
928 pa_assert(p);
929 pa_assert(PA_REFCNT_VALUE(p) > 0);
930
931 p->release_callback = cb;
932 p->release_callback_userdata = userdata;
933 }
934
935 int pa_pstream_is_pending(pa_pstream *p) {
936 int b;
937
938 pa_assert(p);
939 pa_assert(PA_REFCNT_VALUE(p) > 0);
940
941 if (p->dead)
942 b = 0;
943 else
944 b = p->write.current || !pa_queue_is_empty(p->send_queue);
945
946 return b;
947 }
948
949 void pa_pstream_unref(pa_pstream*p) {
950 pa_assert(p);
951 pa_assert(PA_REFCNT_VALUE(p) > 0);
952
953 if (PA_REFCNT_DEC(p) <= 0)
954 pstream_free(p);
955 }
956
957 pa_pstream* pa_pstream_ref(pa_pstream*p) {
958 pa_assert(p);
959 pa_assert(PA_REFCNT_VALUE(p) > 0);
960
961 PA_REFCNT_INC(p);
962 return p;
963 }
964
965 void pa_pstream_unlink(pa_pstream *p) {
966 pa_assert(p);
967
968 if (p->dead)
969 return;
970
971 p->dead = 1;
972
973 if (p->import) {
974 pa_memimport_free(p->import);
975 p->import = NULL;
976 }
977
978 if (p->export) {
979 pa_memexport_free(p->export);
980 p->export = NULL;
981 }
982
983 if (p->io) {
984 pa_iochannel_free(p->io);
985 p->io = NULL;
986 }
987
988 if (p->defer_event) {
989 p->mainloop->defer_free(p->defer_event);
990 p->defer_event = NULL;
991 }
992
993 p->die_callback = NULL;
994 p->drain_callback = NULL;
995 p->recieve_packet_callback = NULL;
996 p->recieve_memblock_callback = NULL;
997 }
998
999 void pa_pstream_use_shm(pa_pstream *p, int enable) {
1000 pa_assert(p);
1001 pa_assert(PA_REFCNT_VALUE(p) > 0);
1002
1003 p->use_shm = enable;
1004
1005 if (enable) {
1006
1007 if (!p->export)
1008 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1009
1010 } else {
1011
1012 if (p->export) {
1013 pa_memexport_free(p->export);
1014 p->export = NULL;
1015 }
1016 }
1017 }