]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
pstream: Optimise write of smaller packages
[pulseaudio] / src / pulsecore / pstream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
33 #endif
34
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
44
45 #include "pstream.h"
46
/* Bits of the FLAGS word of a frame descriptor. The lowest byte carries the
 * seek mode of a memblock frame; the highest byte is used to piggyback the
 * SHM related frame types (SHM data reference, block release, block revoke)
 * on the same word. */
#define PA_FLAG_SHMDATA    0x80000000UL
#define PA_FLAG_SHMRELEASE 0x40000000UL
#define PA_FLAG_SHMREVOKE  0xC0000000UL
#define PA_FLAG_SHMMASK    0xFF000000UL
#define PA_FLAG_SEEKMASK   0x000000FFUL
53
/* Every frame starts with a descriptor made up of five 32 bit integers.
 * These constants are the indexes of the individual words inside it;
 * PA_PSTREAM_DESCRIPTOR_MAX is the number of words. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH = 0,
    PA_PSTREAM_DESCRIPTOR_CHANNEL = 1,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI = 2,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO = 3,
    PA_PSTREAM_DESCRIPTOR_FLAGS = 4,
    PA_PSTREAM_DESCRIPTOR_MAX = 5
};
63
/* Payload of a frame that references an SHM block instead of carrying the
 * audio data itself: four 32 bit integers following the descriptor. The
 * constants are the indexes of the individual words; PA_PSTREAM_SHM_MAX is
 * the number of words. */
enum {
    PA_PSTREAM_SHM_BLOCKID = 0,
    PA_PSTREAM_SHM_SHMID = 1,
    PA_PSTREAM_SHM_INDEX = 2,
    PA_PSTREAM_SHM_LENGTH = 3,
    PA_PSTREAM_SHM_MAX = 4
};
72
73 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
74
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
76
77 #define MINIBUF_SIZE (256)
78
79 /* To allow uploading a single sample in one frame, this value should be the
80 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
81 */
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
83
84 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
85
86 struct item_info {
87 enum {
88 PA_PSTREAM_ITEM_PACKET,
89 PA_PSTREAM_ITEM_MEMBLOCK,
90 PA_PSTREAM_ITEM_SHMRELEASE,
91 PA_PSTREAM_ITEM_SHMREVOKE
92 } type;
93
94 /* packet info */
95 pa_packet *packet;
96 #ifdef HAVE_CREDS
97 pa_bool_t with_creds;
98 pa_creds creds;
99 #endif
100
101 /* memblock info */
102 pa_memchunk chunk;
103 uint32_t channel;
104 int64_t offset;
105 pa_seek_mode_t seek_mode;
106
107 /* release/revoke info */
108 uint32_t block_id;
109 };
110
111 struct pa_pstream {
112 PA_REFCNT_DECLARE;
113
114 pa_mainloop_api *mainloop;
115 pa_defer_event *defer_event;
116 pa_iochannel *io;
117
118 pa_queue *send_queue;
119
120 pa_bool_t dead;
121
122 struct {
123 union {
124 uint8_t minibuf[MINIBUF_SIZE];
125 pa_pstream_descriptor descriptor;
126 };
127 struct item_info* current;
128 void *data;
129 size_t index;
130 int minibuf_validsize;
131 pa_memchunk memchunk;
132 } write;
133
134 struct {
135 pa_pstream_descriptor descriptor;
136 pa_memblock *memblock;
137 pa_packet *packet;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139 void *data;
140 size_t index;
141 } read;
142
143 pa_bool_t use_shm;
144 pa_memimport *import;
145 pa_memexport *export;
146
147 pa_pstream_packet_cb_t receive_packet_callback;
148 void *receive_packet_callback_userdata;
149
150 pa_pstream_memblock_cb_t receive_memblock_callback;
151 void *receive_memblock_callback_userdata;
152
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
155
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
158
159 pa_pstream_block_id_cb_t revoke_callback;
160 void *revoke_callback_userdata;
161
162 pa_pstream_block_id_cb_t release_callback;
163 void *release_callback_userdata;
164
165 pa_mempool *mempool;
166
167 #ifdef HAVE_CREDS
168 pa_creds read_creds, write_creds;
169 pa_bool_t read_creds_valid, send_creds_now;
170 #endif
171 };
172
173 static int do_write(pa_pstream *p);
174 static int do_read(pa_pstream *p);
175
176 static void do_pstream_read_write(pa_pstream *p) {
177 pa_assert(p);
178 pa_assert(PA_REFCNT_VALUE(p) > 0);
179
180 pa_pstream_ref(p);
181
182 p->mainloop->defer_enable(p->defer_event, 0);
183
184 if (!p->dead && pa_iochannel_is_readable(p->io)) {
185 if (do_read(p) < 0)
186 goto fail;
187 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
188 goto fail;
189
190 if (!p->dead && pa_iochannel_is_writable(p->io)) {
191 if (do_write(p) < 0)
192 goto fail;
193 }
194
195 pa_pstream_unref(p);
196 return;
197
198 fail:
199
200 if (p->die_callback)
201 p->die_callback(p, p->die_callback_userdata);
202
203 pa_pstream_unlink(p);
204 pa_pstream_unref(p);
205 }
206
207 static void io_callback(pa_iochannel*io, void *userdata) {
208 pa_pstream *p = userdata;
209
210 pa_assert(p);
211 pa_assert(PA_REFCNT_VALUE(p) > 0);
212 pa_assert(p->io == io);
213
214 do_pstream_read_write(p);
215 }
216
217 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
218 pa_pstream *p = userdata;
219
220 pa_assert(p);
221 pa_assert(PA_REFCNT_VALUE(p) > 0);
222 pa_assert(p->defer_event == e);
223 pa_assert(p->mainloop == m);
224
225 do_pstream_read_write(p);
226 }
227
228 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
229
230 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
231 pa_pstream *p;
232
233 pa_assert(m);
234 pa_assert(io);
235 pa_assert(pool);
236
237 p = pa_xnew(pa_pstream, 1);
238 PA_REFCNT_INIT(p);
239 p->io = io;
240 pa_iochannel_set_callback(io, io_callback, p);
241 p->dead = FALSE;
242
243 p->mainloop = m;
244 p->defer_event = m->defer_new(m, defer_callback, p);
245 m->defer_enable(p->defer_event, 0);
246
247 p->send_queue = pa_queue_new();
248
249 p->write.current = NULL;
250 p->write.index = 0;
251 pa_memchunk_reset(&p->write.memchunk);
252 p->read.memblock = NULL;
253 p->read.packet = NULL;
254 p->read.index = 0;
255
256 p->receive_packet_callback = NULL;
257 p->receive_packet_callback_userdata = NULL;
258 p->receive_memblock_callback = NULL;
259 p->receive_memblock_callback_userdata = NULL;
260 p->drain_callback = NULL;
261 p->drain_callback_userdata = NULL;
262 p->die_callback = NULL;
263 p->die_callback_userdata = NULL;
264 p->revoke_callback = NULL;
265 p->revoke_callback_userdata = NULL;
266 p->release_callback = NULL;
267 p->release_callback_userdata = NULL;
268
269 p->mempool = pool;
270
271 p->use_shm = FALSE;
272 p->export = NULL;
273
274 /* We do importing unconditionally */
275 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
276
277 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
278 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
279
280 #ifdef HAVE_CREDS
281 p->send_creds_now = FALSE;
282 p->read_creds_valid = FALSE;
283 #endif
284 return p;
285 }
286
287 static void item_free(void *item) {
288 struct item_info *i = item;
289 pa_assert(i);
290
291 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
292 pa_assert(i->chunk.memblock);
293 pa_memblock_unref(i->chunk.memblock);
294 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
295 pa_assert(i->packet);
296 pa_packet_unref(i->packet);
297 }
298
299 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
300 pa_xfree(i);
301 }
302
303 static void pstream_free(pa_pstream *p) {
304 pa_assert(p);
305
306 pa_pstream_unlink(p);
307
308 pa_queue_free(p->send_queue, item_free);
309
310 if (p->write.current)
311 item_free(p->write.current);
312
313 if (p->write.memchunk.memblock)
314 pa_memblock_unref(p->write.memchunk.memblock);
315
316 if (p->read.memblock)
317 pa_memblock_unref(p->read.memblock);
318
319 if (p->read.packet)
320 pa_packet_unref(p->read.packet);
321
322 pa_xfree(p);
323 }
324
325 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
326 struct item_info *i;
327
328 pa_assert(p);
329 pa_assert(PA_REFCNT_VALUE(p) > 0);
330 pa_assert(packet);
331
332 if (p->dead)
333 return;
334
335 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
336 i = pa_xnew(struct item_info, 1);
337
338 i->type = PA_PSTREAM_ITEM_PACKET;
339 i->packet = pa_packet_ref(packet);
340
341 #ifdef HAVE_CREDS
342 if ((i->with_creds = !!creds))
343 i->creds = *creds;
344 #endif
345
346 pa_queue_push(p->send_queue, i);
347
348 p->mainloop->defer_enable(p->defer_event, 1);
349 }
350
351 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
352 size_t length, idx;
353 size_t bsm;
354
355 pa_assert(p);
356 pa_assert(PA_REFCNT_VALUE(p) > 0);
357 pa_assert(channel != (uint32_t) -1);
358 pa_assert(chunk);
359
360 if (p->dead)
361 return;
362
363 idx = 0;
364 length = chunk->length;
365
366 bsm = pa_mempool_block_size_max(p->mempool);
367
368 while (length > 0) {
369 struct item_info *i;
370 size_t n;
371
372 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
373 i = pa_xnew(struct item_info, 1);
374 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
375
376 n = PA_MIN(length, bsm);
377 i->chunk.index = chunk->index + idx;
378 i->chunk.length = n;
379 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
380
381 i->channel = channel;
382 i->offset = offset;
383 i->seek_mode = seek_mode;
384 #ifdef HAVE_CREDS
385 i->with_creds = FALSE;
386 #endif
387
388 pa_queue_push(p->send_queue, i);
389
390 idx += n;
391 length -= n;
392 }
393
394 p->mainloop->defer_enable(p->defer_event, 1);
395 }
396
397 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
398 struct item_info *item;
399 pa_assert(p);
400 pa_assert(PA_REFCNT_VALUE(p) > 0);
401
402 if (p->dead)
403 return;
404
405 /* pa_log("Releasing block %u", block_id); */
406
407 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
408 item = pa_xnew(struct item_info, 1);
409 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
410 item->block_id = block_id;
411 #ifdef HAVE_CREDS
412 item->with_creds = FALSE;
413 #endif
414
415 pa_queue_push(p->send_queue, item);
416 p->mainloop->defer_enable(p->defer_event, 1);
417 }
418
419 /* might be called from thread context */
420 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
421 pa_pstream *p = userdata;
422
423 pa_assert(p);
424 pa_assert(PA_REFCNT_VALUE(p) > 0);
425
426 if (p->dead)
427 return;
428
429 if (p->release_callback)
430 p->release_callback(p, block_id, p->release_callback_userdata);
431 else
432 pa_pstream_send_release(p, block_id);
433 }
434
435 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
436 struct item_info *item;
437 pa_assert(p);
438 pa_assert(PA_REFCNT_VALUE(p) > 0);
439
440 if (p->dead)
441 return;
442 /* pa_log("Revoking block %u", block_id); */
443
444 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
445 item = pa_xnew(struct item_info, 1);
446 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
447 item->block_id = block_id;
448 #ifdef HAVE_CREDS
449 item->with_creds = FALSE;
450 #endif
451
452 pa_queue_push(p->send_queue, item);
453 p->mainloop->defer_enable(p->defer_event, 1);
454 }
455
456 /* might be called from thread context */
457 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
458 pa_pstream *p = userdata;
459
460 pa_assert(p);
461 pa_assert(PA_REFCNT_VALUE(p) > 0);
462
463 if (p->revoke_callback)
464 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
465 else
466 pa_pstream_send_revoke(p, block_id);
467 }
468
469 static void prepare_next_write_item(pa_pstream *p) {
470 pa_assert(p);
471 pa_assert(PA_REFCNT_VALUE(p) > 0);
472
473 p->write.current = pa_queue_pop(p->send_queue);
474
475 if (!p->write.current)
476 return;
477 p->write.index = 0;
478 p->write.data = NULL;
479 p->write.minibuf_validsize = 0;
480 pa_memchunk_reset(&p->write.memchunk);
481
482 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
483 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
484 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
486 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
487
488 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
489
490 pa_assert(p->write.current->packet);
491 p->write.data = p->write.current->packet->data;
492 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
493
494 if (p->write.current->packet->length <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
495 memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, p->write.current->packet->length);
496 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + p->write.current->packet->length;
497 }
498
499 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
500
501 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
502 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
503
504 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
505
506 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
507 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
508
509 } else {
510 uint32_t flags;
511 pa_bool_t send_payload = TRUE;
512
513 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
514 pa_assert(p->write.current->chunk.memblock);
515
516 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
517 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
518 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
519
520 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
521
522 if (p->use_shm) {
523 uint32_t block_id, shm_id;
524 size_t offset, length;
525 uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
526 size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
527
528 pa_assert(p->export);
529
530 if (pa_memexport_put(p->export,
531 p->write.current->chunk.memblock,
532 &block_id,
533 &shm_id,
534 &offset,
535 &length) >= 0) {
536
537 flags |= PA_FLAG_SHMDATA;
538 send_payload = FALSE;
539
540 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
541 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
542 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
543 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
544
545 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
546 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
547 }
548 /* else */
549 /* pa_log_warn("Failed to export memory block."); */
550 }
551
552 if (send_payload) {
553 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
554 p->write.memchunk = p->write.current->chunk;
555 pa_memblock_ref(p->write.memchunk.memblock);
556 p->write.data = NULL;
557 }
558
559 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
560 }
561
562 #ifdef HAVE_CREDS
563 if ((p->send_creds_now = p->write.current->with_creds))
564 p->write_creds = p->write.current->creds;
565 #endif
566 }
567
568 static int do_write(pa_pstream *p) {
569 void *d;
570 size_t l;
571 ssize_t r;
572 pa_memblock *release_memblock = NULL;
573
574 pa_assert(p);
575 pa_assert(PA_REFCNT_VALUE(p) > 0);
576
577 if (!p->write.current)
578 prepare_next_write_item(p);
579
580 if (!p->write.current)
581 return 0;
582
583 if (p->write.minibuf_validsize > 0) {
584 d = p->write.minibuf + p->write.index;
585 l = p->write.minibuf_validsize - p->write.index;
586 } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
587 d = (uint8_t*) p->write.descriptor + p->write.index;
588 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
589 } else {
590 pa_assert(p->write.data || p->write.memchunk.memblock);
591
592 if (p->write.data)
593 d = p->write.data;
594 else {
595 d = pa_memblock_acquire_chunk(&p->write.memchunk);
596 release_memblock = p->write.memchunk.memblock;
597 }
598
599 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
600 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
601 }
602
603 pa_assert(l > 0);
604
605 #ifdef HAVE_CREDS
606 if (p->send_creds_now) {
607
608 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
609 goto fail;
610
611 p->send_creds_now = FALSE;
612 } else
613 #endif
614
615 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
616 goto fail;
617
618 if (release_memblock)
619 pa_memblock_release(release_memblock);
620
621 p->write.index += (size_t) r;
622
623 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
624 pa_assert(p->write.current);
625 item_free(p->write.current);
626 p->write.current = NULL;
627
628 if (p->write.memchunk.memblock)
629 pa_memblock_unref(p->write.memchunk.memblock);
630
631 pa_memchunk_reset(&p->write.memchunk);
632
633 if (p->drain_callback && !pa_pstream_is_pending(p))
634 p->drain_callback(p, p->drain_callback_userdata);
635 }
636
637 return 0;
638
639 fail:
640
641 if (release_memblock)
642 pa_memblock_release(release_memblock);
643
644 return -1;
645 }
646
647 static int do_read(pa_pstream *p) {
648 void *d;
649 size_t l;
650 ssize_t r;
651 pa_memblock *release_memblock = NULL;
652 pa_assert(p);
653 pa_assert(PA_REFCNT_VALUE(p) > 0);
654
655 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
656 d = (uint8_t*) p->read.descriptor + p->read.index;
657 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
658 } else {
659 pa_assert(p->read.data || p->read.memblock);
660
661 if (p->read.data)
662 d = p->read.data;
663 else {
664 d = pa_memblock_acquire(p->read.memblock);
665 release_memblock = p->read.memblock;
666 }
667
668 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
669 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
670 }
671
672 #ifdef HAVE_CREDS
673 {
674 pa_bool_t b = 0;
675
676 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
677 goto fail;
678
679 p->read_creds_valid = p->read_creds_valid || b;
680 }
681 #else
682 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
683 goto fail;
684 #endif
685
686 if (release_memblock)
687 pa_memblock_release(release_memblock);
688
689 p->read.index += (size_t) r;
690
691 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
692 uint32_t flags, length, channel;
693 /* Reading of frame descriptor complete */
694
695 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
696
697 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
698 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
699 return -1;
700 }
701
702 if (flags == PA_FLAG_SHMRELEASE) {
703
704 /* This is a SHM memblock release frame with no payload */
705
706 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
707
708 pa_assert(p->export);
709 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
710
711 goto frame_done;
712
713 } else if (flags == PA_FLAG_SHMREVOKE) {
714
715 /* This is a SHM memblock revoke frame with no payload */
716
717 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
718
719 pa_assert(p->import);
720 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
721
722 goto frame_done;
723 }
724
725 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
726
727 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
728 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
729 return -1;
730 }
731
732 pa_assert(!p->read.packet && !p->read.memblock);
733
734 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
735
736 if (channel == (uint32_t) -1) {
737
738 if (flags != 0) {
739 pa_log_warn("Received packet frame with invalid flags value.");
740 return -1;
741 }
742
743 /* Frame is a packet frame */
744 p->read.packet = pa_packet_new(length);
745 p->read.data = p->read.packet->data;
746
747 } else {
748
749 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
750 pa_log_warn("Received memblock frame with invalid seek mode.");
751 return -1;
752 }
753
754 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
755
756 if (length != sizeof(p->read.shm_info)) {
757 pa_log_warn("Received SHM memblock frame with Invalid frame length.");
758 return -1;
759 }
760
761 /* Frame is a memblock frame referencing an SHM memblock */
762 p->read.data = p->read.shm_info;
763
764 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
765
766 /* Frame is a memblock frame */
767
768 p->read.memblock = pa_memblock_new(p->mempool, length);
769 p->read.data = NULL;
770 } else {
771
772 pa_log_warn("Received memblock frame with invalid flags value.");
773 return -1;
774 }
775 }
776
777 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
778 /* Frame payload available */
779
780 if (p->read.memblock && p->receive_memblock_callback) {
781
782 /* Is this memblock data? Than pass it to the user */
783 l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
784
785 if (l > 0) {
786 pa_memchunk chunk;
787
788 chunk.memblock = p->read.memblock;
789 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
790 chunk.length = l;
791
792 if (p->receive_memblock_callback) {
793 int64_t offset;
794
795 offset = (int64_t) (
796 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
797 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
798
799 p->receive_memblock_callback(
800 p,
801 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
802 offset,
803 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
804 &chunk,
805 p->receive_memblock_callback_userdata);
806 }
807
808 /* Drop seek info for following callbacks */
809 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
810 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
811 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
812 }
813 }
814
815 /* Frame complete */
816 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
817
818 if (p->read.memblock) {
819
820 /* This was a memblock frame. We can unref the memblock now */
821 pa_memblock_unref(p->read.memblock);
822
823 } else if (p->read.packet) {
824
825 if (p->receive_packet_callback)
826 #ifdef HAVE_CREDS
827 p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata);
828 #else
829 p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
830 #endif
831
832 pa_packet_unref(p->read.packet);
833 } else {
834 pa_memblock *b;
835
836 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
837
838 pa_assert(p->import);
839
840 if (!(b = pa_memimport_get(p->import,
841 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
842 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
843 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
844 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
845
846 if (pa_log_ratelimit(PA_LOG_DEBUG))
847 pa_log_debug("Failed to import memory block.");
848 }
849
850 if (p->receive_memblock_callback) {
851 int64_t offset;
852 pa_memchunk chunk;
853
854 chunk.memblock = b;
855 chunk.index = 0;
856 chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
857
858 offset = (int64_t) (
859 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
860 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
861
862 p->receive_memblock_callback(
863 p,
864 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
865 offset,
866 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
867 &chunk,
868 p->receive_memblock_callback_userdata);
869 }
870
871 if (b)
872 pa_memblock_unref(b);
873 }
874
875 goto frame_done;
876 }
877 }
878
879 return 0;
880
881 frame_done:
882 p->read.memblock = NULL;
883 p->read.packet = NULL;
884 p->read.index = 0;
885 p->read.data = NULL;
886
887 #ifdef HAVE_CREDS
888 p->read_creds_valid = FALSE;
889 #endif
890
891 return 0;
892
893 fail:
894 if (release_memblock)
895 pa_memblock_release(release_memblock);
896
897 return -1;
898 }
899
900 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
901 pa_assert(p);
902 pa_assert(PA_REFCNT_VALUE(p) > 0);
903
904 p->die_callback = cb;
905 p->die_callback_userdata = userdata;
906 }
907
908 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
909 pa_assert(p);
910 pa_assert(PA_REFCNT_VALUE(p) > 0);
911
912 p->drain_callback = cb;
913 p->drain_callback_userdata = userdata;
914 }
915
916 void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
917 pa_assert(p);
918 pa_assert(PA_REFCNT_VALUE(p) > 0);
919
920 p->receive_packet_callback = cb;
921 p->receive_packet_callback_userdata = userdata;
922 }
923
924 void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
925 pa_assert(p);
926 pa_assert(PA_REFCNT_VALUE(p) > 0);
927
928 p->receive_memblock_callback = cb;
929 p->receive_memblock_callback_userdata = userdata;
930 }
931
932 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
933 pa_assert(p);
934 pa_assert(PA_REFCNT_VALUE(p) > 0);
935
936 p->release_callback = cb;
937 p->release_callback_userdata = userdata;
938 }
939
940 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
941 pa_assert(p);
942 pa_assert(PA_REFCNT_VALUE(p) > 0);
943
944 p->release_callback = cb;
945 p->release_callback_userdata = userdata;
946 }
947
948 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
949 pa_bool_t b;
950
951 pa_assert(p);
952 pa_assert(PA_REFCNT_VALUE(p) > 0);
953
954 if (p->dead)
955 b = FALSE;
956 else
957 b = p->write.current || !pa_queue_isempty(p->send_queue);
958
959 return b;
960 }
961
962 void pa_pstream_unref(pa_pstream*p) {
963 pa_assert(p);
964 pa_assert(PA_REFCNT_VALUE(p) > 0);
965
966 if (PA_REFCNT_DEC(p) <= 0)
967 pstream_free(p);
968 }
969
970 pa_pstream* pa_pstream_ref(pa_pstream*p) {
971 pa_assert(p);
972 pa_assert(PA_REFCNT_VALUE(p) > 0);
973
974 PA_REFCNT_INC(p);
975 return p;
976 }
977
978 void pa_pstream_unlink(pa_pstream *p) {
979 pa_assert(p);
980
981 if (p->dead)
982 return;
983
984 p->dead = TRUE;
985
986 if (p->import) {
987 pa_memimport_free(p->import);
988 p->import = NULL;
989 }
990
991 if (p->export) {
992 pa_memexport_free(p->export);
993 p->export = NULL;
994 }
995
996 if (p->io) {
997 pa_iochannel_free(p->io);
998 p->io = NULL;
999 }
1000
1001 if (p->defer_event) {
1002 p->mainloop->defer_free(p->defer_event);
1003 p->defer_event = NULL;
1004 }
1005
1006 p->die_callback = NULL;
1007 p->drain_callback = NULL;
1008 p->receive_packet_callback = NULL;
1009 p->receive_memblock_callback = NULL;
1010 }
1011
1012 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
1013 pa_assert(p);
1014 pa_assert(PA_REFCNT_VALUE(p) > 0);
1015
1016 p->use_shm = enable;
1017
1018 if (enable) {
1019
1020 if (!p->export)
1021 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1022
1023 } else {
1024
1025 if (p->export) {
1026 pa_memexport_free(p->export);
1027 p->export = NULL;
1028 }
1029 }
1030 }
1031
1032 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1033 pa_assert(p);
1034 pa_assert(PA_REFCNT_VALUE(p) > 0);
1035
1036 return p->use_shm;
1037 }