2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
47 /* The flags word of a frame descriptor carries the seek mode in the bits
 * selected by PA_FLAG_SEEKMASK. The bits selected by PA_FLAG_SHMMASK are used
 * to piggyback SHM information on it: whether the audio data block is stored
 * in SHM (SHMDATA), or whether the frame is an SHM release or revoke frame. */
48 #define PA_FLAG_SHMDATA 0x80000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 #define PA_FLAG_SHMMASK 0xFF000000LU
52 #define PA_FLAG_SEEKMASK 0x000000FFLU
54 /* The sequence descriptor header consists of 5 32bit integers. The
 * enumerators below are the indices of those words; the words are converted
 * with htonl()/ntohl() wherever this file writes or reads them. */
/* Number of payload bytes that follow the descriptor. */
56 PA_PSTREAM_DESCRIPTOR_LENGTH
,
/* Channel of a memblock frame; (uint32_t) -1 is used for packet frames. */
57 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
/* Upper 32 bits of the memblock offset; SHM release/revoke frames store the
 * block id in this word instead. */
58 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
/* Lower 32 bits of the memblock offset. */
59 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
/* Seek mode plus the PA_FLAG_SHM* bits. */
60 PA_PSTREAM_DESCRIPTOR_FLAGS
,
/* Number of descriptor words (used as the array size of pa_pstream_descriptor). */
61 PA_PSTREAM_DESCRIPTOR_MAX
64 /* If we have an SHM block, this info follows the descriptor. The enumerators
 * index the 32bit words of that info block.
 * NOTE(review): only some of the enumerators are visible in this excerpt;
 * PA_PSTREAM_SHM_SHMID, PA_PSTREAM_SHM_INDEX and PA_PSTREAM_SHM_MAX are used
 * elsewhere in the file. */
66 PA_PSTREAM_SHM_BLOCKID
,
69 PA_PSTREAM_SHM_LENGTH
,
/* A frame descriptor: PA_PSTREAM_DESCRIPTOR_MAX 32bit words. */
73 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
/* Size of a frame descriptor in bytes. */
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
/* Size in bytes of the write-side staging buffer (minibuf); a packet is
 * copied into it only if it fits behind a descriptor-sized prefix. */
77 #define MINIBUF_SIZE (256)
79 /* To allow uploading a single sample in one frame, this value should be the
80 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h. */
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
/* Static free list named "items". The send functions pop recycled
 * struct item_info objects from it (allocating a fresh one only when it is
 * empty) and item_free() pushes them back.
 * NOTE(review): pa_xfree is presumably the function used to release entries
 * still on the list -- confirm against the flist macro. */
84 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
/* Kinds of items that can sit in the send queue. */
/* A control packet (queued by pa_pstream_send_packet()). */
88 PA_PSTREAM_ITEM_PACKET
,
/* A chunk of audio data (queued by pa_pstream_send_memblock()). */
89 PA_PSTREAM_ITEM_MEMBLOCK
,
/* A release notification for a block id (queued by pa_pstream_send_release()). */
90 PA_PSTREAM_ITEM_SHMRELEASE
,
/* A revoke notification for a block id (queued by pa_pstream_send_revoke()). */
91 PA_PSTREAM_ITEM_SHMREVOKE
105 pa_seek_mode_t seek_mode
;
107 /* release/revoke info */
/* Main loop API the stream runs on, and the deferred event that is enabled
 * whenever a new item is queued for sending. */
114 pa_mainloop_api
*mainloop
;
115 pa_defer_event
*defer_event
;
/* Outgoing items that have not been picked up for writing yet. */
118 pa_queue
*send_queue
;
/* Write side: staging buffer used when a short payload (a small packet or
 * the SHM info block) is sent together with the descriptor. */
124 uint8_t minibuf
[MINIBUF_SIZE
];
/* Descriptor of the frame currently being written. */
125 pa_pstream_descriptor descriptor
;
/* Item currently being written; NULL when no item is in progress. */
127 struct item_info
* current
;
/* Number of valid bytes in minibuf; 0 when minibuf is not used for the
 * current item. */
130 int minibuf_validsize
;
/* Chunk whose data is sent as inline payload of a memblock frame. */
131 pa_memchunk memchunk
;
/* Read side: descriptor of the frame currently being received. */
135 pa_pstream_descriptor descriptor
;
/* Block the payload of a plain memblock frame is read into. */
136 pa_memblock
*memblock
;
/* Receive buffer for the SHM info block of an SHM memblock frame. */
138 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
/* SHM importer (always created) and exporter (created when SHM is enabled). */
144 pa_memimport
*import
;
145 pa_memexport
*export
;
/* User callbacks and their userdata pointers. */
147 pa_pstream_packet_cb_t receive_packet_callback
;
148 void *receive_packet_callback_userdata
;
150 pa_pstream_memblock_cb_t receive_memblock_callback
;
151 void *receive_memblock_callback_userdata
;
153 pa_pstream_notify_cb_t drain_callback
;
154 void *drain_callback_userdata
;
156 pa_pstream_notify_cb_t die_callback
;
157 void *die_callback_userdata
;
159 pa_pstream_block_id_cb_t revoke_callback
;
160 void *revoke_callback_userdata
;
162 pa_pstream_block_id_cb_t release_callback
;
163 void *release_callback_userdata
;
/* Credentials received with the current frame / to be sent with the current
 * item, and the flags that say whether they are valid / still to be sent. */
168 pa_creds read_creds
, write_creds
;
169 pa_bool_t read_creds_valid
, send_creds_now
;
/* Forward declarations of the write and read workers defined further down
 * in this file. */
173 static int do_write(pa_pstream
*p
);
174 static int do_read(pa_pstream
*p
);
/* Common worker behind io_callback() and defer_callback(). It disarms the
 * deferred event and then, as long as the stream is not marked dead, tests
 * the I/O channel for readability, hang-up and writability. The tail of the
 * function calls the die callback and unlinks the stream.
 * NOTE(review): the bodies of the branches and the condition/label guarding
 * the die-callback tail are not visible in this excerpt. */
176 static void do_pstream_read_write(pa_pstream
*p
) {
178 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* The deferred event is one-shot: switch it off before doing the work. */
182 p
->mainloop
->defer_enable(p
->defer_event
, 0);
184 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
187 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
190 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
/* Tell the owner that the stream died, then detach it. */
201 p
->die_callback(p
, p
->die_callback_userdata
);
203 pa_pstream_unlink(p
);
/* I/O channel callback. userdata is the pa_pstream that registered itself on
 * the channel in pa_pstream_new(); after checking that the event belongs to
 * this stream's channel the work is delegated to do_pstream_read_write(). */
207 static void io_callback(pa_iochannel
*io
, void *userdata
) {
208 pa_pstream
*p
= userdata
;
211 pa_assert(PA_REFCNT_VALUE(p
) > 0);
212 pa_assert(p
->io
== io
);
214 do_pstream_read_write(p
);
/* Deferred-event callback. userdata is the pa_pstream that created the
 * event; after checking that both the event and the main loop API are the
 * ones stored in the stream, the work is delegated to
 * do_pstream_read_write(). */
217 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
218 pa_pstream
*p
= userdata
;
221 pa_assert(PA_REFCNT_VALUE(p
) > 0);
222 pa_assert(p
->defer_event
== e
);
223 pa_assert(p
->mainloop
== m
);
225 do_pstream_read_write(p
);
/* Forward declaration: pa_pstream_new() hands this callback to
 * pa_memimport_new() before the definition appears in the file. */
228 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
/* Create a new packet stream on top of the I/O channel io, driven by the
 * main loop API m.
 *
 * The visible part of the constructor allocates the object, registers
 * io_callback() on the channel, creates a (disabled) deferred event, creates
 * the send queue, resets the read/write state, clears all user callbacks,
 * creates the SHM importer and sizes the socket buffers after the largest
 * block the memory pool can hand out.
 *
 * NOTE(review): the local declarations, the initialisation of fields such as
 * p->io, p->mainloop and p->mempool, and the return statement are not
 * visible in this excerpt. */
230 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
237 p
= pa_xnew(pa_pstream
, 1);
240 pa_iochannel_set_callback(io
, io_callback
, p
);
/* The deferred event starts out disabled; the send functions enable it. */
244 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
245 m
->defer_enable(p
->defer_event
, 0);
247 p
->send_queue
= pa_queue_new();
249 p
->write
.current
= NULL
;
251 pa_memchunk_reset(&p
->write
.memchunk
);
252 p
->read
.memblock
= NULL
;
253 p
->read
.packet
= NULL
;
/* No user callbacks are registered initially. */
256 p
->receive_packet_callback
= NULL
;
257 p
->receive_packet_callback_userdata
= NULL
;
258 p
->receive_memblock_callback
= NULL
;
259 p
->receive_memblock_callback_userdata
= NULL
;
260 p
->drain_callback
= NULL
;
261 p
->drain_callback_userdata
= NULL
;
262 p
->die_callback
= NULL
;
263 p
->die_callback_userdata
= NULL
;
264 p
->revoke_callback
= NULL
;
265 p
->revoke_callback_userdata
= NULL
;
266 p
->release_callback
= NULL
;
267 p
->release_callback_userdata
= NULL
;
274 /* We do importing unconditionally */
275 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
/* Size both socket buffers after the pool's maximum block size. */
277 pa_iochannel_socket_set_rcvbuf(io
, pa_mempool_block_size_max(p
->mempool
));
278 pa_iochannel_socket_set_sndbuf(io
, pa_mempool_block_size_max(p
->mempool
));
281 p
->send_creds_now
= FALSE
;
282 p
->read_creds_valid
= FALSE
;
/* Dispose of a send-queue item (item is a struct item_info; the void*
 * signature lets it be passed to pa_queue_free()). Drops the reference the
 * item holds on its memblock or packet, then tries to hand the item back to
 * the static free list.
 * NOTE(review): the statement executed when the push fails is not visible
 * in this excerpt. */
287 static void item_free(void *item
) {
288 struct item_info
*i
= item
;
291 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
292 pa_assert(i
->chunk
.memblock
);
293 pa_memblock_unref(i
->chunk
.memblock
);
294 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
295 pa_assert(i
->packet
);
296 pa_packet_unref(i
->packet
);
/* Recycle the item itself. */
299 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
/* Tear down a stream: unlink it, free the send queue together with all
 * queued items, free the item that was being written, and drop the
 * references held by the read/write state.
 * NOTE(review): the guard in front of the final pa_packet_unref() and the
 * release of the object itself are not visible in this excerpt. */
303 static void pstream_free(pa_pstream
*p
) {
306 pa_pstream_unlink(p
);
308 pa_queue_free(p
->send_queue
, item_free
);
310 if (p
->write
.current
)
311 item_free(p
->write
.current
);
313 if (p
->write
.memchunk
.memblock
)
314 pa_memblock_unref(p
->write
.memchunk
.memblock
);
316 if (p
->read
.memblock
)
317 pa_memblock_unref(p
->read
.memblock
);
320 pa_packet_unref(p
->read
.packet
);
/* Queue a control packet for sending.
 *
 * p      - the stream
 * packet - packet to send; the queued item takes its own reference
 * creds  - optional credentials; a non-NULL pointer marks the item as
 *          "with credentials"
 *
 * The item is taken from the static free list when possible, pushed onto
 * the send queue, and the deferred event is enabled.
 * NOTE(review): the statement that stores the credentials in the item is
 * not visible in this excerpt. */
325 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
329 pa_assert(PA_REFCNT_VALUE(p
) > 0);
335 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
336 i
= pa_xnew(struct item_info
, 1);
338 i
->type
= PA_PSTREAM_ITEM_PACKET
;
339 i
->packet
= pa_packet_ref(packet
);
342 if ((i
->with_creds
= !!creds
))
346 pa_queue_push(p
->send_queue
, i
);
/* Wake up the deferred event. */
348 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queue audio data for sending on a channel.
 *
 * p         - the stream
 * channel   - destination channel; must not be (uint32_t) -1
 * offset    - stream offset that accompanies the data
 * seek_mode - seek mode that accompanies the data
 * chunk     - data to send; each queued item takes its own reference on
 *             chunk->memblock
 *
 * Each queued item covers at most pa_mempool_block_size_max() bytes (n is
 * the minimum of the remaining length and that limit).
 * NOTE(review): the loop that walks the chunk in n-sized pieces (updating
 * idx and length) and the stores of the item's offset and chunk length are
 * not visible in this excerpt. */
351 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
356 pa_assert(PA_REFCNT_VALUE(p
) > 0);
357 pa_assert(channel
!= (uint32_t) -1);
364 length
= chunk
->length
;
366 bsm
= pa_mempool_block_size_max(p
->mempool
);
372 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
373 i
= pa_xnew(struct item_info
, 1);
374 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
/* Size of this piece, capped at the pool's maximum block size. */
376 n
= PA_MIN(length
, bsm
);
377 i
->chunk
.index
= chunk
->index
+ idx
;
379 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
381 i
->channel
= channel
;
383 i
->seek_mode
= seek_mode
;
385 i
->with_creds
= FALSE
;
388 pa_queue_push(p
->send_queue
, i
);
/* Wake up the deferred event. */
394 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queue an SHM release frame for block_id: build an item of type
 * PA_PSTREAM_ITEM_SHMRELEASE (recycled from the free list when possible),
 * push it onto the send queue and enable the deferred event. */
397 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
398 struct item_info
*item
;
400 pa_assert(PA_REFCNT_VALUE(p
) > 0);
405 /* pa_log("Releasing block %u", block_id); */
407 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
408 item
= pa_xnew(struct item_info
, 1);
409 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
410 item
->block_id
= block_id
;
412 item
->with_creds
= FALSE
;
415 pa_queue_push(p
->send_queue
, item
);
416 p
->mainloop
->defer_enable(p
->defer_event
, 1);
419 /* might be called from thread context */
/* Release callback registered with the SHM importer in pa_pstream_new();
 * userdata is the stream. Notifies the user's release callback, if one is
 * set, and then queues a release frame for block_id. */
420 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
421 pa_pstream
*p
= userdata
;
424 pa_assert(PA_REFCNT_VALUE(p
) > 0);
429 if (p
->release_callback
)
430 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
432 pa_pstream_send_release(p
, block_id
);
/* Queue an SHM revoke frame for block_id: build an item of type
 * PA_PSTREAM_ITEM_SHMREVOKE (recycled from the free list when possible),
 * push it onto the send queue and enable the deferred event. */
435 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
436 struct item_info
*item
;
438 pa_assert(PA_REFCNT_VALUE(p
) > 0);
442 /* pa_log("Revoking block %u", block_id); */
444 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
445 item
= pa_xnew(struct item_info
, 1);
446 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
447 item
->block_id
= block_id
;
449 item
->with_creds
= FALSE
;
452 pa_queue_push(p
->send_queue
, item
);
453 p
->mainloop
->defer_enable(p
->defer_event
, 1);
456 /* might be called from thread context */
/* Revoke callback registered with the SHM exporter in
 * pa_pstream_enable_shm(); userdata is the stream. Notifies the user's
 * revoke callback, if one is set, and then queues a revoke frame for
 * block_id. */
457 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
458 pa_pstream
*p
= userdata
;
461 pa_assert(PA_REFCNT_VALUE(p
) > 0);
463 if (p
->revoke_callback
)
464 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
466 pa_pstream_send_revoke(p
, block_id
);
/* Pop the next item from the send queue and set up the write state for it:
 * the frame descriptor, the payload source (p->write.data, p->write.memchunk
 * or the minibuf staging area) and the credentials flag.
 *
 * NOTE(review): several lines of this function are not visible in this
 * excerpt, among them the early return for an empty queue, the reset of the
 * write index, the conditions around the SHM export attempt and the inline
 * payload branch, the remaining arguments of pa_memexport_put(), and the
 * copy of the descriptor into minibuf. Comments below describe only the
 * visible statements. */
469 static void prepare_next_write_item(pa_pstream
*p
) {
471 pa_assert(PA_REFCNT_VALUE(p
) > 0);
473 p
->write
.current
= pa_queue_pop(p
->send_queue
);
475 if (!p
->write
.current
)
/* Start from a clean write state and an empty descriptor. The channel
 * defaults to (uint32_t) -1 and is only overwritten for memblock items. */
478 p
->write
.data
= NULL
;
479 p
->write
.minibuf_validsize
= 0;
480 pa_memchunk_reset(&p
->write
.memchunk
);
482 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
483 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
484 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
485 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
486 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
/* Packet item: the payload is the packet's data. */
488 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
490 pa_assert(p
->write
.current
->packet
);
491 p
->write
.data
= p
->write
.current
->packet
->data
;
492 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->packet
->length
);
/* A packet that fits behind a descriptor-sized prefix is copied into
 * minibuf so that do_write() can send both from one buffer. */
494 if (p
->write
.current
->packet
->length
<= MINIBUF_SIZE
- PA_PSTREAM_DESCRIPTOR_SIZE
) {
495 memcpy(&p
->write
.minibuf
[PA_PSTREAM_DESCRIPTOR_SIZE
], p
->write
.data
, p
->write
.current
->packet
->length
);
496 p
->write
.minibuf_validsize
= PA_PSTREAM_DESCRIPTOR_SIZE
+ p
->write
.current
->packet
->length
;
/* SHM release item: no payload, the block id travels in OFFSET_HI. */
499 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
501 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
502 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
/* SHM revoke item: no payload, the block id travels in OFFSET_HI. */
504 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
506 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
507 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
/* Remaining case: a memblock item. */
511 pa_bool_t send_payload
= TRUE
;
513 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
514 pa_assert(p
->write
.current
->chunk
.memblock
);
/* Channel and the 64bit offset, split into two 32bit words. */
516 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
517 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
518 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
520 flags
= (uint32_t) (p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
);
/* SHM path: export the block and send only a reference to it. The
 * reference (shm_info) is staged in minibuf right behind the descriptor
 * slot. */
523 uint32_t block_id
, shm_id
;
524 size_t offset
, length
;
525 uint32_t *shm_info
= (uint32_t *) &p
->write
.minibuf
[PA_PSTREAM_DESCRIPTOR_SIZE
];
526 size_t shm_size
= sizeof(uint32_t) * PA_PSTREAM_SHM_MAX
;
528 pa_assert(p
->export
);
530 if (pa_memexport_put(p
->export
,
531 p
->write
.current
->chunk
.memblock
,
/* Export worked: flag the frame as SHM data and suppress the payload. */
537 flags
|= PA_FLAG_SHMDATA
;
538 send_payload
= FALSE
;
540 shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
541 shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
542 shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
543 shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
/* The frame "payload" is the SHM info block. */
545 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(shm_size
);
546 p
->write
.minibuf_validsize
= PA_PSTREAM_DESCRIPTOR_SIZE
+ shm_size
;
549 /* pa_log_warn("Failed to export memory block."); */
/* Inline path: the chunk data itself is the payload; the write state
 * keeps its own reference on the block. */
553 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
554 p
->write
.memchunk
= p
->write
.current
->chunk
;
555 pa_memblock_ref(p
->write
.memchunk
.memblock
);
556 p
->write
.data
= NULL
;
559 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
/* Remember whether credentials have to accompany this item. */
563 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
564 p
->write_creds
= p
->write
.current
->creds
;
/* Write as much as possible of the current item to the I/O channel,
 * preparing the next queued item first if none is in progress.
 *
 * The source of the bytes depends on the write state:
 *   - minibuf, when it holds staged data (minibuf_validsize > 0);
 *   - the descriptor, while the write index is still inside it;
 *   - otherwise the payload (p->write.data or the acquired memchunk),
 *     starting at the offset already written.
 * When the whole frame has been written the item is freed and, if nothing
 * else is pending, the drain callback is invoked.
 *
 * NOTE(review): local declarations, return statements, the error path/label
 * and the conditional compilation around the credentials variant are not
 * visible in this excerpt. */
568 static int do_write(pa_pstream
*p
) {
572 pa_memblock
*release_memblock
= NULL
;
575 pa_assert(PA_REFCNT_VALUE(p
) > 0);
577 if (!p
->write
.current
)
578 prepare_next_write_item(p
);
580 if (!p
->write
.current
)
/* Staged data: send straight from minibuf. */
583 if (p
->write
.minibuf_validsize
> 0) {
584 d
= p
->write
.minibuf
+ p
->write
.index
;
585 l
= p
->write
.minibuf_validsize
- p
->write
.index
;
/* Still inside the descriptor. */
586 } else if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
587 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
588 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
/* Payload: either raw packet data or an acquired memblock chunk. */
590 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
/* The acquired block is remembered so it can be released after the write. */
595 d
= pa_memblock_acquire_chunk(&p
->write
.memchunk
);
596 release_memblock
= p
->write
.memchunk
.memblock
;
/* Skip the payload bytes that were already written. */
599 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
600 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
/* Credentials are attached to one write only; the flag is cleared after it. */
606 if (p
->send_creds_now
) {
608 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
611 p
->send_creds_now
= FALSE
;
615 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
618 if (release_memblock
)
619 pa_memblock_release(release_memblock
);
621 p
->write
.index
+= (size_t) r
;
/* Frame completely written: dispose of the item and the chunk reference. */
623 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
624 pa_assert(p
->write
.current
);
625 item_free(p
->write
.current
);
626 p
->write
.current
= NULL
;
628 if (p
->write
.memchunk
.memblock
)
629 pa_memblock_unref(p
->write
.memchunk
.memblock
);
631 pa_memchunk_reset(&p
->write
.memchunk
);
/* Tell the user when nothing is left to send. */
633 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
634 p
->drain_callback(p
, p
->drain_callback_userdata
);
/* NOTE(review): presumably the failure path -- release the block acquired above. */
641 if (release_memblock
)
642 pa_memblock_release(release_memblock
);
/* Read as much as possible of the current frame from the I/O channel and
 * act on it.
 *
 * While the read index is inside the descriptor, bytes go into
 * p->read.descriptor; afterwards they go into the payload buffer (packet
 * data, the SHM info block, or the acquired memblock). Once the descriptor
 * is complete it is validated and the payload buffer is set up; payload
 * bytes of plain memblock frames are passed to the memblock callback as they
 * arrive; when the frame is complete the packet callback is invoked or, for
 * SHM frames, the referenced block is imported and passed to the memblock
 * callback. Finally the read state is reset.
 *
 * NOTE(review): local declarations, return statements, the failure
 * path/label, the release/revoke frame tail, several else branches and the
 * conditional compilation around the credentials variants are not visible
 * in this excerpt. */
647 static int do_read(pa_pstream
*p
) {
651 pa_memblock
*release_memblock
= NULL
;
653 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* Still inside the descriptor: read its remaining bytes. */
655 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
656 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
657 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
/* Payload: read into the data buffer or into the acquired memblock. */
659 pa_assert(p
->read
.data
|| p
->read
.memblock
);
/* The acquired block is remembered so it can be released after the read. */
664 d
= pa_memblock_acquire(p
->read
.memblock
);
665 release_memblock
= p
->read
.memblock
;
/* Continue behind the payload bytes that were already received. */
668 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
669 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
/* Read variant that also collects credentials; b presumably reports
 * whether credentials came with this read. */
676 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
679 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
682 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
686 if (release_memblock
)
687 pa_memblock_release(release_memblock
);
689 p
->read
.index
+= (size_t) r
;
691 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
692 uint32_t flags
, length
, channel
;
693 /* Reading of frame descriptor complete */
695 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
/* SHM bits are only acceptable when SHM was enabled on this stream. */
697 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
698 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
702 if (flags
== PA_FLAG_SHMRELEASE
) {
704 /* This is a SHM memblock release frame with no payload */
706 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
708 pa_assert(p
->export
);
709 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
713 } else if (flags
== PA_FLAG_SHMREVOKE
) {
715 /* This is a SHM memblock revoke frame with no payload */
717 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
719 pa_assert(p
->import
);
720 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
/* All other frames carry a payload: validate its length. */
725 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
727 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
728 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length
);
732 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
734 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
/* Channel (uint32_t) -1 identifies a packet frame. */
736 if (channel
== (uint32_t) -1) {
739 pa_log_warn("Received packet frame with invalid flags value.");
743 /* Frame is a packet frame */
744 p
->read
.packet
= pa_packet_new(length
);
745 p
->read
.data
= p
->read
.packet
->data
;
/* Memblock frame: the seek mode must be within the known range. */
749 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
750 pa_log_warn("Received memblock frame with invalid seek mode.");
754 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
/* The payload of an SHM frame is exactly one SHM info block. */
756 if (length
!= sizeof(p
->read
.shm_info
)) {
757 pa_log_warn("Received SHM memblock frame with invalid frame length.");
761 /* Frame is a memblock frame referencing an SHM memblock */
762 p
->read
.data
= p
->read
.shm_info
;
764 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
766 /* Frame is a memblock frame */
768 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
772 pa_log_warn("Received memblock frame with invalid flags value.");
777 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
778 /* Frame payload available */
780 if (p
->read
.memblock
&& p
->receive_memblock_callback
) {
782 /* Is this memblock data? Then pass it to the user */
/* l = number of payload bytes this read delivered: if the read started
 * inside the descriptor, only the part behind the descriptor counts. */
783 l
= (p
->read
.index
- (size_t) r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? (size_t) (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
) : (size_t) r
;
788 chunk
.memblock
= p
->read
.memblock
;
789 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
792 if (p
->receive_memblock_callback
) {
/* Reassemble the 64bit offset from its two descriptor words. */
796 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
797 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
799 p
->receive_memblock_callback(
801 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
803 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
805 p
->receive_memblock_callback_userdata
);
808 /* Drop seek info for following callbacks */
809 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
810 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
811 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
/* Has the whole frame (descriptor plus payload) been received? */
816 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
818 if (p
->read
.memblock
) {
820 /* This was a memblock frame. We can unref the memblock now */
821 pa_memblock_unref(p
->read
.memblock
);
823 } else if (p
->read
.packet
) {
/* Packet frame: deliver the packet, with the received credentials in
 * the first variant and without credentials in the second. */
825 if (p
->receive_packet_callback
)
827 p
->receive_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->receive_packet_callback_userdata
);
829 p
->receive_packet_callback(p
, p
->read
.packet
, NULL
, p
->receive_packet_callback_userdata
);
832 pa_packet_unref(p
->read
.packet
);
/* Neither memblock nor packet: this must be an SHM data frame. Import
 * the block that the received SHM info refers to. */
836 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
838 pa_assert(p
->import
);
840 if (!(b
= pa_memimport_get(p
->import
,
841 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
842 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
843 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
844 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
846 if (pa_log_ratelimit(PA_LOG_DEBUG
))
847 pa_log_debug("Failed to import memory block.");
850 if (p
->receive_memblock_callback
) {
/* If the import failed (b is NULL) the length announced in the SHM
 * info is reported instead of the block's length. */
856 chunk
.length
= b
? pa_memblock_get_length(b
) : ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
]);
859 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
860 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
862 p
->receive_memblock_callback(
864 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
866 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
868 p
->receive_memblock_callback_userdata
);
872 pa_memblock_unref(b
);
/* Reset the read state for the next frame. */
882 p
->read
.memblock
= NULL
;
883 p
->read
.packet
= NULL
;
888 p
->read_creds_valid
= FALSE
;
/* NOTE(review): presumably the failure path -- release the block acquired above. */
894 if (release_memblock
)
895 pa_memblock_release(release_memblock
);
/* Register the callback (and its userdata) that do_pstream_read_write()
 * invokes right before it unlinks the stream. */
900 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
902 pa_assert(PA_REFCNT_VALUE(p
) > 0);
904 p
->die_callback
= cb
;
905 p
->die_callback_userdata
= userdata
;
/* Register the callback (and its userdata) that do_write() invokes after
 * finishing an item when nothing else is pending. */
908 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
910 pa_assert(PA_REFCNT_VALUE(p
) > 0);
912 p
->drain_callback
= cb
;
913 p
->drain_callback_userdata
= userdata
;
/* Register the callback (and its userdata) that do_read() invokes once a
 * packet frame has been received completely. */
916 void pa_pstream_set_receive_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
918 pa_assert(PA_REFCNT_VALUE(p
) > 0);
920 p
->receive_packet_callback
= cb
;
921 p
->receive_packet_callback_userdata
= userdata
;
/* Register the callback (and its userdata) that do_read() invokes for
 * received memblock data, both inline payload and imported SHM blocks. */
924 void pa_pstream_set_receive_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
926 pa_assert(PA_REFCNT_VALUE(p
) > 0);
928 p
->receive_memblock_callback
= cb
;
929 p
->receive_memblock_callback_userdata
= userdata
;
/* Register the callback (and its userdata) that memimport_release_cb()
 * invokes with the block id before a release frame is queued. */
932 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
934 pa_assert(PA_REFCNT_VALUE(p
) > 0);
936 p
->release_callback
= cb
;
937 p
->release_callback_userdata
= userdata
;
940 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
942 pa_assert(PA_REFCNT_VALUE(p
) > 0);
944 p
->release_callback
= cb
;
945 p
->release_callback_userdata
= userdata
;
/* Report whether the stream still has data to send: b becomes true when an
 * item is currently being written or the send queue is not empty.
 * NOTE(review): the declaration of b, any special case handled before this
 * assignment, and the return statement are not visible in this excerpt. */
948 pa_bool_t
pa_pstream_is_pending(pa_pstream
*p
) {
952 pa_assert(PA_REFCNT_VALUE(p
) > 0);
957 b
= p
->write
.current
|| !pa_queue_isempty(p
->send_queue
);
/* Drop one reference to the stream.
 * NOTE(review): the statement executed when the count reaches zero is not
 * visible in this excerpt; presumably it frees the stream. */
962 void pa_pstream_unref(pa_pstream
*p
) {
964 pa_assert(PA_REFCNT_VALUE(p
) > 0);
966 if (PA_REFCNT_DEC(p
) <= 0)
/* Take a reference to the stream; the stream must already be referenced.
 * NOTE(review): the increment and the return statement are not visible in
 * this excerpt. */
970 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
972 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* Detach the stream from its resources: free the SHM importer and exporter,
 * free the I/O channel, free the deferred event, and clear the die, drain,
 * receive-packet and receive-memblock callbacks so they are not called
 * again. The release and revoke callbacks are not cleared by the visible
 * code.
 * NOTE(review): the guards around the free calls, the statements that reset
 * the freed pointers and the handling of the dead flag are not visible in
 * this excerpt. */
978 void pa_pstream_unlink(pa_pstream
*p
) {
987 pa_memimport_free(p
->import
);
992 pa_memexport_free(p
->export
);
997 pa_iochannel_free(p
->io
);
1001 if (p
->defer_event
) {
1002 p
->mainloop
->defer_free(p
->defer_event
);
1003 p
->defer_event
= NULL
;
1006 p
->die_callback
= NULL
;
1007 p
->drain_callback
= NULL
;
1008 p
->receive_packet_callback
= NULL
;
1009 p
->receive_memblock_callback
= NULL
;
/* Switch SHM transfer on or off for this stream. The flag is stored in
 * p->use_shm; the visible code creates an SHM exporter with
 * memexport_revoke_cb() as its revoke callback, and frees the exporter.
 * NOTE(review): the conditions selecting between creating and freeing the
 * exporter are not visible in this excerpt; presumably creation belongs to
 * enable == TRUE and freeing to enable == FALSE. */
1012 void pa_pstream_enable_shm(pa_pstream
*p
, pa_bool_t enable
) {
1014 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1016 p
->use_shm
= enable
;
1021 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1026 pa_memexport_free(p
->export
);
1032 pa_bool_t
pa_pstream_get_shm(pa_pstream
*p
) {
1034 pa_assert(PA_REFCNT_VALUE(p
) > 0);