4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
45 #include <pulse/xmalloc.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
55 /* We piggyback the information on whether audio data blocks are stored in SHM onto the seek mode field */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
62 /* The sequence descriptor header consists of 5 32bit integers: */
64 PA_PSTREAM_DESCRIPTOR_LENGTH
,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
68 PA_PSTREAM_DESCRIPTOR_FLAGS
,
69 PA_PSTREAM_DESCRIPTOR_MAX
72 /* If we have an SHM block, this info follows the descriptor */
74 PA_PSTREAM_SHM_BLOCKID
,
77 PA_PSTREAM_SHM_LENGTH
,
81 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
89 PA_PSTREAM_ITEM_PACKET
,
90 PA_PSTREAM_ITEM_MEMBLOCK
,
91 PA_PSTREAM_ITEM_SHMRELEASE
,
92 PA_PSTREAM_ITEM_SHMREVOKE
107 pa_seek_mode_t seek_mode
;
109 /* release/revoke info */
116 pa_mainloop_api
*mainloop
;
117 pa_defer_event
*defer_event
;
120 pa_queue
*send_queue
;
125 pa_pstream_descriptor descriptor
;
126 struct item_info
* current
;
127 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
130 pa_memchunk memchunk
;
134 pa_pstream_descriptor descriptor
;
135 pa_memblock
*memblock
;
137 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
143 pa_memimport
*import
;
144 pa_memexport
*export
;
146 pa_pstream_packet_cb_t recieve_packet_callback
;
147 void *recieve_packet_callback_userdata
;
149 pa_pstream_memblock_cb_t recieve_memblock_callback
;
150 void *recieve_memblock_callback_userdata
;
152 pa_pstream_notify_cb_t drain_callback
;
153 void *drain_callback_userdata
;
155 pa_pstream_notify_cb_t die_callback
;
156 void *die_callback_userdata
;
158 pa_pstream_block_id_cb_t revoke_callback
;
159 void *revoke_callback_userdata
;
161 pa_pstream_block_id_cb_t release_callback
;
162 void *release_callback_userdata
;
167 pa_creds read_creds
, write_creds
;
168 int read_creds_valid
, send_creds_now
;
172 static int do_write(pa_pstream
*p
);
173 static int do_read(pa_pstream
*p
);
175 static void do_something(pa_pstream
*p
) {
177 pa_assert(PA_REFCNT_VALUE(p
) > 0);
181 p
->mainloop
->defer_enable(p
->defer_event
, 0);
183 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
186 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
189 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
200 p
->die_callback(p
, p
->die_callback_userdata
);
202 pa_pstream_unlink(p
);
206 static void io_callback(pa_iochannel
*io
, void *userdata
) {
207 pa_pstream
*p
= userdata
;
210 pa_assert(PA_REFCNT_VALUE(p
) > 0);
211 pa_assert(p
->io
== io
);
216 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
217 pa_pstream
*p
= userdata
;
220 pa_assert(PA_REFCNT_VALUE(p
) > 0);
221 pa_assert(p
->defer_event
== e
);
222 pa_assert(p
->mainloop
== m
);
227 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
229 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
236 p
= pa_xnew(pa_pstream
, 1);
239 pa_iochannel_set_callback(io
, io_callback
, p
);
243 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
244 m
->defer_enable(p
->defer_event
, 0);
246 p
->send_queue
= pa_queue_new();
248 p
->write
.current
= NULL
;
250 pa_memchunk_reset(&p
->write
.memchunk
);
251 p
->read
.memblock
= NULL
;
252 p
->read
.packet
= NULL
;
255 p
->recieve_packet_callback
= NULL
;
256 p
->recieve_packet_callback_userdata
= NULL
;
257 p
->recieve_memblock_callback
= NULL
;
258 p
->recieve_memblock_callback_userdata
= NULL
;
259 p
->drain_callback
= NULL
;
260 p
->drain_callback_userdata
= NULL
;
261 p
->die_callback
= NULL
;
262 p
->die_callback_userdata
= NULL
;
263 p
->revoke_callback
= NULL
;
264 p
->revoke_callback_userdata
= NULL
;
265 p
->release_callback
= NULL
;
266 p
->release_callback_userdata
= NULL
;
273 /* We do importing unconditionally */
274 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
276 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
277 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
280 p
->send_creds_now
= 0;
281 p
->read_creds_valid
= 0;
286 static void item_free(void *item
, PA_GCC_UNUSED
void *q
) {
287 struct item_info
*i
= item
;
290 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
291 pa_assert(i
->chunk
.memblock
);
292 pa_memblock_unref(i
->chunk
.memblock
);
293 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
294 pa_assert(i
->packet
);
295 pa_packet_unref(i
->packet
);
301 static void pstream_free(pa_pstream
*p
) {
304 pa_pstream_unlink(p
);
306 pa_queue_free(p
->send_queue
, item_free
, NULL
);
308 if (p
->write
.current
)
309 item_free(p
->write
.current
, NULL
);
311 if (p
->write
.memchunk
.memblock
)
312 pa_memblock_unref(p
->write
.memchunk
.memblock
);
314 if (p
->read
.memblock
)
315 pa_memblock_unref(p
->read
.memblock
);
318 pa_packet_unref(p
->read
.packet
);
323 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
327 pa_assert(PA_REFCNT_VALUE(p
) > 0);
333 i
= pa_xnew(struct item_info
, 1);
334 i
->type
= PA_PSTREAM_ITEM_PACKET
;
335 i
->packet
= pa_packet_ref(packet
);
338 if ((i
->with_creds
= !!creds
))
342 pa_queue_push(p
->send_queue
, i
);
344 p
->mainloop
->defer_enable(p
->defer_event
, 1);
347 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
351 pa_assert(PA_REFCNT_VALUE(p
) > 0);
352 pa_assert(channel
!= (uint32_t) -1);
359 length
= chunk
->length
;
365 i
= pa_xnew(struct item_info
, 1);
366 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
368 n
= length
< FRAME_SIZE_MAX_USE
? length
: FRAME_SIZE_MAX_USE
;
369 i
->chunk
.index
= chunk
->index
+ idx
;
371 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
373 i
->channel
= channel
;
375 i
->seek_mode
= seek_mode
;
380 pa_queue_push(p
->send_queue
, i
);
386 p
->mainloop
->defer_enable(p
->defer_event
, 1);
389 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
390 struct item_info
*item
;
392 pa_assert(PA_REFCNT_VALUE(p
) > 0);
397 /* pa_log("Releasing block %u", block_id); */
399 item
= pa_xnew(struct item_info
, 1);
400 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
401 item
->block_id
= block_id
;
403 item
->with_creds
= 0;
406 pa_queue_push(p
->send_queue
, item
);
407 p
->mainloop
->defer_enable(p
->defer_event
, 1);
410 /* might be called from thread context */
411 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
412 pa_pstream
*p
= userdata
;
415 pa_assert(PA_REFCNT_VALUE(p
) > 0);
420 if (p
->release_callback
)
421 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
423 pa_pstream_send_release(p
, block_id
);
426 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
427 struct item_info
*item
;
429 pa_assert(PA_REFCNT_VALUE(p
) > 0);
433 /* pa_log("Revoking block %u", block_id); */
435 item
= pa_xnew(struct item_info
, 1);
436 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
437 item
->block_id
= block_id
;
439 item
->with_creds
= 0;
442 pa_queue_push(p
->send_queue
, item
);
443 p
->mainloop
->defer_enable(p
->defer_event
, 1);
446 /* might be called from thread context */
447 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
448 pa_pstream
*p
= userdata
;
451 pa_assert(PA_REFCNT_VALUE(p
) > 0);
453 if (p
->revoke_callback
)
454 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
456 pa_pstream_send_revoke(p
, block_id
);
459 static void prepare_next_write_item(pa_pstream
*p
) {
461 pa_assert(PA_REFCNT_VALUE(p
) > 0);
463 p
->write
.current
= pa_queue_pop(p
->send_queue
);
465 if (!p
->write
.current
)
469 p
->write
.data
= NULL
;
470 pa_memchunk_reset(&p
->write
.memchunk
);
472 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
473 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
474 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
475 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
476 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
478 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
480 pa_assert(p
->write
.current
->packet
);
481 p
->write
.data
= p
->write
.current
->packet
->data
;
482 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
484 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
486 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
487 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
489 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
491 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
492 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
496 int send_payload
= 1;
498 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
499 pa_assert(p
->write
.current
->chunk
.memblock
);
501 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
502 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
503 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
505 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
508 uint32_t block_id
, shm_id
;
509 size_t offset
, length
;
511 pa_assert(p
->export
);
513 if (pa_memexport_put(p
->export
,
514 p
->write
.current
->chunk
.memblock
,
520 flags
|= PA_FLAG_SHMDATA
;
523 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
524 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
525 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
526 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
528 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
529 p
->write
.data
= p
->write
.shm_info
;
532 /* pa_log_warn("Failed to export memory block."); */
536 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
537 p
->write
.memchunk
= p
->write
.current
->chunk
;
538 pa_memblock_ref(p
->write
.memchunk
.memblock
);
539 p
->write
.data
= NULL
;
542 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
546 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
547 p
->write_creds
= p
->write
.current
->creds
;
551 static int do_write(pa_pstream
*p
) {
555 pa_memblock
*release_memblock
= NULL
;
558 pa_assert(PA_REFCNT_VALUE(p
) > 0);
560 if (!p
->write
.current
)
561 prepare_next_write_item(p
);
563 if (!p
->write
.current
)
566 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
567 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
568 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
570 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
575 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
576 release_memblock
= p
->write
.memchunk
.memblock
;
579 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
580 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
586 if (p
->send_creds_now
) {
588 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
591 p
->send_creds_now
= 0;
595 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
598 if (release_memblock
)
599 pa_memblock_release(release_memblock
);
603 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
604 pa_assert(p
->write
.current
);
605 item_free(p
->write
.current
, NULL
);
606 p
->write
.current
= NULL
;
608 if (p
->write
.memchunk
.memblock
)
609 pa_memblock_unref(p
->write
.memchunk
.memblock
);
611 pa_memchunk_reset(&p
->write
.memchunk
);
613 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
614 p
->drain_callback(p
, p
->drain_callback_userdata
);
621 if (release_memblock
)
622 pa_memblock_release(release_memblock
);
627 static int do_read(pa_pstream
*p
) {
631 pa_memblock
*release_memblock
= NULL
;
633 pa_assert(PA_REFCNT_VALUE(p
) > 0);
635 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
636 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
637 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
639 pa_assert(p
->read
.data
|| p
->read
.memblock
);
644 d
= pa_memblock_acquire(p
->read
.memblock
);
645 release_memblock
= p
->read
.memblock
;
648 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
649 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
656 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
659 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
662 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
666 if (release_memblock
)
667 pa_memblock_release(release_memblock
);
671 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
672 uint32_t flags
, length
, channel
;
673 /* Reading of frame descriptor complete */
675 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
677 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
678 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
682 if (flags
== PA_FLAG_SHMRELEASE
) {
684 /* This is a SHM memblock release frame with no payload */
686 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
688 pa_assert(p
->export
);
689 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
693 } else if (flags
== PA_FLAG_SHMREVOKE
) {
695 /* This is a SHM memblock revoke frame with no payload */
697 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
699 pa_assert(p
->import
);
700 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
705 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
707 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
708 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length
);
712 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
714 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
716 if (channel
== (uint32_t) -1) {
719 pa_log_warn("Received packet frame with invalid flags value.");
723 /* Frame is a packet frame */
724 p
->read
.packet
= pa_packet_new(length
);
725 p
->read
.data
= p
->read
.packet
->data
;
729 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
730 pa_log_warn("Received memblock frame with invalid seek mode.");
734 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
736 if (length
!= sizeof(p
->read
.shm_info
)) {
737 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
741 /* Frame is a memblock frame referencing an SHM memblock */
742 p
->read
.data
= p
->read
.shm_info
;
744 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
746 /* Frame is a memblock frame */
748 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
752 pa_log_warn("Recieved memblock frame with invalid flags value.");
757 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
758 /* Frame payload available */
760 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
762 /* Is this memblock data? Then pass it to the user */
763 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
768 chunk
.memblock
= p
->read
.memblock
;
769 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
772 if (p
->recieve_memblock_callback
) {
776 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
777 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
779 p
->recieve_memblock_callback(
781 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
783 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
785 p
->recieve_memblock_callback_userdata
);
788 /* Drop seek info for following callbacks */
789 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
790 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
791 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
796 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
798 if (p
->read
.memblock
) {
800 /* This was a memblock frame. We can unref the memblock now */
801 pa_memblock_unref(p
->read
.memblock
);
803 } else if (p
->read
.packet
) {
805 if (p
->recieve_packet_callback
)
807 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
809 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
812 pa_packet_unref(p
->read
.packet
);
816 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
818 pa_assert(p
->import
);
820 if (!(b
= pa_memimport_get(p
->import
,
821 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
822 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
823 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
824 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
826 pa_log_warn("Failed to import memory block.");
830 if (p
->recieve_memblock_callback
) {
836 chunk
.length
= pa_memblock_get_length(b
);
839 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
840 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
842 p
->recieve_memblock_callback(
844 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
846 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
848 p
->recieve_memblock_callback_userdata
);
851 pa_memblock_unref(b
);
861 p
->read
.memblock
= NULL
;
862 p
->read
.packet
= NULL
;
867 p
->read_creds_valid
= 0;
873 if (release_memblock
)
874 pa_memblock_release(release_memblock
);
879 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
881 pa_assert(PA_REFCNT_VALUE(p
) > 0);
883 p
->die_callback
= cb
;
884 p
->die_callback_userdata
= userdata
;
887 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
889 pa_assert(PA_REFCNT_VALUE(p
) > 0);
891 p
->drain_callback
= cb
;
892 p
->drain_callback_userdata
= userdata
;
895 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
897 pa_assert(PA_REFCNT_VALUE(p
) > 0);
899 p
->recieve_packet_callback
= cb
;
900 p
->recieve_packet_callback_userdata
= userdata
;
903 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
905 pa_assert(PA_REFCNT_VALUE(p
) > 0);
907 p
->recieve_memblock_callback
= cb
;
908 p
->recieve_memblock_callback_userdata
= userdata
;
911 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
913 pa_assert(PA_REFCNT_VALUE(p
) > 0);
915 p
->release_callback
= cb
;
916 p
->release_callback_userdata
= userdata
;
919 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
921 pa_assert(PA_REFCNT_VALUE(p
) > 0);
923 p
->release_callback
= cb
;
924 p
->release_callback_userdata
= userdata
;
927 int pa_pstream_is_pending(pa_pstream
*p
) {
931 pa_assert(PA_REFCNT_VALUE(p
) > 0);
936 b
= p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
941 void pa_pstream_unref(pa_pstream
*p
) {
943 pa_assert(PA_REFCNT_VALUE(p
) > 0);
945 if (PA_REFCNT_DEC(p
) <= 0)
949 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
951 pa_assert(PA_REFCNT_VALUE(p
) > 0);
957 void pa_pstream_unlink(pa_pstream
*p
) {
966 pa_memimport_free(p
->import
);
971 pa_memexport_free(p
->export
);
976 pa_iochannel_free(p
->io
);
980 if (p
->defer_event
) {
981 p
->mainloop
->defer_free(p
->defer_event
);
982 p
->defer_event
= NULL
;
985 p
->die_callback
= NULL
;
986 p
->drain_callback
= NULL
;
987 p
->recieve_packet_callback
= NULL
;
988 p
->recieve_memblock_callback
= NULL
;
991 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
993 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1000 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1005 pa_memexport_free(p
->export
);