4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
45 #include <pulse/xmalloc.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
62 /* The sequence descriptor header consists of 5 32bit integers: */
64 PA_PSTREAM_DESCRIPTOR_LENGTH
,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
68 PA_PSTREAM_DESCRIPTOR_FLAGS
,
69 PA_PSTREAM_DESCRIPTOR_MAX
72 /* If we have an SHM block, this info follows the descriptor */
74 PA_PSTREAM_SHM_BLOCKID
,
77 PA_PSTREAM_SHM_LENGTH
,
81 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
89 PA_PSTREAM_ITEM_PACKET
,
90 PA_PSTREAM_ITEM_MEMBLOCK
,
91 PA_PSTREAM_ITEM_SHMRELEASE
,
92 PA_PSTREAM_ITEM_SHMREVOKE
107 pa_seek_mode_t seek_mode
;
109 /* release/revoke info */
116 pa_mainloop_api
*mainloop
;
117 pa_defer_event
*defer_event
;
120 pa_queue
*send_queue
;
125 pa_pstream_descriptor descriptor
;
126 struct item_info
* current
;
127 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
130 pa_memchunk memchunk
;
134 pa_pstream_descriptor descriptor
;
135 pa_memblock
*memblock
;
137 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
143 pa_memimport
*import
;
144 pa_memexport
*export
;
146 pa_pstream_packet_cb_t recieve_packet_callback
;
147 void *recieve_packet_callback_userdata
;
149 pa_pstream_memblock_cb_t recieve_memblock_callback
;
150 void *recieve_memblock_callback_userdata
;
152 pa_pstream_notify_cb_t drain_callback
;
153 void *drain_callback_userdata
;
155 pa_pstream_notify_cb_t die_callback
;
156 void *die_callback_userdata
;
161 pa_creds read_creds
, write_creds
;
162 int read_creds_valid
, send_creds_now
;
166 static int do_write(pa_pstream
*p
);
167 static int do_read(pa_pstream
*p
);
169 static void do_something(pa_pstream
*p
) {
171 pa_assert(PA_REFCNT_VALUE(p
) > 0);
175 p
->mainloop
->defer_enable(p
->defer_event
, 0);
177 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
180 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
183 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
194 p
->die_callback(p
, p
->die_callback_userdata
);
196 pa_pstream_unlink(p
);
200 static void io_callback(pa_iochannel
*io
, void *userdata
) {
201 pa_pstream
*p
= userdata
;
204 pa_assert(PA_REFCNT_VALUE(p
) > 0);
205 pa_assert(p
->io
== io
);
210 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
211 pa_pstream
*p
= userdata
;
214 pa_assert(PA_REFCNT_VALUE(p
) > 0);
215 pa_assert(p
->defer_event
== e
);
216 pa_assert(p
->mainloop
== m
);
221 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
223 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
230 p
= pa_xnew(pa_pstream
, 1);
233 pa_iochannel_set_callback(io
, io_callback
, p
);
237 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
238 m
->defer_enable(p
->defer_event
, 0);
240 p
->send_queue
= pa_queue_new();
242 p
->write
.current
= NULL
;
244 pa_memchunk_reset(&p
->write
.memchunk
);
245 p
->read
.memblock
= NULL
;
246 p
->read
.packet
= NULL
;
249 p
->recieve_packet_callback
= NULL
;
250 p
->recieve_packet_callback_userdata
= NULL
;
251 p
->recieve_memblock_callback
= NULL
;
252 p
->recieve_memblock_callback_userdata
= NULL
;
253 p
->drain_callback
= NULL
;
254 p
->drain_callback_userdata
= NULL
;
255 p
->die_callback
= NULL
;
256 p
->die_callback_userdata
= NULL
;
263 /* We do importing unconditionally */
264 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
266 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
267 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
270 p
->send_creds_now
= 0;
271 p
->read_creds_valid
= 0;
276 static void item_free(void *item
, PA_GCC_UNUSED
void *q
) {
277 struct item_info
*i
= item
;
280 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
281 pa_assert(i
->chunk
.memblock
);
282 pa_memblock_unref(i
->chunk
.memblock
);
283 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
284 pa_assert(i
->packet
);
285 pa_packet_unref(i
->packet
);
291 static void pstream_free(pa_pstream
*p
) {
294 pa_pstream_unlink(p
);
296 pa_queue_free(p
->send_queue
, item_free
, NULL
);
298 if (p
->write
.current
)
299 item_free(p
->write
.current
, NULL
);
301 if (p
->write
.memchunk
.memblock
)
302 pa_memblock_unref(p
->write
.memchunk
.memblock
);
304 if (p
->read
.memblock
)
305 pa_memblock_unref(p
->read
.memblock
);
308 pa_packet_unref(p
->read
.packet
);
313 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
317 pa_assert(PA_REFCNT_VALUE(p
) > 0);
323 i
= pa_xnew(struct item_info
, 1);
324 i
->type
= PA_PSTREAM_ITEM_PACKET
;
325 i
->packet
= pa_packet_ref(packet
);
328 if ((i
->with_creds
= !!creds
))
332 pa_queue_push(p
->send_queue
, i
);
334 p
->mainloop
->defer_enable(p
->defer_event
, 1);
337 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
341 pa_assert(PA_REFCNT_VALUE(p
) > 0);
342 pa_assert(channel
!= (uint32_t) -1);
349 length
= chunk
->length
;
355 i
= pa_xnew(struct item_info
, 1);
356 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
358 n
= length
< FRAME_SIZE_MAX_USE
? length
: FRAME_SIZE_MAX_USE
;
359 i
->chunk
.index
= chunk
->index
+ idx
;
361 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
363 i
->channel
= channel
;
365 i
->seek_mode
= seek_mode
;
370 pa_queue_push(p
->send_queue
, i
);
376 p
->mainloop
->defer_enable(p
->defer_event
, 1);
379 /* might be called from thread context */
380 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
381 struct item_info
*item
;
382 pa_pstream
*p
= userdata
;
385 pa_assert(PA_REFCNT_VALUE(p
) > 0);
390 /* pa_log("Releasing block %u", block_id); */
392 item
= pa_xnew(struct item_info
, 1);
393 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
394 item
->block_id
= block_id
;
396 item
->with_creds
= 0;
399 pa_queue_push(p
->send_queue
, item
);
400 p
->mainloop
->defer_enable(p
->defer_event
, 1);
403 /* might be called from thread context */
404 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
405 struct item_info
*item
;
406 pa_pstream
*p
= userdata
;
409 pa_assert(PA_REFCNT_VALUE(p
) > 0);
413 /* pa_log("Revoking block %u", block_id); */
415 item
= pa_xnew(struct item_info
, 1);
416 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
417 item
->block_id
= block_id
;
419 item
->with_creds
= 0;
422 pa_queue_push(p
->send_queue
, item
);
423 p
->mainloop
->defer_enable(p
->defer_event
, 1);
426 static void prepare_next_write_item(pa_pstream
*p
) {
428 pa_assert(PA_REFCNT_VALUE(p
) > 0);
430 p
->write
.current
= pa_queue_pop(p
->send_queue
);
432 if (!p
->write
.current
)
436 p
->write
.data
= NULL
;
437 pa_memchunk_reset(&p
->write
.memchunk
);
439 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
440 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
441 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
442 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
443 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
445 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
447 pa_assert(p
->write
.current
->packet
);
448 p
->write
.data
= p
->write
.current
->packet
->data
;
449 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
451 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
453 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
454 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
456 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
458 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
459 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
463 int send_payload
= 1;
465 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
466 pa_assert(p
->write
.current
->chunk
.memblock
);
468 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
469 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
470 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
472 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
475 uint32_t block_id
, shm_id
;
476 size_t offset
, length
;
478 pa_assert(p
->export
);
480 if (pa_memexport_put(p
->export
,
481 p
->write
.current
->chunk
.memblock
,
487 flags
|= PA_FLAG_SHMDATA
;
490 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
491 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
492 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
493 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
495 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
496 p
->write
.data
= p
->write
.shm_info
;
499 /* pa_log_warn("Failed to export memory block."); */
503 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
504 p
->write
.memchunk
= p
->write
.current
->chunk
;
505 pa_memblock_ref(p
->write
.memchunk
.memblock
);
506 p
->write
.data
= NULL
;
509 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
513 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
514 p
->write_creds
= p
->write
.current
->creds
;
518 static int do_write(pa_pstream
*p
) {
522 pa_memblock
*release_memblock
= NULL
;
525 pa_assert(PA_REFCNT_VALUE(p
) > 0);
527 if (!p
->write
.current
)
528 prepare_next_write_item(p
);
530 if (!p
->write
.current
)
533 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
534 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
535 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
537 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
542 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
543 release_memblock
= p
->write
.memchunk
.memblock
;
546 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
547 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
553 if (p
->send_creds_now
) {
555 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
558 p
->send_creds_now
= 0;
562 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
565 if (release_memblock
)
566 pa_memblock_release(release_memblock
);
570 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
571 pa_assert(p
->write
.current
);
572 item_free(p
->write
.current
, NULL
);
573 p
->write
.current
= NULL
;
575 if (p
->write
.memchunk
.memblock
)
576 pa_memblock_unref(p
->write
.memchunk
.memblock
);
578 pa_memchunk_reset(&p
->write
.memchunk
);
580 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
581 p
->drain_callback(p
, p
->drain_callback_userdata
);
588 if (release_memblock
)
589 pa_memblock_release(release_memblock
);
594 static int do_read(pa_pstream
*p
) {
598 pa_memblock
*release_memblock
= NULL
;
600 pa_assert(PA_REFCNT_VALUE(p
) > 0);
602 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
603 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
604 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
606 pa_assert(p
->read
.data
|| p
->read
.memblock
);
611 d
= pa_memblock_acquire(p
->read
.memblock
);
612 release_memblock
= p
->read
.memblock
;
615 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
616 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
623 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
626 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
629 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
633 if (release_memblock
)
634 pa_memblock_release(release_memblock
);
638 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
639 uint32_t flags
, length
, channel
;
640 /* Reading of frame descriptor complete */
642 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
644 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
645 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
649 if (flags
== PA_FLAG_SHMRELEASE
) {
651 /* This is a SHM memblock release frame with no payload */
653 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
655 pa_assert(p
->export
);
656 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
660 } else if (flags
== PA_FLAG_SHMREVOKE
) {
662 /* This is a SHM memblock revoke frame with no payload */
664 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
666 pa_assert(p
->import
);
667 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
672 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
674 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
675 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length
);
679 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
681 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
683 if (channel
== (uint32_t) -1) {
686 pa_log_warn("Received packet frame with invalid flags value.");
690 /* Frame is a packet frame */
691 p
->read
.packet
= pa_packet_new(length
);
692 p
->read
.data
= p
->read
.packet
->data
;
696 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
697 pa_log_warn("Received memblock frame with invalid seek mode.");
701 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
703 if (length
!= sizeof(p
->read
.shm_info
)) {
704 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
708 /* Frame is a memblock frame referencing an SHM memblock */
709 p
->read
.data
= p
->read
.shm_info
;
711 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
713 /* Frame is a memblock frame */
715 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
719 pa_log_warn("Recieved memblock frame with invalid flags value.");
724 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
725 /* Frame payload available */
727 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
729 /* Is this memblock data? Than pass it to the user */
730 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
735 chunk
.memblock
= p
->read
.memblock
;
736 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
739 if (p
->recieve_memblock_callback
) {
743 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
744 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
746 p
->recieve_memblock_callback(
748 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
750 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
752 p
->recieve_memblock_callback_userdata
);
755 /* Drop seek info for following callbacks */
756 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
757 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
758 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
763 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
765 if (p
->read
.memblock
) {
767 /* This was a memblock frame. We can unref the memblock now */
768 pa_memblock_unref(p
->read
.memblock
);
770 } else if (p
->read
.packet
) {
772 if (p
->recieve_packet_callback
)
774 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
776 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
779 pa_packet_unref(p
->read
.packet
);
783 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
785 pa_assert(p
->import
);
787 if (!(b
= pa_memimport_get(p
->import
,
788 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
789 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
790 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
791 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
793 pa_log_warn("Failed to import memory block.");
797 if (p
->recieve_memblock_callback
) {
803 chunk
.length
= pa_memblock_get_length(b
);
806 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
807 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
809 p
->recieve_memblock_callback(
811 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
813 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
815 p
->recieve_memblock_callback_userdata
);
818 pa_memblock_unref(b
);
828 p
->read
.memblock
= NULL
;
829 p
->read
.packet
= NULL
;
834 p
->read_creds_valid
= 0;
840 if (release_memblock
)
841 pa_memblock_release(release_memblock
);
846 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
848 pa_assert(PA_REFCNT_VALUE(p
) > 0);
850 p
->die_callback
= cb
;
851 p
->die_callback_userdata
= userdata
;
854 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
856 pa_assert(PA_REFCNT_VALUE(p
) > 0);
858 p
->drain_callback
= cb
;
859 p
->drain_callback_userdata
= userdata
;
862 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
864 pa_assert(PA_REFCNT_VALUE(p
) > 0);
866 p
->recieve_packet_callback
= cb
;
867 p
->recieve_packet_callback_userdata
= userdata
;
870 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
872 pa_assert(PA_REFCNT_VALUE(p
) > 0);
874 p
->recieve_memblock_callback
= cb
;
875 p
->recieve_memblock_callback_userdata
= userdata
;
878 int pa_pstream_is_pending(pa_pstream
*p
) {
882 pa_assert(PA_REFCNT_VALUE(p
) > 0);
887 b
= p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
892 void pa_pstream_unref(pa_pstream
*p
) {
894 pa_assert(PA_REFCNT_VALUE(p
) > 0);
896 if (PA_REFCNT_DEC(p
) <= 0)
900 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
902 pa_assert(PA_REFCNT_VALUE(p
) > 0);
908 void pa_pstream_unlink(pa_pstream
*p
) {
917 pa_memimport_free(p
->import
);
922 pa_memexport_free(p
->export
);
927 pa_iochannel_free(p
->io
);
931 if (p
->defer_event
) {
932 p
->mainloop
->defer_free(p
->defer_event
);
933 p
->defer_event
= NULL
;
936 p
->die_callback
= NULL
;
937 p
->drain_callback
= NULL
;
938 p
->recieve_packet_callback
= NULL
;
939 p
->recieve_memblock_callback
= NULL
;
942 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
944 pa_assert(PA_REFCNT_VALUE(p
) > 0);
951 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
956 pa_memexport_free(p
->export
);