4 This file is part of PulseAudio.
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
43 #include <pulse/xmalloc.h>
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49 #include <pulsecore/mutex.h>
50 #include <pulsecore/refcnt.h>
51 #include <pulsecore/anotify.h>
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
62 /* The sequence descriptor header consists of 5 32bit integers: */
64 PA_PSTREAM_DESCRIPTOR_LENGTH
,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
68 PA_PSTREAM_DESCRIPTOR_FLAGS
,
69 PA_PSTREAM_DESCRIPTOR_MAX
72 /* If we have an SHM block, this info follows the descriptor */
74 PA_PSTREAM_SHM_BLOCKID
,
77 PA_PSTREAM_SHM_LENGTH
,
81 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
89 PA_PSTREAM_ITEM_PACKET
,
90 PA_PSTREAM_ITEM_MEMBLOCK
,
91 PA_PSTREAM_ITEM_SHMRELEASE
,
92 PA_PSTREAM_ITEM_SHMREVOKE
107 pa_seek_mode_t seek_mode
;
109 /* release/revoke info */
116 pa_mainloop_api
*mainloop
;
119 pa_queue
*send_queue
;
120 pa_mutex
*mutex
; /* only for access to the queue */
126 pa_pstream_descriptor descriptor
;
127 struct item_info
* current
;
128 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
131 pa_memchunk memchunk
;
135 pa_pstream_descriptor descriptor
;
136 pa_memblock
*memblock
;
138 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
144 pa_memimport
*import
;
145 pa_memexport
*export
;
147 pa_pstream_packet_cb_t recieve_packet_callback
;
148 void *recieve_packet_callback_userdata
;
150 pa_pstream_memblock_cb_t recieve_memblock_callback
;
151 void *recieve_memblock_callback_userdata
;
153 pa_pstream_notify_cb_t drain_callback
;
154 void *drain_callback_userdata
;
156 pa_pstream_notify_cb_t die_callback
;
157 void *die_callback_userdata
;
162 pa_creds read_creds
, write_creds
;
163 int read_creds_valid
, send_creds_now
;
167 static int do_write(pa_pstream
*p
);
168 static int do_read(pa_pstream
*p
);
170 static void do_something(pa_pstream
*p
) {
172 assert(PA_REFCNT_VALUE(p
) > 0);
176 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
179 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
182 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
195 p
->die_callback(p
, p
->die_callback_userdata
);
200 static void io_callback(pa_iochannel
*io
, void *userdata
) {
201 pa_pstream
*p
= userdata
;
209 static void anotify_callback(uint8_t event
, void *userdata
) {
210 pa_pstream
*p
= userdata
;
216 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
218 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
225 p
= pa_xnew(pa_pstream
, 1);
228 pa_iochannel_set_callback(io
, io_callback
, p
);
231 p
->mutex
= pa_mutex_new(1);
232 p
->anotify
= pa_anotify_new(m
, anotify_callback
, p
);
236 p
->send_queue
= pa_queue_new();
237 assert(p
->send_queue
);
239 p
->write
.current
= NULL
;
241 pa_memchunk_reset(&p
->write
.memchunk
);
242 p
->read
.memblock
= NULL
;
243 p
->read
.packet
= NULL
;
246 p
->recieve_packet_callback
= NULL
;
247 p
->recieve_packet_callback_userdata
= NULL
;
248 p
->recieve_memblock_callback
= NULL
;
249 p
->recieve_memblock_callback_userdata
= NULL
;
250 p
->drain_callback
= NULL
;
251 p
->drain_callback_userdata
= NULL
;
252 p
->die_callback
= NULL
;
253 p
->die_callback_userdata
= NULL
;
260 /* We do importing unconditionally */
261 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
263 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
264 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
267 p
->send_creds_now
= 0;
268 p
->read_creds_valid
= 0;
273 static void item_free(void *item
, PA_GCC_UNUSED
void *p
) {
274 struct item_info
*i
= item
;
277 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
278 assert(i
->chunk
.memblock
);
279 pa_memblock_unref(i
->chunk
.memblock
);
280 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
282 pa_packet_unref(i
->packet
);
288 static void pstream_free(pa_pstream
*p
) {
293 pa_queue_free(p
->send_queue
, item_free
, NULL
);
295 if (p
->write
.current
)
296 item_free(p
->write
.current
, NULL
);
298 if (p
->read
.memblock
)
299 pa_memblock_unref(p
->read
.memblock
);
302 pa_packet_unref(p
->read
.packet
);
304 if (p
->write
.memchunk
.memblock
)
305 pa_memblock_unref(p
->write
.memchunk
.memblock
);
308 pa_mutex_free(p
->mutex
);
311 pa_anotify_free(p
->anotify
);
316 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
320 assert(PA_REFCNT_VALUE(p
) > 0);
323 i
= pa_xnew(struct item_info
, 1);
324 i
->type
= PA_PSTREAM_ITEM_PACKET
;
325 i
->packet
= pa_packet_ref(packet
);
328 if ((i
->with_creds
= !!creds
))
332 pa_mutex_lock(p
->mutex
);
333 pa_queue_push(p
->send_queue
, i
);
334 pa_mutex_unlock(p
->mutex
);
336 pa_anotify_signal(p
->anotify
, 0);
339 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
343 assert(PA_REFCNT_VALUE(p
) > 0);
344 assert(channel
!= (uint32_t) -1);
353 i
= pa_xnew(struct item_info
, 1);
354 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
356 n
= length
< FRAME_SIZE_MAX_USE
? length
: FRAME_SIZE_MAX_USE
;
357 i
->chunk
.index
= chunk
->index
+ idx
;
359 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
361 i
->channel
= channel
;
363 i
->seek_mode
= seek_mode
;
368 pa_mutex_lock(p
->mutex
);
369 pa_queue_push(p
->send_queue
, i
);
370 pa_mutex_unlock(p
->mutex
);
376 pa_anotify_signal(p
->anotify
, 0);
379 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
380 struct item_info
*item
;
381 pa_pstream
*p
= userdata
;
384 assert(PA_REFCNT_VALUE(p
) > 0);
386 /* pa_log("Releasing block %u", block_id); */
388 item
= pa_xnew(struct item_info
, 1);
389 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
390 item
->block_id
= block_id
;
392 item
->with_creds
= 0;
395 pa_mutex_lock(p
->mutex
);
396 pa_queue_push(p
->send_queue
, item
);
397 pa_mutex_unlock(p
->mutex
);
399 pa_anotify_signal(p
->anotify
, 0);
402 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
403 struct item_info
*item
;
404 pa_pstream
*p
= userdata
;
407 assert(PA_REFCNT_VALUE(p
) > 0);
409 /* pa_log("Revoking block %u", block_id); */
411 item
= pa_xnew(struct item_info
, 1);
412 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
413 item
->block_id
= block_id
;
415 item
->with_creds
= 0;
418 pa_mutex_lock(p
->mutex
);
419 pa_queue_push(p
->send_queue
, item
);
420 pa_mutex_unlock(p
->mutex
);
422 pa_anotify_signal(p
->anotify
, 0);
425 static void prepare_next_write_item(pa_pstream
*p
) {
427 assert(PA_REFCNT_VALUE(p
) > 0);
429 pa_mutex_lock(p
->mutex
);
430 p
->write
.current
= pa_queue_pop(p
->send_queue
);
431 pa_mutex_unlock(p
->mutex
);
433 if (!p
->write
.current
)
437 p
->write
.data
= NULL
;
438 pa_memchunk_reset(&p
->write
.memchunk
);
440 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
441 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
442 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
443 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
444 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
446 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
448 assert(p
->write
.current
->packet
);
449 p
->write
.data
= p
->write
.current
->packet
->data
;
450 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
452 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
454 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
455 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
457 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
459 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
460 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
464 int send_payload
= 1;
466 assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
467 assert(p
->write
.current
->chunk
.memblock
);
469 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
470 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
471 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
473 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
476 uint32_t block_id
, shm_id
;
477 size_t offset
, length
;
481 if (pa_memexport_put(p
->export
,
482 p
->write
.current
->chunk
.memblock
,
488 flags
|= PA_FLAG_SHMDATA
;
491 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
492 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
493 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
494 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
496 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
497 p
->write
.data
= p
->write
.shm_info
;
500 /* pa_log_warn("Failed to export memory block."); */
504 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
505 p
->write
.memchunk
= p
->write
.current
->chunk
;
506 pa_memblock_ref(p
->write
.memchunk
.memblock
);
507 p
->write
.data
= NULL
;
510 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
514 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
515 p
->write_creds
= p
->write
.current
->creds
;
519 static int do_write(pa_pstream
*p
) {
523 pa_memblock
*release_memblock
= NULL
;
526 assert(PA_REFCNT_VALUE(p
) > 0);
528 if (!p
->write
.current
)
529 prepare_next_write_item(p
);
531 if (!p
->write
.current
)
534 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
535 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
536 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
538 assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
543 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
544 release_memblock
= p
->write
.memchunk
.memblock
;
547 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
548 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
554 if (p
->send_creds_now
) {
556 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
559 p
->send_creds_now
= 0;
563 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
566 if (release_memblock
)
567 pa_memblock_release(release_memblock
);
571 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
572 assert(p
->write
.current
);
573 item_free(p
->write
.current
, (void *) 1);
574 p
->write
.current
= NULL
;
576 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
577 p
->drain_callback(p
, p
->drain_callback_userdata
);
584 if (release_memblock
)
585 pa_memblock_release(release_memblock
);
590 static int do_read(pa_pstream
*p
) {
594 pa_memblock
*release_memblock
= NULL
;
597 assert(PA_REFCNT_VALUE(p
) > 0);
599 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
600 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
601 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
603 assert(p
->read
.data
|| p
->read
.memblock
);
608 d
= pa_memblock_acquire(p
->read
.memblock
);
609 release_memblock
= p
->read
.memblock
;
612 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
613 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
620 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
623 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
626 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
630 if (release_memblock
)
631 pa_memblock_release(release_memblock
);
635 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
636 uint32_t flags
, length
, channel
;
637 /* Reading of frame descriptor complete */
639 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
641 if (!p
->import
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
642 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
646 if (flags
== PA_FLAG_SHMRELEASE
) {
648 /* This is a SHM memblock release frame with no payload */
650 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
653 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
657 } else if (flags
== PA_FLAG_SHMREVOKE
) {
659 /* This is a SHM memblock revoke frame with no payload */
661 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
664 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
669 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
671 if (length
> FRAME_SIZE_MAX_ALLOW
) {
672 pa_log_warn("Recieved invalid frame size : %lu", (unsigned long) length
);
676 assert(!p
->read
.packet
&& !p
->read
.memblock
);
678 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
680 if (channel
== (uint32_t) -1) {
683 pa_log_warn("Received packet frame with invalid flags value.");
687 /* Frame is a packet frame */
688 p
->read
.packet
= pa_packet_new(length
);
689 p
->read
.data
= p
->read
.packet
->data
;
693 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
694 pa_log_warn("Received memblock frame with invalid seek mode.");
698 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
700 if (length
!= sizeof(p
->read
.shm_info
)) {
701 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
705 /* Frame is a memblock frame referencing an SHM memblock */
706 p
->read
.data
= p
->read
.shm_info
;
708 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
710 /* Frame is a memblock frame */
712 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
716 pa_log_warn("Recieved memblock frame with invalid flags value.");
721 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
722 /* Frame payload available */
724 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
726 /* Is this memblock data? Than pass it to the user */
727 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
732 chunk
.memblock
= p
->read
.memblock
;
733 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
736 if (p
->recieve_memblock_callback
) {
740 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
741 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
743 p
->recieve_memblock_callback(
745 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
747 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
749 p
->recieve_memblock_callback_userdata
);
752 /* Drop seek info for following callbacks */
753 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
754 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
755 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
760 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
762 if (p
->read
.memblock
) {
764 /* This was a memblock frame. We can unref the memblock now */
765 pa_memblock_unref(p
->read
.memblock
);
767 } else if (p
->read
.packet
) {
769 if (p
->recieve_packet_callback
)
771 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
773 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
776 pa_packet_unref(p
->read
.packet
);
780 assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
784 if (!(b
= pa_memimport_get(p
->import
,
785 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
786 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
787 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
788 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
790 pa_log_warn("Failed to import memory block.");
794 if (p
->recieve_memblock_callback
) {
800 chunk
.length
= pa_memblock_get_length(b
);
803 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
804 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
806 p
->recieve_memblock_callback(
808 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
810 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
812 p
->recieve_memblock_callback_userdata
);
815 pa_memblock_unref(b
);
825 p
->read
.memblock
= NULL
;
826 p
->read
.packet
= NULL
;
831 p
->read_creds_valid
= 0;
837 if (release_memblock
)
838 pa_memblock_release(release_memblock
);
843 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
845 assert(PA_REFCNT_VALUE(p
) > 0);
847 p
->die_callback
= cb
;
848 p
->die_callback_userdata
= userdata
;
851 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
853 assert(PA_REFCNT_VALUE(p
) > 0);
855 p
->drain_callback
= cb
;
856 p
->drain_callback_userdata
= userdata
;
859 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
861 assert(PA_REFCNT_VALUE(p
) > 0);
863 p
->recieve_packet_callback
= cb
;
864 p
->recieve_packet_callback_userdata
= userdata
;
867 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
869 assert(PA_REFCNT_VALUE(p
) > 0);
871 p
->recieve_memblock_callback
= cb
;
872 p
->recieve_memblock_callback_userdata
= userdata
;
875 int pa_pstream_is_pending(pa_pstream
*p
) {
879 assert(PA_REFCNT_VALUE(p
) > 0);
881 pa_mutex_lock(p
->mutex
);
886 b
= p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
888 pa_mutex_unlock(p
->mutex
);
893 void pa_pstream_unref(pa_pstream
*p
) {
895 assert(PA_REFCNT_VALUE(p
) > 0);
897 if (PA_REFCNT_DEC(p
) <= 0)
901 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
903 assert(PA_REFCNT_VALUE(p
) > 0);
909 void pa_pstream_close(pa_pstream
*p
) {
915 pa_memimport_free(p
->import
);
920 pa_memexport_free(p
->export
);
925 pa_iochannel_free(p
->io
);
929 p
->die_callback
= NULL
;
930 p
->drain_callback
= NULL
;
931 p
->recieve_packet_callback
= NULL
;
932 p
->recieve_memblock_callback
= NULL
;
935 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
937 assert(PA_REFCNT_VALUE(p
) > 0);
944 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
949 pa_memexport_free(p
->export
);