4 This file is part of PulseAudio.
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
43 #include <pulse/xmalloc.h>
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
52 /* The flags word of a frame descriptor carries the seek mode in its low
 * byte (PA_FLAG_SEEKMASK). Information about audio data blocks living in
 * SHM is piggybacked onto the same word, in its high byte
 * (PA_FLAG_SHMMASK). Note that the value of PA_FLAG_SHMREVOKE equals
 * PA_FLAG_SHMDATA and PA_FLAG_SHMRELEASE OR-ed together. */
53 #define PA_FLAG_SHMDATA 0x80000000LU
54 #define PA_FLAG_SHMRELEASE 0x40000000LU
55 #define PA_FLAG_SHMREVOKE 0xC0000000LU
56 #define PA_FLAG_SHMMASK 0xFF000000LU
57 #define PA_FLAG_SEEKMASK 0x000000FFLU
59 /* The sequence descriptor header consists of 5 32-bit integers: */
61 PA_PSTREAM_DESCRIPTOR_LENGTH
,
62 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
63 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
64 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
65 PA_PSTREAM_DESCRIPTOR_FLAGS
,
66 PA_PSTREAM_DESCRIPTOR_MAX
69 /* If we have an SHM block, this info follows the descriptor */
71 PA_PSTREAM_SHM_BLOCKID
,
74 PA_PSTREAM_SHM_LENGTH
,
/* A frame descriptor: an array of PA_PSTREAM_DESCRIPTOR_MAX 32-bit words,
 * indexed by the PA_PSTREAM_DESCRIPTOR_* constants. */
78 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
/* Size of a frame descriptor in bytes. */
80 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
/* Largest payload length do_read() accepts; longer frames are reported as
 * having an invalid frame size. */
81 #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 PA_PSTREAM_ITEM_PACKET
,
86 PA_PSTREAM_ITEM_MEMBLOCK
,
87 PA_PSTREAM_ITEM_SHMRELEASE
,
88 PA_PSTREAM_ITEM_SHMREVOKE
103 pa_seek_mode_t seek_mode
;
105 /* release/revoke info */
112 pa_mainloop_api
*mainloop
;
113 pa_defer_event
*defer_event
;
115 pa_queue
*send_queue
;
120 pa_pstream_descriptor descriptor
;
121 struct item_info
* current
;
122 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
128 pa_pstream_descriptor descriptor
;
129 pa_memblock
*memblock
;
131 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
137 pa_memimport
*import
;
138 pa_memexport
*export
;
140 pa_pstream_packet_cb_t recieve_packet_callback
;
141 void *recieve_packet_callback_userdata
;
143 pa_pstream_memblock_cb_t recieve_memblock_callback
;
144 void *recieve_memblock_callback_userdata
;
146 pa_pstream_notify_cb_t drain_callback
;
147 void *drain_callback_userdata
;
149 pa_pstream_notify_cb_t die_callback
;
150 void *die_callback_userdata
;
155 pa_creds read_creds
, write_creds
;
156 int read_creds_valid
, send_creds_now
;
/* Forward declarations of the write and read halves of the stream; both
 * are defined further down in this file. */
160 static int do_write(pa_pstream
*p
);
161 static int do_read(pa_pstream
*p
);
/* Performs one round of I/O processing on the stream.
 *
 * The defer event is switched off first. Then, each time guarded by
 * !p->dead, the iochannel is tested for being readable, hung up, or
 * writable. The die callback is invoked with (p, die_callback_userdata).
 * NOTE(review): presumably the readable/writable branches call
 * do_read()/do_write() and the die callback fires only on failure --
 * confirm. */
163 static void do_something(pa_pstream
*p
) {
/* Disable the deferred invocation; it is re-armed by the send functions. */
166 p
->mainloop
->defer_enable(p
->defer_event
, 0);
170 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
173 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
176 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
189 p
->die_callback(p
, p
->die_callback_userdata
);
/* Callback registered on the iochannel by pa_pstream_new(); userdata is
 * the owning pa_pstream. */
194 static void io_callback(pa_iochannel
*io
, void *userdata
) {
195 pa_pstream
*p
= userdata
;
/* Callback of the defer event created in pa_pstream_new(); userdata is the
 * owning pa_pstream. Asserts that the event and the main loop handed in
 * are the ones stored in the stream. */
203 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
204 pa_pstream
*p
= userdata
;
207 assert(p
->defer_event
== e
);
208 assert(p
->mainloop
== m
);
/* Forward declaration; the definition appears further down in this file. */
213 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
/* Allocates and initialises a new pa_pstream.
 *
 * m    - main loop API used to create the defer event
 * io   - iochannel the stream is bound to; io_callback is installed on it
 * pool - memory pool; NOTE(review): presumably stored as p->mempool --
 *        confirm
 *
 * The stream starts with an empty send queue, no item being written, no
 * packet or memblock being read, all user callbacks unset and the
 * credential flags cleared. */
215 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
222 p
= pa_xnew(pa_pstream
, 1);
225 pa_iochannel_set_callback(io
, io_callback
, p
);
/* The defer event is created disabled; it is enabled whenever something
 * is queued for sending. */
229 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
230 m
->defer_enable(p
->defer_event
, 0);
232 p
->send_queue
= pa_queue_new();
233 assert(p
->send_queue
);
235 p
->write
.current
= NULL
;
237 p
->read
.memblock
= NULL
;
238 p
->read
.packet
= NULL
;
241 p
->recieve_packet_callback
= NULL
;
242 p
->recieve_packet_callback_userdata
= NULL
;
243 p
->recieve_memblock_callback
= NULL
;
244 p
->recieve_memblock_callback_userdata
= NULL
;
245 p
->drain_callback
= NULL
;
246 p
->drain_callback_userdata
= NULL
;
247 p
->die_callback
= NULL
;
248 p
->die_callback_userdata
= NULL
;
/* Request 8 KiB socket receive and send buffers. */
256 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
257 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
260 p
->send_creds_now
= 0;
261 p
->read_creds_valid
= 0;
/* Releases the reference held by a queued send item.
 *
 * item - a struct item_info
 * p    - unused (marked PA_GCC_UNUSED)
 *
 * Memblock items drop their reference on chunk.memblock, packet items
 * drop their reference on the packet. The signature matches what
 * pa_queue_free() is given as its free function in pstream_free(). */
266 static void item_free(void *item
, PA_GCC_UNUSED
void *p
) {
267 struct item_info
*i
= item
;
270 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
271 assert(i
->chunk
.memblock
);
272 pa_memblock_unref(i
->chunk
.memblock
);
273 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
275 pa_packet_unref(i
->packet
);
/* Releases what the stream still owns: every item left in the send queue
 * (through item_free), the item currently being written if there is one,
 * and the memblock and packet of a partially read frame. */
281 static void pstream_free(pa_pstream
*p
) {
286 pa_queue_free(p
->send_queue
, item_free
, NULL
);
/* The item being written was already popped off the queue, so it has to
 * be freed separately. */
288 if (p
->write
.current
)
289 item_free(p
->write
.current
, NULL
);
291 if (p
->read
.memblock
)
292 pa_memblock_unref(p
->read
.memblock
);
295 pa_packet_unref(p
->read
.packet
);
/* Queues a packet for sending.
 *
 * p      - the stream
 * packet - packet to send; the queued item takes its own reference
 * creds  - optional credentials; with_creds records whether it is non-NULL
 *
 * The item is appended to the send queue and the defer event is enabled
 * so that the write is carried out from the main loop. */
300 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
310 i
= pa_xnew(struct item_info
, 1);
311 i
->type
= PA_PSTREAM_ITEM_PACKET
;
312 i
->packet
= pa_packet_ref(packet
);
315 if ((i
->with_creds
= !!creds
))
319 pa_queue_push(p
->send_queue
, i
);
320 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queues a memory chunk for sending on a channel.
 *
 * p         - the stream
 * channel   - target channel; must not be (uint32_t) -1, the value the
 *             descriptor carries for frames that are not memblock frames
 * offset    - seek offset transmitted with the frame
 * seek_mode - seek mode transmitted with the frame
 * chunk     - the chunk to send; a reference is taken on its memblock
 *
 * The item is appended to the send queue and the defer event is enabled. */
323 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
328 assert(channel
!= (uint32_t) -1);
334 i
= pa_xnew(struct item_info
, 1);
335 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
337 i
->channel
= channel
;
339 i
->seek_mode
= seek_mode
;
344 pa_memblock_ref(i
->chunk
.memblock
);
346 pa_queue_push(p
->send_queue
, i
);
347 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Release callback handed to pa_memimport_new() in pa_pstream_use_shm().
 *
 * Queues a PA_PSTREAM_ITEM_SHMRELEASE item carrying block_id (sent
 * without credentials) and enables the defer event so it gets written.
 * userdata is the owning pa_pstream. */
350 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
351 struct item_info
*item
;
352 pa_pstream
*p
= userdata
;
360 /* pa_log(__FILE__": Releasing block %u", block_id); */
362 item
= pa_xnew(struct item_info
, 1);
363 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
364 item
->block_id
= block_id
;
366 item
->with_creds
= 0;
369 pa_queue_push(p
->send_queue
, item
);
370 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Revoke callback handed to pa_memexport_new() in pa_pstream_use_shm().
 *
 * Queues a PA_PSTREAM_ITEM_SHMREVOKE item carrying block_id (sent without
 * credentials) and enables the defer event so it gets written. userdata
 * is the owning pa_pstream. */
373 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
374 struct item_info
*item
;
375 pa_pstream
*p
= userdata
;
383 /* pa_log(__FILE__": Revoking block %u", block_id); */
385 item
= pa_xnew(struct item_info
, 1);
386 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
387 item
->block_id
= block_id
;
389 item
->with_creds
= 0;
392 pa_queue_push(p
->send_queue
, item
);
393 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Pops the next item off the send queue into p->write.current and sets up
 * the frame descriptor and payload pointer for it.
 *
 * All descriptor words are written in network byte order (htonl). The
 * descriptor starts from defaults -- length 0, channel (uint32_t) -1,
 * offsets 0, flags 0 -- and is then filled in according to the item type:
 *
 *   packet      payload is the packet data, length is the packet length
 *   SHM release flags = PA_FLAG_SHMRELEASE, OFFSET_HI carries the block id
 *   SHM revoke  flags = PA_FLAG_SHMREVOKE, OFFSET_HI carries the block id
 *   memblock    channel, 64-bit offset split into HI/LO words, seek mode
 *               in the flags; payload is either the shm_info record of an
 *               exported block or the chunk data itself
 *
 * Finally the credential state of the item is transferred to the stream. */
396 static void prepare_next_write_item(pa_pstream
*p
) {
399 if (!(p
->write
.current
= pa_queue_pop(p
->send_queue
)))
403 p
->write
.data
= NULL
;
/* Descriptor defaults; the type-specific branches override them. */
405 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
406 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
407 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
408 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
409 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
411 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
413 assert(p
->write
.current
->packet
);
414 p
->write
.data
= p
->write
.current
->packet
->data
;
415 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
417 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
/* Release frames have no payload; the block id travels in OFFSET_HI. */
419 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
420 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
422 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
/* Revoke frames have no payload; the block id travels in OFFSET_HI. */
424 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
425 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
429 int send_payload
= 1;
431 assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
432 assert(p
->write
.current
->chunk
.memblock
);
434 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
/* The 64-bit offset is sent as two 32-bit words, high word first. */
435 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
436 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
438 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
441 uint32_t block_id
, shm_id
;
442 size_t offset
, length
;
/* NOTE(review): presumably attempted only when SHM is enabled on this
 * stream, with the shm_info path below taken on success -- confirm. */
446 if (pa_memexport_put(p
->export
,
447 p
->write
.current
->chunk
.memblock
,
453 flags
|= PA_FLAG_SHMDATA
;
/* The payload of an SHM data frame is the shm_info record describing
 * where the data lives, not the audio data itself. */
456 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
457 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
458 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
459 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
461 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
462 p
->write
.data
= p
->write
.shm_info
;
465 /* pa_log_warn(__FILE__": Failed to export memory block."); */
/* Plain memblock frame: the chunk data itself is the payload. */
469 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
470 p
->write
.data
= (uint8_t*) p
->write
.current
->chunk
.memblock
->data
+ p
->write
.current
->chunk
.index
;
473 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
/* Remember whether credentials have to accompany this item. */
477 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
478 p
->write_creds
= p
->write
.current
->creds
;
/* Writes as much as possible of the current outgoing frame.
 *
 * If no item is in progress, the next one is prepared from the send
 * queue. p->write.index counts the bytes of the frame written so far:
 * while it is below PA_PSTREAM_DESCRIPTOR_SIZE the descriptor is being
 * sent, afterwards the payload. Once descriptor plus payload are out, the
 * item is freed and, if nothing else is pending, the drain callback is
 * called.
 *
 * A negative result from the iochannel write is treated as an error.
 * NOTE(review): return value convention (presumably 0 on success, -1 on
 * error) -- confirm. */
483 static int do_write(pa_pstream
*p
) {
489 if (!p
->write
.current
)
490 prepare_next_write_item(p
);
492 if (!p
->write
.current
)
495 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
/* Still inside the descriptor: send its remaining bytes. */
496 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
497 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
499 assert(p
->write
.data
);
/* Inside the payload: the descriptor length word (network byte order)
 * gives the total payload size. */
501 d
= (uint8_t*) p
->write
.data
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
502 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
508 if (p
->send_creds_now
) {
510 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
/* Credentials are attached to one write only. */
513 p
->send_creds_now
= 0;
517 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
/* Whole frame written: release the item and notify about draining. */
522 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
523 assert(p
->write
.current
);
524 item_free(p
->write
.current
, (void *) 1);
525 p
->write
.current
= NULL
;
527 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
528 p
->drain_callback(p
, p
->drain_callback_userdata
);
/* Reads as much as possible of the current incoming frame and dispatches
 * it.
 *
 * p->read.index counts the bytes of the frame read so far: while it is
 * below PA_PSTREAM_DESCRIPTOR_SIZE the descriptor is being read,
 * afterwards the payload. Descriptor words arrive in network byte order.
 *
 * When the descriptor is complete, it is validated and the frame is
 * classified:
 *   - flags == PA_FLAG_SHMRELEASE / PA_FLAG_SHMREVOKE: payload-less SHM
 *     control frames, handed to the memexport / memimport with the block
 *     id taken from OFFSET_HI
 *   - channel == (uint32_t) -1: packet frame, a packet of the announced
 *     length is allocated
 *   - otherwise: memblock frame, either referencing an SHM block (payload
 *     is the shm_info record) or carrying the data inline (a memblock of
 *     the announced length is allocated)
 *
 * Inline memblock data is passed to the memblock callback as it arrives;
 * packets and SHM-referenced blocks are delivered once the frame is
 * complete. Afterwards the per-frame read state is reset.
 *
 * A result <= 0 from the iochannel read is treated as failure.
 * NOTE(review): return value convention (presumably 0 on success, -1 on
 * error) -- confirm. */
534 static int do_read(pa_pstream
*p
) {
540 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
541 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
542 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
544 assert(p
->read
.data
);
545 d
= (uint8_t*) p
->read
.data
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
546 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
/* NOTE(review): the two read variants below are presumably alternatives
 * selected at compile time depending on credential support -- confirm. */
553 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
/* Credentials stay valid once any read of this frame delivered them. */
556 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
559 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
565 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
566 uint32_t flags
, length
, channel
;
567 /* Reading of frame descriptor complete */
569 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
571 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
572 pa_log_warn(__FILE__
": Recieved SHM frame on a socket where SHM is disabled.");
576 if (flags
== PA_FLAG_SHMRELEASE
) {
578 /* This is a SHM memblock release frame with no payload */
580 /* pa_log(__FILE__": Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
583 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
587 } else if (flags
== PA_FLAG_SHMREVOKE
) {
589 /* This is a SHM memblock revoke frame with no payload */
591 /* pa_log(__FILE__": Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
594 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
/* The announced payload length comes from the peer; it is bounded
 * before anything is allocated for it. */
599 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
601 if (length
> FRAME_SIZE_MAX
) {
602 pa_log_warn(__FILE__
": Recieved invalid frame size : %lu", (unsigned long) length
);
606 assert(!p
->read
.packet
&& !p
->read
.memblock
);
608 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
610 if (channel
== (uint32_t) -1) {
613 pa_log_warn(__FILE__
": Received packet frame with invalid flags value.");
617 /* Frame is a packet frame */
618 p
->read
.packet
= pa_packet_new(length
);
619 p
->read
.data
= p
->read
.packet
->data
;
623 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
624 pa_log_warn(__FILE__
": Received memblock frame with invalid seek mode.");
628 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
630 if (length
!= sizeof(p
->read
.shm_info
)) {
631 pa_log_warn(__FILE__
": Recieved SHM memblock frame with Invalid frame length.");
635 /* Frame is a memblock frame referencing an SHM memblock */
636 p
->read
.data
= p
->read
.shm_info
;
638 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
640 /* Frame is a memblock frame */
642 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
643 p
->read
.data
= p
->read
.memblock
->data
;
646 pa_log_warn(__FILE__
": Recieved memblock frame with invalid flags value.");
651 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
652 /* Frame payload available */
654 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
656 /* Is this memblock data? Then pass it to the user */
/* l is the number of payload bytes obtained by this read: if the read
 * started inside the descriptor, only the part past the descriptor
 * counts; otherwise it is all r bytes. */
657 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
662 chunk
.memblock
= p
->read
.memblock
;
663 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
666 if (p
->recieve_memblock_callback
) {
670 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
671 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
673 p
->recieve_memblock_callback(
675 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
677 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
679 p
->recieve_memblock_callback_userdata
);
682 /* Drop seek info for following callbacks */
683 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
684 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
685 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
/* Frame complete: descriptor plus announced payload have been read. */
690 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
692 if (p
->read
.memblock
) {
694 /* This was a memblock frame. We can unref the memblock now */
695 pa_memblock_unref(p
->read
.memblock
);
697 } else if (p
->read
.packet
) {
699 if (p
->recieve_packet_callback
)
/* NOTE(review): the two calls below are presumably compile-time
 * alternatives (with and without credential support) -- confirm. */
701 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
703 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
706 pa_packet_unref(p
->read
.packet
);
/* Neither inline memblock nor packet: the payload was an shm_info
 * record, so the referenced block is looked up through the import. */
710 assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
714 if (!(b
= pa_memimport_get(p
->import
,
715 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
716 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
717 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
718 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
720 pa_log_warn(__FILE__
": Failed to import memory block.");
724 if (p
->recieve_memblock_callback
) {
730 chunk
.length
= b
->length
;
733 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
734 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
736 p
->recieve_memblock_callback(
738 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
740 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
742 p
->recieve_memblock_callback_userdata
);
745 pa_memblock_unref(b
);
/* Reset the per-frame state for the next frame. */
755 p
->read
.memblock
= NULL
;
756 p
->read
.packet
= NULL
;
760 p
->read_creds_valid
= 0;
/* Installs the die callback and the userdata passed to it; do_something()
 * is the caller of this callback. */
766 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
770 p
->die_callback
= cb
;
771 p
->die_callback_userdata
= userdata
;
/* Installs the drain callback and the userdata passed to it; do_write()
 * calls it after finishing an item when nothing more is pending. */
775 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
779 p
->drain_callback
= cb
;
780 p
->drain_callback_userdata
= userdata
;
/* Installs the callback (and its userdata) that do_read() invokes for
 * every completely received packet frame. */
783 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
787 p
->recieve_packet_callback
= cb
;
788 p
->recieve_packet_callback_userdata
= userdata
;
/* Installs the callback (and its userdata) that do_read() invokes with
 * received memblock data. */
791 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
795 p
->recieve_memblock_callback
= cb
;
796 p
->recieve_memblock_callback_userdata
= userdata
;
/* Returns non-zero while there is still something to send: either an item
 * is currently being written or the send queue is not empty. */
799 int pa_pstream_is_pending(pa_pstream
*p
) {
805 return p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
808 void pa_pstream_unref(pa_pstream
*p
) {
816 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
/* Shuts the stream down: frees the SHM import and export objects and the
 * iochannel, frees the defer event (resetting the pointer so it is not
 * freed twice) and clears all user callbacks so none of them can be
 * invoked afterwards.
 * NOTE(review): the frees of import/export/io are presumably guarded by
 * NULL checks -- confirm. */
824 void pa_pstream_close(pa_pstream
*p
) {
830 pa_memimport_free(p
->import
);
835 pa_memexport_free(p
->export
);
840 pa_iochannel_free(p
->io
);
844 if (p
->defer_event
) {
845 p
->mainloop
->defer_free(p
->defer_event
);
846 p
->defer_event
= NULL
;
849 p
->die_callback
= NULL
;
850 p
->drain_callback
= NULL
;
851 p
->recieve_packet_callback
= NULL
;
852 p
->recieve_memblock_callback
= NULL
;
/* Configures SHM transfer for the stream.
 *
 * Creates the memory import and export objects on the stream's mempool,
 * wiring memimport_release_cb and memexport_revoke_cb to them with the
 * stream as userdata.
 * NOTE(review): presumably done only when enable is non-zero and the
 * objects do not exist yet -- confirm. */
857 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
864 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
867 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);