2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
47 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
48 #define PA_FLAG_SHMDATA 0x80000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 #define PA_FLAG_SHMMASK 0xFF000000LU
52 #define PA_FLAG_SEEKMASK 0x000000FFLU
54 /* The sequence descriptor header consists of 5 32bit integers: */
56 PA_PSTREAM_DESCRIPTOR_LENGTH
,
57 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
58 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
59 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
60 PA_PSTREAM_DESCRIPTOR_FLAGS
,
61 PA_PSTREAM_DESCRIPTOR_MAX
64 /* If we have an SHM block, this info follows the descriptor */
66 PA_PSTREAM_SHM_BLOCKID
,
69 PA_PSTREAM_SHM_LENGTH
,
73 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
77 #define MINIBUF_SIZE (256)
79 /* To allow uploading a single sample in one frame, this value should be the
80 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
84 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
88 PA_PSTREAM_ITEM_PACKET
,
89 PA_PSTREAM_ITEM_MEMBLOCK
,
90 PA_PSTREAM_ITEM_SHMRELEASE
,
91 PA_PSTREAM_ITEM_SHMREVOKE
105 pa_seek_mode_t seek_mode
;
107 /* release/revoke info */
114 pa_mainloop_api
*mainloop
;
115 pa_defer_event
*defer_event
;
118 pa_queue
*send_queue
;
124 uint8_t minibuf
[MINIBUF_SIZE
];
125 pa_pstream_descriptor descriptor
;
127 struct item_info
* current
;
130 int minibuf_validsize
;
131 pa_memchunk memchunk
;
135 pa_pstream_descriptor descriptor
;
136 pa_memblock
*memblock
;
138 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
144 pa_memimport
*import
;
145 pa_memexport
*export
;
147 pa_pstream_packet_cb_t receive_packet_callback
;
148 void *receive_packet_callback_userdata
;
150 pa_pstream_memblock_cb_t receive_memblock_callback
;
151 void *receive_memblock_callback_userdata
;
153 pa_pstream_notify_cb_t drain_callback
;
154 void *drain_callback_userdata
;
156 pa_pstream_notify_cb_t die_callback
;
157 void *die_callback_userdata
;
159 pa_pstream_block_id_cb_t revoke_callback
;
160 void *revoke_callback_userdata
;
162 pa_pstream_block_id_cb_t release_callback
;
163 void *release_callback_userdata
;
168 pa_creds read_creds
, write_creds
;
169 bool read_creds_valid
, send_creds_now
;
173 static int do_write(pa_pstream
*p
);
174 static int do_read(pa_pstream
*p
);
176 static void do_pstream_read_write(pa_pstream
*p
) {
178 pa_assert(PA_REFCNT_VALUE(p
) > 0);
182 p
->mainloop
->defer_enable(p
->defer_event
, 0);
184 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
187 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
190 while (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
204 p
->die_callback(p
, p
->die_callback_userdata
);
206 pa_pstream_unlink(p
);
210 static void io_callback(pa_iochannel
*io
, void *userdata
) {
211 pa_pstream
*p
= userdata
;
214 pa_assert(PA_REFCNT_VALUE(p
) > 0);
215 pa_assert(p
->io
== io
);
217 do_pstream_read_write(p
);
220 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
221 pa_pstream
*p
= userdata
;
224 pa_assert(PA_REFCNT_VALUE(p
) > 0);
225 pa_assert(p
->defer_event
== e
);
226 pa_assert(p
->mainloop
== m
);
228 do_pstream_read_write(p
);
231 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
233 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
240 p
= pa_xnew(pa_pstream
, 1);
243 pa_iochannel_set_callback(io
, io_callback
, p
);
247 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
248 m
->defer_enable(p
->defer_event
, 0);
250 p
->send_queue
= pa_queue_new();
252 p
->write
.current
= NULL
;
254 pa_memchunk_reset(&p
->write
.memchunk
);
255 p
->read
.memblock
= NULL
;
256 p
->read
.packet
= NULL
;
259 p
->receive_packet_callback
= NULL
;
260 p
->receive_packet_callback_userdata
= NULL
;
261 p
->receive_memblock_callback
= NULL
;
262 p
->receive_memblock_callback_userdata
= NULL
;
263 p
->drain_callback
= NULL
;
264 p
->drain_callback_userdata
= NULL
;
265 p
->die_callback
= NULL
;
266 p
->die_callback_userdata
= NULL
;
267 p
->revoke_callback
= NULL
;
268 p
->revoke_callback_userdata
= NULL
;
269 p
->release_callback
= NULL
;
270 p
->release_callback_userdata
= NULL
;
277 /* We do importing unconditionally */
278 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
280 pa_iochannel_socket_set_rcvbuf(io
, pa_mempool_block_size_max(p
->mempool
));
281 pa_iochannel_socket_set_sndbuf(io
, pa_mempool_block_size_max(p
->mempool
));
284 p
->send_creds_now
= false;
285 p
->read_creds_valid
= false;
290 static void item_free(void *item
) {
291 struct item_info
*i
= item
;
294 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
295 pa_assert(i
->chunk
.memblock
);
296 pa_memblock_unref(i
->chunk
.memblock
);
297 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
298 pa_assert(i
->packet
);
299 pa_packet_unref(i
->packet
);
302 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
306 static void pstream_free(pa_pstream
*p
) {
309 pa_pstream_unlink(p
);
311 pa_queue_free(p
->send_queue
, item_free
);
313 if (p
->write
.current
)
314 item_free(p
->write
.current
);
316 if (p
->write
.memchunk
.memblock
)
317 pa_memblock_unref(p
->write
.memchunk
.memblock
);
319 if (p
->read
.memblock
)
320 pa_memblock_unref(p
->read
.memblock
);
323 pa_packet_unref(p
->read
.packet
);
328 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
332 pa_assert(PA_REFCNT_VALUE(p
) > 0);
338 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
339 i
= pa_xnew(struct item_info
, 1);
341 i
->type
= PA_PSTREAM_ITEM_PACKET
;
342 i
->packet
= pa_packet_ref(packet
);
345 if ((i
->with_creds
= !!creds
))
349 pa_queue_push(p
->send_queue
, i
);
351 p
->mainloop
->defer_enable(p
->defer_event
, 1);
354 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
359 pa_assert(PA_REFCNT_VALUE(p
) > 0);
360 pa_assert(channel
!= (uint32_t) -1);
367 length
= chunk
->length
;
369 bsm
= pa_mempool_block_size_max(p
->mempool
);
375 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
376 i
= pa_xnew(struct item_info
, 1);
377 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
379 n
= PA_MIN(length
, bsm
);
380 i
->chunk
.index
= chunk
->index
+ idx
;
382 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
384 i
->channel
= channel
;
386 i
->seek_mode
= seek_mode
;
388 i
->with_creds
= false;
391 pa_queue_push(p
->send_queue
, i
);
397 p
->mainloop
->defer_enable(p
->defer_event
, 1);
400 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
401 struct item_info
*item
;
403 pa_assert(PA_REFCNT_VALUE(p
) > 0);
408 /* pa_log("Releasing block %u", block_id); */
410 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
411 item
= pa_xnew(struct item_info
, 1);
412 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
413 item
->block_id
= block_id
;
415 item
->with_creds
= false;
418 pa_queue_push(p
->send_queue
, item
);
419 p
->mainloop
->defer_enable(p
->defer_event
, 1);
422 /* might be called from thread context */
423 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
424 pa_pstream
*p
= userdata
;
427 pa_assert(PA_REFCNT_VALUE(p
) > 0);
432 if (p
->release_callback
)
433 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
435 pa_pstream_send_release(p
, block_id
);
438 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
439 struct item_info
*item
;
441 pa_assert(PA_REFCNT_VALUE(p
) > 0);
445 /* pa_log("Revoking block %u", block_id); */
447 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
448 item
= pa_xnew(struct item_info
, 1);
449 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
450 item
->block_id
= block_id
;
452 item
->with_creds
= false;
455 pa_queue_push(p
->send_queue
, item
);
456 p
->mainloop
->defer_enable(p
->defer_event
, 1);
459 /* might be called from thread context */
460 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
461 pa_pstream
*p
= userdata
;
464 pa_assert(PA_REFCNT_VALUE(p
) > 0);
466 if (p
->revoke_callback
)
467 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
469 pa_pstream_send_revoke(p
, block_id
);
472 static void prepare_next_write_item(pa_pstream
*p
) {
474 pa_assert(PA_REFCNT_VALUE(p
) > 0);
476 p
->write
.current
= pa_queue_pop(p
->send_queue
);
478 if (!p
->write
.current
)
481 p
->write
.data
= NULL
;
482 p
->write
.minibuf_validsize
= 0;
483 pa_memchunk_reset(&p
->write
.memchunk
);
485 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
486 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
487 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
488 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
489 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
491 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
493 pa_assert(p
->write
.current
->packet
);
494 p
->write
.data
= p
->write
.current
->packet
->data
;
495 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->packet
->length
);
497 if (p
->write
.current
->packet
->length
<= MINIBUF_SIZE
- PA_PSTREAM_DESCRIPTOR_SIZE
) {
498 memcpy(&p
->write
.minibuf
[PA_PSTREAM_DESCRIPTOR_SIZE
], p
->write
.data
, p
->write
.current
->packet
->length
);
499 p
->write
.minibuf_validsize
= PA_PSTREAM_DESCRIPTOR_SIZE
+ p
->write
.current
->packet
->length
;
502 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
504 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
505 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
507 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
509 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
510 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
514 bool send_payload
= true;
516 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
517 pa_assert(p
->write
.current
->chunk
.memblock
);
519 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
520 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
521 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
523 flags
= (uint32_t) (p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
);
526 uint32_t block_id
, shm_id
;
527 size_t offset
, length
;
528 uint32_t *shm_info
= (uint32_t *) &p
->write
.minibuf
[PA_PSTREAM_DESCRIPTOR_SIZE
];
529 size_t shm_size
= sizeof(uint32_t) * PA_PSTREAM_SHM_MAX
;
531 pa_assert(p
->export
);
533 if (pa_memexport_put(p
->export
,
534 p
->write
.current
->chunk
.memblock
,
540 flags
|= PA_FLAG_SHMDATA
;
541 send_payload
= false;
543 shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
544 shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
545 shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
546 shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
548 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(shm_size
);
549 p
->write
.minibuf_validsize
= PA_PSTREAM_DESCRIPTOR_SIZE
+ shm_size
;
552 /* pa_log_warn("Failed to export memory block."); */
556 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
557 p
->write
.memchunk
= p
->write
.current
->chunk
;
558 pa_memblock_ref(p
->write
.memchunk
.memblock
);
559 p
->write
.data
= NULL
;
562 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
566 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
567 p
->write_creds
= p
->write
.current
->creds
;
571 static int do_write(pa_pstream
*p
) {
575 pa_memblock
*release_memblock
= NULL
;
578 pa_assert(PA_REFCNT_VALUE(p
) > 0);
580 if (!p
->write
.current
)
581 prepare_next_write_item(p
);
583 if (!p
->write
.current
)
586 if (p
->write
.minibuf_validsize
> 0) {
587 d
= p
->write
.minibuf
+ p
->write
.index
;
588 l
= p
->write
.minibuf_validsize
- p
->write
.index
;
589 } else if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
590 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
591 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
593 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
598 d
= pa_memblock_acquire_chunk(&p
->write
.memchunk
);
599 release_memblock
= p
->write
.memchunk
.memblock
;
602 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
603 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
609 if (p
->send_creds_now
) {
611 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
614 p
->send_creds_now
= false;
618 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
621 if (release_memblock
)
622 pa_memblock_release(release_memblock
);
624 p
->write
.index
+= (size_t) r
;
626 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
627 pa_assert(p
->write
.current
);
628 item_free(p
->write
.current
);
629 p
->write
.current
= NULL
;
631 if (p
->write
.memchunk
.memblock
)
632 pa_memblock_unref(p
->write
.memchunk
.memblock
);
634 pa_memchunk_reset(&p
->write
.memchunk
);
636 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
637 p
->drain_callback(p
, p
->drain_callback_userdata
);
640 return (size_t) r
== l
? 1 : 0;
644 if (release_memblock
)
645 pa_memblock_release(release_memblock
);
650 static int do_read(pa_pstream
*p
) {
654 pa_memblock
*release_memblock
= NULL
;
656 pa_assert(PA_REFCNT_VALUE(p
) > 0);
658 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
659 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
660 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
662 pa_assert(p
->read
.data
|| p
->read
.memblock
);
667 d
= pa_memblock_acquire(p
->read
.memblock
);
668 release_memblock
= p
->read
.memblock
;
671 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
672 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
679 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
682 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
685 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
689 if (release_memblock
)
690 pa_memblock_release(release_memblock
);
692 p
->read
.index
+= (size_t) r
;
694 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
695 uint32_t flags
, length
, channel
;
696 /* Reading of frame descriptor complete */
698 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
700 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
701 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
705 if (flags
== PA_FLAG_SHMRELEASE
) {
707 /* This is a SHM memblock release frame with no payload */
709 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
711 pa_assert(p
->export
);
712 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
716 } else if (flags
== PA_FLAG_SHMREVOKE
) {
718 /* This is a SHM memblock revoke frame with no payload */
720 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
722 pa_assert(p
->import
);
723 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
728 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
730 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
731 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length
);
735 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
737 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
739 if (channel
== (uint32_t) -1) {
742 pa_log_warn("Received packet frame with invalid flags value.");
746 /* Frame is a packet frame */
747 p
->read
.packet
= pa_packet_new(length
);
748 p
->read
.data
= p
->read
.packet
->data
;
752 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
753 pa_log_warn("Received memblock frame with invalid seek mode.");
757 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
759 if (length
!= sizeof(p
->read
.shm_info
)) {
760 pa_log_warn("Received SHM memblock frame with invalid frame length.");
764 /* Frame is a memblock frame referencing an SHM memblock */
765 p
->read
.data
= p
->read
.shm_info
;
767 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
769 /* Frame is a memblock frame */
771 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
775 pa_log_warn("Received memblock frame with invalid flags value.");
780 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
781 /* Frame payload available */
783 if (p
->read
.memblock
&& p
->receive_memblock_callback
) {
785 /* Is this memblock data? Than pass it to the user */
786 l
= (p
->read
.index
- (size_t) r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? (size_t) (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
) : (size_t) r
;
791 chunk
.memblock
= p
->read
.memblock
;
792 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
795 if (p
->receive_memblock_callback
) {
799 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
800 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
802 p
->receive_memblock_callback(
804 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
806 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
808 p
->receive_memblock_callback_userdata
);
811 /* Drop seek info for following callbacks */
812 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
813 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
814 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
819 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
821 if (p
->read
.memblock
) {
823 /* This was a memblock frame. We can unref the memblock now */
824 pa_memblock_unref(p
->read
.memblock
);
826 } else if (p
->read
.packet
) {
828 if (p
->receive_packet_callback
)
830 p
->receive_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->receive_packet_callback_userdata
);
832 p
->receive_packet_callback(p
, p
->read
.packet
, NULL
, p
->receive_packet_callback_userdata
);
835 pa_packet_unref(p
->read
.packet
);
839 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
841 pa_assert(p
->import
);
843 if (!(b
= pa_memimport_get(p
->import
,
844 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
845 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
846 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
847 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
849 if (pa_log_ratelimit(PA_LOG_DEBUG
))
850 pa_log_debug("Failed to import memory block.");
853 if (p
->receive_memblock_callback
) {
859 chunk
.length
= b
? pa_memblock_get_length(b
) : ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
]);
862 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
863 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
865 p
->receive_memblock_callback(
867 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
869 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
871 p
->receive_memblock_callback_userdata
);
875 pa_memblock_unref(b
);
885 p
->read
.memblock
= NULL
;
886 p
->read
.packet
= NULL
;
891 p
->read_creds_valid
= false;
897 if (release_memblock
)
898 pa_memblock_release(release_memblock
);
903 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
905 pa_assert(PA_REFCNT_VALUE(p
) > 0);
907 p
->die_callback
= cb
;
908 p
->die_callback_userdata
= userdata
;
911 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
913 pa_assert(PA_REFCNT_VALUE(p
) > 0);
915 p
->drain_callback
= cb
;
916 p
->drain_callback_userdata
= userdata
;
919 void pa_pstream_set_receive_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
921 pa_assert(PA_REFCNT_VALUE(p
) > 0);
923 p
->receive_packet_callback
= cb
;
924 p
->receive_packet_callback_userdata
= userdata
;
927 void pa_pstream_set_receive_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
929 pa_assert(PA_REFCNT_VALUE(p
) > 0);
931 p
->receive_memblock_callback
= cb
;
932 p
->receive_memblock_callback_userdata
= userdata
;
935 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
937 pa_assert(PA_REFCNT_VALUE(p
) > 0);
939 p
->release_callback
= cb
;
940 p
->release_callback_userdata
= userdata
;
943 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
945 pa_assert(PA_REFCNT_VALUE(p
) > 0);
947 p
->release_callback
= cb
;
948 p
->release_callback_userdata
= userdata
;
951 bool pa_pstream_is_pending(pa_pstream
*p
) {
955 pa_assert(PA_REFCNT_VALUE(p
) > 0);
960 b
= p
->write
.current
|| !pa_queue_isempty(p
->send_queue
);
965 void pa_pstream_unref(pa_pstream
*p
) {
967 pa_assert(PA_REFCNT_VALUE(p
) > 0);
969 if (PA_REFCNT_DEC(p
) <= 0)
973 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
975 pa_assert(PA_REFCNT_VALUE(p
) > 0);
981 void pa_pstream_unlink(pa_pstream
*p
) {
990 pa_memimport_free(p
->import
);
995 pa_memexport_free(p
->export
);
1000 pa_iochannel_free(p
->io
);
1004 if (p
->defer_event
) {
1005 p
->mainloop
->defer_free(p
->defer_event
);
1006 p
->defer_event
= NULL
;
1009 p
->die_callback
= NULL
;
1010 p
->drain_callback
= NULL
;
1011 p
->receive_packet_callback
= NULL
;
1012 p
->receive_memblock_callback
= NULL
;
1015 void pa_pstream_enable_shm(pa_pstream
*p
, bool enable
) {
1017 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1019 p
->use_shm
= enable
;
1024 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1029 pa_memexport_free(p
->export
);
1035 bool pa_pstream_get_shm(pa_pstream
*p
) {
1037 pa_assert(PA_REFCNT_VALUE(p
) > 0);