2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
47 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
48 #define PA_FLAG_SHMDATA 0x80000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 #define PA_FLAG_SHMMASK 0xFF000000LU
52 #define PA_FLAG_SEEKMASK 0x000000FFLU
54 /* The sequence descriptor header consists of 5 32bit integers: */
56 PA_PSTREAM_DESCRIPTOR_LENGTH
,
57 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
58 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
59 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
60 PA_PSTREAM_DESCRIPTOR_FLAGS
,
61 PA_PSTREAM_DESCRIPTOR_MAX
64 /* If we have an SHM block, this info follows the descriptor */
66 PA_PSTREAM_SHM_BLOCKID
,
69 PA_PSTREAM_SHM_LENGTH
,
73 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
77 #define MINIBUF_SIZE (256)
79 /* To allow uploading a single sample in one frame, this value should be the
80 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
84 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
88 PA_PSTREAM_ITEM_PACKET
,
89 PA_PSTREAM_ITEM_MEMBLOCK
,
90 PA_PSTREAM_ITEM_SHMRELEASE
,
91 PA_PSTREAM_ITEM_SHMREVOKE
105 pa_seek_mode_t seek_mode
;
107 /* release/revoke info */
114 pa_mainloop_api
*mainloop
;
115 pa_defer_event
*defer_event
;
118 pa_queue
*send_queue
;
124 uint8_t minibuf
[MINIBUF_SIZE
];
125 pa_pstream_descriptor descriptor
;
127 struct item_info
* current
;
130 int minibuf_validsize
;
131 pa_memchunk memchunk
;
135 pa_pstream_descriptor descriptor
;
136 pa_memblock
*memblock
;
138 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
144 pa_memimport
*import
;
145 pa_memexport
*export
;
147 pa_pstream_packet_cb_t receive_packet_callback
;
148 void *receive_packet_callback_userdata
;
150 pa_pstream_memblock_cb_t receive_memblock_callback
;
151 void *receive_memblock_callback_userdata
;
153 pa_pstream_notify_cb_t drain_callback
;
154 void *drain_callback_userdata
;
156 pa_pstream_notify_cb_t die_callback
;
157 void *die_callback_userdata
;
159 pa_pstream_block_id_cb_t revoke_callback
;
160 void *revoke_callback_userdata
;
162 pa_pstream_block_id_cb_t release_callback
;
163 void *release_callback_userdata
;
168 pa_creds read_creds
, write_creds
;
169 bool read_creds_valid
, send_creds_now
;
173 static int do_write(pa_pstream
*p
);
174 static int do_read(pa_pstream
*p
);
176 static void do_pstream_read_write(pa_pstream
*p
) {
178 pa_assert(PA_REFCNT_VALUE(p
) > 0);
182 p
->mainloop
->defer_enable(p
->defer_event
, 0);
184 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
187 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
190 while (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
204 p
->die_callback(p
, p
->die_callback_userdata
);
206 pa_pstream_unlink(p
);
210 static void io_callback(pa_iochannel
*io
, void *userdata
) {
211 pa_pstream
*p
= userdata
;
214 pa_assert(PA_REFCNT_VALUE(p
) > 0);
215 pa_assert(p
->io
== io
);
217 do_pstream_read_write(p
);
220 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
221 pa_pstream
*p
= userdata
;
224 pa_assert(PA_REFCNT_VALUE(p
) > 0);
225 pa_assert(p
->defer_event
== e
);
226 pa_assert(p
->mainloop
== m
);
228 do_pstream_read_write(p
);
231 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
233 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
240 p
= pa_xnew0(pa_pstream
, 1);
243 pa_iochannel_set_callback(io
, io_callback
, p
);
246 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
247 m
->defer_enable(p
->defer_event
, 0);
249 p
->send_queue
= pa_queue_new();
253 /* We do importing unconditionally */
254 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
256 pa_iochannel_socket_set_rcvbuf(io
, pa_mempool_block_size_max(p
->mempool
));
257 pa_iochannel_socket_set_sndbuf(io
, pa_mempool_block_size_max(p
->mempool
));
262 static void item_free(void *item
) {
263 struct item_info
*i
= item
;
266 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
267 pa_assert(i
->chunk
.memblock
);
268 pa_memblock_unref(i
->chunk
.memblock
);
269 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
270 pa_assert(i
->packet
);
271 pa_packet_unref(i
->packet
);
274 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
278 static void pstream_free(pa_pstream
*p
) {
281 pa_pstream_unlink(p
);
283 pa_queue_free(p
->send_queue
, item_free
);
285 if (p
->write
.current
)
286 item_free(p
->write
.current
);
288 if (p
->write
.memchunk
.memblock
)
289 pa_memblock_unref(p
->write
.memchunk
.memblock
);
291 if (p
->read
.memblock
)
292 pa_memblock_unref(p
->read
.memblock
);
295 pa_packet_unref(p
->read
.packet
);
300 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
304 pa_assert(PA_REFCNT_VALUE(p
) > 0);
310 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
311 i
= pa_xnew(struct item_info
, 1);
313 i
->type
= PA_PSTREAM_ITEM_PACKET
;
314 i
->packet
= pa_packet_ref(packet
);
317 if ((i
->with_creds
= !!creds
))
321 pa_queue_push(p
->send_queue
, i
);
323 p
->mainloop
->defer_enable(p
->defer_event
, 1);
326 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
331 pa_assert(PA_REFCNT_VALUE(p
) > 0);
332 pa_assert(channel
!= (uint32_t) -1);
339 length
= chunk
->length
;
341 bsm
= pa_mempool_block_size_max(p
->mempool
);
347 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
348 i
= pa_xnew(struct item_info
, 1);
349 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
351 n
= PA_MIN(length
, bsm
);
352 i
->chunk
.index
= chunk
->index
+ idx
;
354 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
356 i
->channel
= channel
;
358 i
->seek_mode
= seek_mode
;
360 i
->with_creds
= false;
363 pa_queue_push(p
->send_queue
, i
);
369 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queue a "release" control item for the memory block identified by block_id.
 *
 * p:        stream whose send queue receives the item.
 * block_id: id stored in the item; prepare_next_write_item() later writes it
 *           into the OFFSET_HI descriptor word together with
 *           PA_FLAG_SHMRELEASE in the FLAGS word.
 *
 * The item is only queued here; the deferred event is enabled so that the
 * read/write handler picks it up. */
372 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
373 struct item_info
*item
;
375 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* NOTE(review): the embedded line numbers jump from 375 to 380 here, so
 * statements of the original function may be missing from this view --
 * confirm against the full file before relying on this listing. */
380 /* pa_log("Releasing block %u", block_id); */
/* Reuse an item from the static free list if one is available, otherwise
 * allocate a fresh one. */
382 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
383 item
= pa_xnew(struct item_info
, 1);
384 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
385 item
->block_id
= block_id
;
/* Control items never carry credentials.
 * NOTE(review): numbering gaps around this line (385 -> 387 -> 390) suggest
 * it may be wrapped in a preprocessor conditional that is not visible. */
387 item
->with_creds
= false;
/* Append to the send queue and wake the deferred handler. */
390 pa_queue_push(p
->send_queue
, item
);
391 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Release handler passed to pa_memimport_new() in pa_pstream_new().
 *
 * i:        the memimport reporting the release (not used in the visible code).
 * block_id: id of the block being released.
 * userdata: the pa_pstream registered together with the handler. */
394 /* might be called from thread context */
395 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
396 pa_pstream
*p
= userdata
;
399 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* Hand the block id to the user-supplied release callback, if one is set. */
404 if (p
->release_callback
)
405 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
/* NOTE(review): the embedded line numbers jump from 405 to 407, so a line
 * (possibly an `else`) may be missing between the callback invocation and
 * this call -- confirm whether the release is sent in addition to, or
 * instead of, running the callback. */
407 pa_pstream_send_release(p
, block_id
);
/* Queue a "revoke" control item for the memory block identified by block_id.
 *
 * p:        stream whose send queue receives the item.
 * block_id: id stored in the item; prepare_next_write_item() later writes it
 *           into the OFFSET_HI descriptor word together with
 *           PA_FLAG_SHMREVOKE in the FLAGS word.
 *
 * The item is only queued here; the deferred event is enabled so that the
 * read/write handler picks it up. */
410 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
411 struct item_info
*item
;
413 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* NOTE(review): the embedded line numbers jump from 413 to 417 here, so
 * statements of the original function may be missing from this view --
 * confirm against the full file before relying on this listing. */
417 /* pa_log("Revoking block %u", block_id); */
/* Reuse an item from the static free list if one is available, otherwise
 * allocate a fresh one. */
419 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
420 item
= pa_xnew(struct item_info
, 1);
421 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
422 item
->block_id
= block_id
;
/* Control items never carry credentials.
 * NOTE(review): numbering gaps around this line (422 -> 424 -> 427) suggest
 * it may be wrapped in a preprocessor conditional that is not visible. */
424 item
->with_creds
= false;
/* Append to the send queue and wake the deferred handler. */
427 pa_queue_push(p
->send_queue
, item
);
428 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Revoke handler passed to pa_memexport_new() in pa_pstream_enable_shm().
 *
 * e:        the memexport reporting the revocation (not used in the visible
 *           code).
 * block_id: id of the block being revoked.
 * userdata: the pa_pstream registered together with the handler. */
431 /* might be called from thread context */
432 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
433 pa_pstream
*p
= userdata
;
436 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* Hand the block id to the user-supplied revoke callback, if one is set. */
438 if (p
->revoke_callback
)
439 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
/* NOTE(review): the embedded line numbers jump from 439 to 441, so a line
 * (possibly an `else`) may be missing between the callback invocation and
 * this call -- confirm whether the revoke is sent in addition to, or
 * instead of, running the callback. */
441 pa_pstream_send_revoke(p
, block_id
);
444 static void prepare_next_write_item(pa_pstream
*p
) {
446 pa_assert(PA_REFCNT_VALUE(p
) > 0);
448 p
->write
.current
= pa_queue_pop(p
->send_queue
);
450 if (!p
->write
.current
)
453 p
->write
.data
= NULL
;
454 p
->write
.minibuf_validsize
= 0;
455 pa_memchunk_reset(&p
->write
.memchunk
);
457 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
458 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
459 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
460 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
461 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
463 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
465 pa_assert(p
->write
.current
->packet
);
466 p
->write
.data
= p
->write
.current
->packet
->data
;
467 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->packet
->length
);
469 if (p
->write
.current
->packet
->length
<= MINIBUF_SIZE
- PA_PSTREAM_DESCRIPTOR_SIZE
) {
470 memcpy(&p
->write
.minibuf
[PA_PSTREAM_DESCRIPTOR_SIZE
], p
->write
.data
, p
->write
.current
->packet
->length
);
471 p
->write
.minibuf_validsize
= PA_PSTREAM_DESCRIPTOR_SIZE
+ p
->write
.current
->packet
->length
;
474 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
476 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
477 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
479 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
481 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
482 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
486 bool send_payload
= true;
488 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
489 pa_assert(p
->write
.current
->chunk
.memblock
);
491 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
492 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
493 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
495 flags
= (uint32_t) (p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
);
498 uint32_t block_id
, shm_id
;
499 size_t offset
, length
;
500 uint32_t *shm_info
= (uint32_t *) &p
->write
.minibuf
[PA_PSTREAM_DESCRIPTOR_SIZE
];
501 size_t shm_size
= sizeof(uint32_t) * PA_PSTREAM_SHM_MAX
;
503 pa_assert(p
->export
);
505 if (pa_memexport_put(p
->export
,
506 p
->write
.current
->chunk
.memblock
,
512 flags
|= PA_FLAG_SHMDATA
;
513 send_payload
= false;
515 shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
516 shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
517 shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
518 shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
520 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(shm_size
);
521 p
->write
.minibuf_validsize
= PA_PSTREAM_DESCRIPTOR_SIZE
+ shm_size
;
524 /* pa_log_warn("Failed to export memory block."); */
528 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
529 p
->write
.memchunk
= p
->write
.current
->chunk
;
530 pa_memblock_ref(p
->write
.memchunk
.memblock
);
531 p
->write
.data
= NULL
;
534 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
538 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
539 p
->write_creds
= p
->write
.current
->creds
;
543 static int do_write(pa_pstream
*p
) {
547 pa_memblock
*release_memblock
= NULL
;
550 pa_assert(PA_REFCNT_VALUE(p
) > 0);
552 if (!p
->write
.current
)
553 prepare_next_write_item(p
);
555 if (!p
->write
.current
)
558 if (p
->write
.minibuf_validsize
> 0) {
559 d
= p
->write
.minibuf
+ p
->write
.index
;
560 l
= p
->write
.minibuf_validsize
- p
->write
.index
;
561 } else if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
562 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
563 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
565 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
570 d
= pa_memblock_acquire_chunk(&p
->write
.memchunk
);
571 release_memblock
= p
->write
.memchunk
.memblock
;
574 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
575 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
581 if (p
->send_creds_now
) {
583 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
586 p
->send_creds_now
= false;
590 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
593 if (release_memblock
)
594 pa_memblock_release(release_memblock
);
596 p
->write
.index
+= (size_t) r
;
598 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
599 pa_assert(p
->write
.current
);
600 item_free(p
->write
.current
);
601 p
->write
.current
= NULL
;
603 if (p
->write
.memchunk
.memblock
)
604 pa_memblock_unref(p
->write
.memchunk
.memblock
);
606 pa_memchunk_reset(&p
->write
.memchunk
);
608 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
609 p
->drain_callback(p
, p
->drain_callback_userdata
);
612 return (size_t) r
== l
? 1 : 0;
616 if (release_memblock
)
617 pa_memblock_release(release_memblock
);
622 static int do_read(pa_pstream
*p
) {
626 pa_memblock
*release_memblock
= NULL
;
628 pa_assert(PA_REFCNT_VALUE(p
) > 0);
630 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
631 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
632 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
634 pa_assert(p
->read
.data
|| p
->read
.memblock
);
639 d
= pa_memblock_acquire(p
->read
.memblock
);
640 release_memblock
= p
->read
.memblock
;
643 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
644 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
651 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
654 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
657 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
661 if (release_memblock
)
662 pa_memblock_release(release_memblock
);
664 p
->read
.index
+= (size_t) r
;
666 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
667 uint32_t flags
, length
, channel
;
668 /* Reading of frame descriptor complete */
670 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
672 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
673 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
677 if (flags
== PA_FLAG_SHMRELEASE
) {
679 /* This is a SHM memblock release frame with no payload */
681 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
683 pa_assert(p
->export
);
684 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
688 } else if (flags
== PA_FLAG_SHMREVOKE
) {
690 /* This is a SHM memblock revoke frame with no payload */
692 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
694 pa_assert(p
->import
);
695 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
700 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
702 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
703 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length
);
707 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
709 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
711 if (channel
== (uint32_t) -1) {
714 pa_log_warn("Received packet frame with invalid flags value.");
718 /* Frame is a packet frame */
719 p
->read
.packet
= pa_packet_new(length
);
720 p
->read
.data
= p
->read
.packet
->data
;
724 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
725 pa_log_warn("Received memblock frame with invalid seek mode.");
729 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
731 if (length
!= sizeof(p
->read
.shm_info
)) {
732 pa_log_warn("Received SHM memblock frame with invalid frame length.");
736 /* Frame is a memblock frame referencing an SHM memblock */
737 p
->read
.data
= p
->read
.shm_info
;
739 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
741 /* Frame is a memblock frame */
743 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
747 pa_log_warn("Received memblock frame with invalid flags value.");
752 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
753 /* Frame payload available */
755 if (p
->read
.memblock
&& p
->receive_memblock_callback
) {
757 /* Is this memblock data? Then pass it to the user */
758 l
= (p
->read
.index
- (size_t) r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? (size_t) (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
) : (size_t) r
;
763 chunk
.memblock
= p
->read
.memblock
;
764 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
767 if (p
->receive_memblock_callback
) {
771 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
772 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
774 p
->receive_memblock_callback(
776 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
778 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
780 p
->receive_memblock_callback_userdata
);
783 /* Drop seek info for following callbacks */
784 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
785 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
786 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
791 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
793 if (p
->read
.memblock
) {
795 /* This was a memblock frame. We can unref the memblock now */
796 pa_memblock_unref(p
->read
.memblock
);
798 } else if (p
->read
.packet
) {
800 if (p
->receive_packet_callback
)
802 p
->receive_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->receive_packet_callback_userdata
);
804 p
->receive_packet_callback(p
, p
->read
.packet
, NULL
, p
->receive_packet_callback_userdata
);
807 pa_packet_unref(p
->read
.packet
);
811 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
813 pa_assert(p
->import
);
815 if (!(b
= pa_memimport_get(p
->import
,
816 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
817 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
818 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
819 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
821 if (pa_log_ratelimit(PA_LOG_DEBUG
))
822 pa_log_debug("Failed to import memory block.");
825 if (p
->receive_memblock_callback
) {
831 chunk
.length
= b
? pa_memblock_get_length(b
) : ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
]);
834 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
835 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
837 p
->receive_memblock_callback(
839 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
841 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
843 p
->receive_memblock_callback_userdata
);
847 pa_memblock_unref(b
);
857 p
->read
.memblock
= NULL
;
858 p
->read
.packet
= NULL
;
863 p
->read_creds_valid
= false;
869 if (release_memblock
)
870 pa_memblock_release(release_memblock
);
875 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
877 pa_assert(PA_REFCNT_VALUE(p
) > 0);
879 p
->die_callback
= cb
;
880 p
->die_callback_userdata
= userdata
;
883 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
885 pa_assert(PA_REFCNT_VALUE(p
) > 0);
887 p
->drain_callback
= cb
;
888 p
->drain_callback_userdata
= userdata
;
891 void pa_pstream_set_receive_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
893 pa_assert(PA_REFCNT_VALUE(p
) > 0);
895 p
->receive_packet_callback
= cb
;
896 p
->receive_packet_callback_userdata
= userdata
;
899 void pa_pstream_set_receive_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
901 pa_assert(PA_REFCNT_VALUE(p
) > 0);
903 p
->receive_memblock_callback
= cb
;
904 p
->receive_memblock_callback_userdata
= userdata
;
907 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
909 pa_assert(PA_REFCNT_VALUE(p
) > 0);
911 p
->release_callback
= cb
;
912 p
->release_callback_userdata
= userdata
;
915 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
917 pa_assert(PA_REFCNT_VALUE(p
) > 0);
919 p
->release_callback
= cb
;
920 p
->release_callback_userdata
= userdata
;
923 bool pa_pstream_is_pending(pa_pstream
*p
) {
927 pa_assert(PA_REFCNT_VALUE(p
) > 0);
932 b
= p
->write
.current
|| !pa_queue_isempty(p
->send_queue
);
937 void pa_pstream_unref(pa_pstream
*p
) {
939 pa_assert(PA_REFCNT_VALUE(p
) > 0);
941 if (PA_REFCNT_DEC(p
) <= 0)
945 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
947 pa_assert(PA_REFCNT_VALUE(p
) > 0);
953 void pa_pstream_unlink(pa_pstream
*p
) {
962 pa_memimport_free(p
->import
);
967 pa_memexport_free(p
->export
);
972 pa_iochannel_free(p
->io
);
976 if (p
->defer_event
) {
977 p
->mainloop
->defer_free(p
->defer_event
);
978 p
->defer_event
= NULL
;
981 p
->die_callback
= NULL
;
982 p
->drain_callback
= NULL
;
983 p
->receive_packet_callback
= NULL
;
984 p
->receive_memblock_callback
= NULL
;
987 void pa_pstream_enable_shm(pa_pstream
*p
, bool enable
) {
989 pa_assert(PA_REFCNT_VALUE(p
) > 0);
996 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1001 pa_memexport_free(p
->export
);
1007 bool pa_pstream_get_shm(pa_pstream
*p
) {
1009 pa_assert(PA_REFCNT_VALUE(p
) > 0);