4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
45 #include <pulse/xmalloc.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
62 /* The sequence descriptor header consists of 5 32bit integers: */
64 PA_PSTREAM_DESCRIPTOR_LENGTH
,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
68 PA_PSTREAM_DESCRIPTOR_FLAGS
,
69 PA_PSTREAM_DESCRIPTOR_MAX
72 /* If we have an SHM block, this info follows the descriptor */
74 PA_PSTREAM_SHM_BLOCKID
,
77 PA_PSTREAM_SHM_LENGTH
,
81 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
89 PA_PSTREAM_ITEM_PACKET
,
90 PA_PSTREAM_ITEM_MEMBLOCK
,
91 PA_PSTREAM_ITEM_SHMRELEASE
,
92 PA_PSTREAM_ITEM_SHMREVOKE
107 pa_seek_mode_t seek_mode
;
109 /* release/revoke info */
116 pa_mainloop_api
*mainloop
;
117 pa_defer_event
*defer_event
;
120 pa_queue
*send_queue
;
125 pa_pstream_descriptor descriptor
;
126 struct item_info
* current
;
127 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
130 pa_memchunk memchunk
;
134 pa_pstream_descriptor descriptor
;
135 pa_memblock
*memblock
;
137 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
143 pa_memimport
*import
;
144 pa_memexport
*export
;
146 pa_pstream_packet_cb_t recieve_packet_callback
;
147 void *recieve_packet_callback_userdata
;
149 pa_pstream_memblock_cb_t recieve_memblock_callback
;
150 void *recieve_memblock_callback_userdata
;
152 pa_pstream_notify_cb_t drain_callback
;
153 void *drain_callback_userdata
;
155 pa_pstream_notify_cb_t die_callback
;
156 void *die_callback_userdata
;
161 pa_creds read_creds
, write_creds
;
162 int read_creds_valid
, send_creds_now
;
166 static int do_write(pa_pstream
*p
);
167 static int do_read(pa_pstream
*p
);
169 static void do_something(pa_pstream
*p
) {
171 pa_assert(PA_REFCNT_VALUE(p
) > 0);
175 p
->mainloop
->defer_enable(p
->defer_event
, 0);
177 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
180 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
183 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
194 p
->die_callback(p
, p
->die_callback_userdata
);
196 pa_pstream_unlink(p
);
200 static void io_callback(pa_iochannel
*io
, void *userdata
) {
201 pa_pstream
*p
= userdata
;
204 pa_assert(PA_REFCNT_VALUE(p
) > 0);
205 pa_assert(p
->io
== io
);
210 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
211 pa_pstream
*p
= userdata
;
214 pa_assert(PA_REFCNT_VALUE(p
) > 0);
215 pa_assert(p
->defer_event
== e
);
216 pa_assert(p
->mainloop
== m
);
221 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
223 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
230 p
= pa_xnew(pa_pstream
, 1);
233 pa_iochannel_set_callback(io
, io_callback
, p
);
237 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
238 m
->defer_enable(p
->defer_event
, 0);
240 p
->send_queue
= pa_queue_new();
241 pa_assert(p
->send_queue
);
243 p
->write
.current
= NULL
;
245 pa_memchunk_reset(&p
->write
.memchunk
);
246 p
->read
.memblock
= NULL
;
247 p
->read
.packet
= NULL
;
250 p
->recieve_packet_callback
= NULL
;
251 p
->recieve_packet_callback_userdata
= NULL
;
252 p
->recieve_memblock_callback
= NULL
;
253 p
->recieve_memblock_callback_userdata
= NULL
;
254 p
->drain_callback
= NULL
;
255 p
->drain_callback_userdata
= NULL
;
256 p
->die_callback
= NULL
;
257 p
->die_callback_userdata
= NULL
;
264 /* We do importing unconditionally */
265 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
267 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
268 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
271 p
->send_creds_now
= 0;
272 p
->read_creds_valid
= 0;
277 static void item_free(void *item
, PA_GCC_UNUSED
void *p
) {
278 struct item_info
*i
= item
;
281 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
282 pa_assert(i
->chunk
.memblock
);
283 pa_memblock_unref(i
->chunk
.memblock
);
284 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
285 pa_assert(i
->packet
);
286 pa_packet_unref(i
->packet
);
292 static void pstream_free(pa_pstream
*p
) {
295 pa_pstream_unlink(p
);
297 pa_queue_free(p
->send_queue
, item_free
, NULL
);
299 if (p
->write
.current
)
300 item_free(p
->write
.current
, NULL
);
302 if (p
->read
.memblock
)
303 pa_memblock_unref(p
->read
.memblock
);
306 pa_packet_unref(p
->read
.packet
);
308 if (p
->write
.memchunk
.memblock
)
309 pa_memblock_unref(p
->write
.memchunk
.memblock
);
314 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
318 pa_assert(PA_REFCNT_VALUE(p
) > 0);
324 i
= pa_xnew(struct item_info
, 1);
325 i
->type
= PA_PSTREAM_ITEM_PACKET
;
326 i
->packet
= pa_packet_ref(packet
);
329 if ((i
->with_creds
= !!creds
))
333 pa_queue_push(p
->send_queue
, i
);
335 p
->mainloop
->defer_enable(p
->defer_event
, 1);
338 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
342 pa_assert(PA_REFCNT_VALUE(p
) > 0);
343 pa_assert(channel
!= (uint32_t) -1);
350 length
= chunk
->length
;
356 i
= pa_xnew(struct item_info
, 1);
357 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
359 n
= length
< FRAME_SIZE_MAX_USE
? length
: FRAME_SIZE_MAX_USE
;
360 i
->chunk
.index
= chunk
->index
+ idx
;
362 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
364 i
->channel
= channel
;
366 i
->seek_mode
= seek_mode
;
371 pa_queue_push(p
->send_queue
, i
);
377 p
->mainloop
->defer_enable(p
->defer_event
, 1);
380 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
381 struct item_info
*item
;
382 pa_pstream
*p
= userdata
;
385 pa_assert(PA_REFCNT_VALUE(p
) > 0);
390 /* pa_log("Releasing block %u", block_id); */
392 item
= pa_xnew(struct item_info
, 1);
393 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
394 item
->block_id
= block_id
;
396 item
->with_creds
= 0;
399 pa_queue_push(p
->send_queue
, item
);
400 p
->mainloop
->defer_enable(p
->defer_event
, 1);
403 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
404 struct item_info
*item
;
405 pa_pstream
*p
= userdata
;
408 pa_assert(PA_REFCNT_VALUE(p
) > 0);
412 /* pa_log("Revoking block %u", block_id); */
414 item
= pa_xnew(struct item_info
, 1);
415 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
416 item
->block_id
= block_id
;
418 item
->with_creds
= 0;
421 pa_queue_push(p
->send_queue
, item
);
422 p
->mainloop
->defer_enable(p
->defer_event
, 1);
425 static void prepare_next_write_item(pa_pstream
*p
) {
427 pa_assert(PA_REFCNT_VALUE(p
) > 0);
429 p
->write
.current
= pa_queue_pop(p
->send_queue
);
431 if (!p
->write
.current
)
435 p
->write
.data
= NULL
;
436 pa_memchunk_reset(&p
->write
.memchunk
);
438 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
439 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
440 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
441 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
442 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
444 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
446 pa_assert(p
->write
.current
->packet
);
447 p
->write
.data
= p
->write
.current
->packet
->data
;
448 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
450 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
452 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
453 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
455 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
457 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
458 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
462 int send_payload
= 1;
464 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
465 pa_assert(p
->write
.current
->chunk
.memblock
);
467 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
468 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
469 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
471 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
474 uint32_t block_id
, shm_id
;
475 size_t offset
, length
;
477 pa_assert(p
->export
);
479 if (pa_memexport_put(p
->export
,
480 p
->write
.current
->chunk
.memblock
,
486 flags
|= PA_FLAG_SHMDATA
;
489 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
490 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
491 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
492 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
494 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
495 p
->write
.data
= p
->write
.shm_info
;
498 /* pa_log_warn("Failed to export memory block."); */
502 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
503 p
->write
.memchunk
= p
->write
.current
->chunk
;
504 pa_memblock_ref(p
->write
.memchunk
.memblock
);
505 p
->write
.data
= NULL
;
508 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
512 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
513 p
->write_creds
= p
->write
.current
->creds
;
517 static int do_write(pa_pstream
*p
) {
521 pa_memblock
*release_memblock
= NULL
;
524 pa_assert(PA_REFCNT_VALUE(p
) > 0);
526 if (!p
->write
.current
)
527 prepare_next_write_item(p
);
529 if (!p
->write
.current
)
532 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
533 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
534 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
536 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
541 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
542 release_memblock
= p
->write
.memchunk
.memblock
;
545 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
546 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
552 if (p
->send_creds_now
) {
554 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
557 p
->send_creds_now
= 0;
561 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
564 if (release_memblock
)
565 pa_memblock_release(release_memblock
);
569 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
570 pa_assert(p
->write
.current
);
571 item_free(p
->write
.current
, (void *) 1);
572 p
->write
.current
= NULL
;
574 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
575 p
->drain_callback(p
, p
->drain_callback_userdata
);
582 if (release_memblock
)
583 pa_memblock_release(release_memblock
);
588 static int do_read(pa_pstream
*p
) {
592 pa_memblock
*release_memblock
= NULL
;
594 pa_assert(PA_REFCNT_VALUE(p
) > 0);
596 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
597 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
598 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
600 pa_assert(p
->read
.data
|| p
->read
.memblock
);
605 d
= pa_memblock_acquire(p
->read
.memblock
);
606 release_memblock
= p
->read
.memblock
;
609 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
610 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
617 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
620 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
623 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
627 if (release_memblock
)
628 pa_memblock_release(release_memblock
);
632 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
633 uint32_t flags
, length
, channel
;
634 /* Reading of frame descriptor complete */
636 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
638 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
639 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
643 if (flags
== PA_FLAG_SHMRELEASE
) {
645 /* This is a SHM memblock release frame with no payload */
647 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
649 pa_assert(p
->export
);
650 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
654 } else if (flags
== PA_FLAG_SHMREVOKE
) {
656 /* This is a SHM memblock revoke frame with no payload */
658 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
660 pa_assert(p
->import
);
661 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
666 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
668 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
669 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length
);
673 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
675 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
677 if (channel
== (uint32_t) -1) {
680 pa_log_warn("Received packet frame with invalid flags value.");
684 /* Frame is a packet frame */
685 p
->read
.packet
= pa_packet_new(length
);
686 p
->read
.data
= p
->read
.packet
->data
;
690 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
691 pa_log_warn("Received memblock frame with invalid seek mode.");
695 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
697 if (length
!= sizeof(p
->read
.shm_info
)) {
698 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
702 /* Frame is a memblock frame referencing an SHM memblock */
703 p
->read
.data
= p
->read
.shm_info
;
705 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
707 /* Frame is a memblock frame */
709 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
713 pa_log_warn("Recieved memblock frame with invalid flags value.");
718 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
719 /* Frame payload available */
721 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
723 /* Is this memblock data? Than pass it to the user */
724 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
729 chunk
.memblock
= p
->read
.memblock
;
730 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
733 if (p
->recieve_memblock_callback
) {
737 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
738 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
740 p
->recieve_memblock_callback(
742 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
744 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
746 p
->recieve_memblock_callback_userdata
);
749 /* Drop seek info for following callbacks */
750 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
751 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
752 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
757 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
759 if (p
->read
.memblock
) {
761 /* This was a memblock frame. We can unref the memblock now */
762 pa_memblock_unref(p
->read
.memblock
);
764 } else if (p
->read
.packet
) {
766 if (p
->recieve_packet_callback
)
768 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
770 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
773 pa_packet_unref(p
->read
.packet
);
777 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
779 pa_assert(p
->import
);
781 if (!(b
= pa_memimport_get(p
->import
,
782 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
783 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
784 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
785 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
787 pa_log_warn("Failed to import memory block.");
791 if (p
->recieve_memblock_callback
) {
797 chunk
.length
= pa_memblock_get_length(b
);
800 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
801 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
803 p
->recieve_memblock_callback(
805 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
807 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
809 p
->recieve_memblock_callback_userdata
);
812 pa_memblock_unref(b
);
822 p
->read
.memblock
= NULL
;
823 p
->read
.packet
= NULL
;
828 p
->read_creds_valid
= 0;
834 if (release_memblock
)
835 pa_memblock_release(release_memblock
);
840 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
842 pa_assert(PA_REFCNT_VALUE(p
) > 0);
844 p
->die_callback
= cb
;
845 p
->die_callback_userdata
= userdata
;
848 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
850 pa_assert(PA_REFCNT_VALUE(p
) > 0);
852 p
->drain_callback
= cb
;
853 p
->drain_callback_userdata
= userdata
;
856 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
858 pa_assert(PA_REFCNT_VALUE(p
) > 0);
860 p
->recieve_packet_callback
= cb
;
861 p
->recieve_packet_callback_userdata
= userdata
;
864 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
866 pa_assert(PA_REFCNT_VALUE(p
) > 0);
868 p
->recieve_memblock_callback
= cb
;
869 p
->recieve_memblock_callback_userdata
= userdata
;
872 int pa_pstream_is_pending(pa_pstream
*p
) {
876 pa_assert(PA_REFCNT_VALUE(p
) > 0);
881 b
= p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
886 void pa_pstream_unref(pa_pstream
*p
) {
888 pa_assert(PA_REFCNT_VALUE(p
) > 0);
890 if (PA_REFCNT_DEC(p
) <= 0)
894 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
896 pa_assert(PA_REFCNT_VALUE(p
) > 0);
902 void pa_pstream_unlink(pa_pstream
*p
) {
911 pa_memimport_free(p
->import
);
916 pa_memexport_free(p
->export
);
921 pa_iochannel_free(p
->io
);
925 if (p
->defer_event
) {
926 p
->mainloop
->defer_free(p
->defer_event
);
927 p
->defer_event
= NULL
;
930 p
->die_callback
= NULL
;
931 p
->drain_callback
= NULL
;
932 p
->recieve_packet_callback
= NULL
;
933 p
->recieve_memblock_callback
= NULL
;
936 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
938 pa_assert(PA_REFCNT_VALUE(p
) > 0);
945 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
950 pa_memexport_free(p
->export
);