4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
44 #include <pulse/xmalloc.h>
46 #include <pulsecore/winsock.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52 #include <pulsecore/flist.h>
53 #include <pulsecore/macro.h>
57 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
58 #define PA_FLAG_SHMDATA 0x80000000LU
59 #define PA_FLAG_SHMRELEASE 0x40000000LU
60 #define PA_FLAG_SHMREVOKE 0xC0000000LU
61 #define PA_FLAG_SHMMASK 0xFF000000LU
62 #define PA_FLAG_SEEKMASK 0x000000FFLU
64 /* The sequence descriptor header consists of five 32-bit integers: */
66 PA_PSTREAM_DESCRIPTOR_LENGTH
,
67 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
68 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
69 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
70 PA_PSTREAM_DESCRIPTOR_FLAGS
,
71 PA_PSTREAM_DESCRIPTOR_MAX
74 /* If we have an SHM block, this info follows the descriptor */
76 PA_PSTREAM_SHM_BLOCKID
,
79 PA_PSTREAM_SHM_LENGTH
,
83 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
85 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
86 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
88 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
92 PA_PSTREAM_ITEM_PACKET
,
93 PA_PSTREAM_ITEM_MEMBLOCK
,
94 PA_PSTREAM_ITEM_SHMRELEASE
,
95 PA_PSTREAM_ITEM_SHMREVOKE
109 pa_seek_mode_t seek_mode
;
111 /* release/revoke info */
118 pa_mainloop_api
*mainloop
;
119 pa_defer_event
*defer_event
;
122 pa_queue
*send_queue
;
127 pa_pstream_descriptor descriptor
;
128 struct item_info
* current
;
129 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
132 pa_memchunk memchunk
;
136 pa_pstream_descriptor descriptor
;
137 pa_memblock
*memblock
;
139 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
145 pa_memimport
*import
;
146 pa_memexport
*export
;
148 pa_pstream_packet_cb_t recieve_packet_callback
;
149 void *recieve_packet_callback_userdata
;
151 pa_pstream_memblock_cb_t recieve_memblock_callback
;
152 void *recieve_memblock_callback_userdata
;
154 pa_pstream_notify_cb_t drain_callback
;
155 void *drain_callback_userdata
;
157 pa_pstream_notify_cb_t die_callback
;
158 void *die_callback_userdata
;
160 pa_pstream_block_id_cb_t revoke_callback
;
161 void *revoke_callback_userdata
;
163 pa_pstream_block_id_cb_t release_callback
;
164 void *release_callback_userdata
;
169 pa_creds read_creds
, write_creds
;
170 int read_creds_valid
, send_creds_now
;
174 static int do_write(pa_pstream
*p
);
175 static int do_read(pa_pstream
*p
);
177 static void do_something(pa_pstream
*p
) {
179 pa_assert(PA_REFCNT_VALUE(p
) > 0);
183 p
->mainloop
->defer_enable(p
->defer_event
, 0);
185 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
188 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
191 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
202 p
->die_callback(p
, p
->die_callback_userdata
);
204 pa_pstream_unlink(p
);
208 static void io_callback(pa_iochannel
*io
, void *userdata
) {
209 pa_pstream
*p
= userdata
;
212 pa_assert(PA_REFCNT_VALUE(p
) > 0);
213 pa_assert(p
->io
== io
);
218 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
219 pa_pstream
*p
= userdata
;
222 pa_assert(PA_REFCNT_VALUE(p
) > 0);
223 pa_assert(p
->defer_event
== e
);
224 pa_assert(p
->mainloop
== m
);
229 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
231 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
238 p
= pa_xnew(pa_pstream
, 1);
241 pa_iochannel_set_callback(io
, io_callback
, p
);
245 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
246 m
->defer_enable(p
->defer_event
, 0);
248 p
->send_queue
= pa_queue_new();
250 p
->write
.current
= NULL
;
252 pa_memchunk_reset(&p
->write
.memchunk
);
253 p
->read
.memblock
= NULL
;
254 p
->read
.packet
= NULL
;
257 p
->recieve_packet_callback
= NULL
;
258 p
->recieve_packet_callback_userdata
= NULL
;
259 p
->recieve_memblock_callback
= NULL
;
260 p
->recieve_memblock_callback_userdata
= NULL
;
261 p
->drain_callback
= NULL
;
262 p
->drain_callback_userdata
= NULL
;
263 p
->die_callback
= NULL
;
264 p
->die_callback_userdata
= NULL
;
265 p
->revoke_callback
= NULL
;
266 p
->revoke_callback_userdata
= NULL
;
267 p
->release_callback
= NULL
;
268 p
->release_callback_userdata
= NULL
;
275 /* We do importing unconditionally */
276 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
278 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
279 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
282 p
->send_creds_now
= 0;
283 p
->read_creds_valid
= 0;
288 static void item_free(void *item
, PA_GCC_UNUSED
void *q
) {
289 struct item_info
*i
= item
;
292 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
293 pa_assert(i
->chunk
.memblock
);
294 pa_memblock_unref(i
->chunk
.memblock
);
295 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
296 pa_assert(i
->packet
);
297 pa_packet_unref(i
->packet
);
300 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
304 static void pstream_free(pa_pstream
*p
) {
307 pa_pstream_unlink(p
);
309 pa_queue_free(p
->send_queue
, item_free
, NULL
);
311 if (p
->write
.current
)
312 item_free(p
->write
.current
, NULL
);
314 if (p
->write
.memchunk
.memblock
)
315 pa_memblock_unref(p
->write
.memchunk
.memblock
);
317 if (p
->read
.memblock
)
318 pa_memblock_unref(p
->read
.memblock
);
321 pa_packet_unref(p
->read
.packet
);
326 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
330 pa_assert(PA_REFCNT_VALUE(p
) > 0);
336 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
337 i
= pa_xnew(struct item_info
, 1);
339 i
->type
= PA_PSTREAM_ITEM_PACKET
;
340 i
->packet
= pa_packet_ref(packet
);
343 if ((i
->with_creds
= !!creds
))
347 pa_queue_push(p
->send_queue
, i
);
349 p
->mainloop
->defer_enable(p
->defer_event
, 1);
352 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
357 pa_assert(PA_REFCNT_VALUE(p
) > 0);
358 pa_assert(channel
!= (uint32_t) -1);
365 length
= chunk
->length
;
367 bsm
= pa_mempool_block_size_max(p
->mempool
);
373 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
374 i
= pa_xnew(struct item_info
, 1);
375 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
377 n
= MIN(length
, bsm
);
378 i
->chunk
.index
= chunk
->index
+ idx
;
380 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
382 i
->channel
= channel
;
384 i
->seek_mode
= seek_mode
;
389 pa_queue_push(p
->send_queue
, i
);
395 p
->mainloop
->defer_enable(p
->defer_event
, 1);
398 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
399 struct item_info
*item
;
401 pa_assert(PA_REFCNT_VALUE(p
) > 0);
406 /* pa_log("Releasing block %u", block_id); */
408 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
409 item
= pa_xnew(struct item_info
, 1);
410 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
411 item
->block_id
= block_id
;
413 item
->with_creds
= 0;
416 pa_queue_push(p
->send_queue
, item
);
417 p
->mainloop
->defer_enable(p
->defer_event
, 1);
420 /* might be called from thread context */
421 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
422 pa_pstream
*p
= userdata
;
425 pa_assert(PA_REFCNT_VALUE(p
) > 0);
430 if (p
->release_callback
)
431 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
433 pa_pstream_send_release(p
, block_id
);
436 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
437 struct item_info
*item
;
439 pa_assert(PA_REFCNT_VALUE(p
) > 0);
443 /* pa_log("Revoking block %u", block_id); */
445 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
446 item
= pa_xnew(struct item_info
, 1);
447 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
448 item
->block_id
= block_id
;
450 item
->with_creds
= 0;
453 pa_queue_push(p
->send_queue
, item
);
454 p
->mainloop
->defer_enable(p
->defer_event
, 1);
457 /* might be called from thread context */
458 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
459 pa_pstream
*p
= userdata
;
462 pa_assert(PA_REFCNT_VALUE(p
) > 0);
464 if (p
->revoke_callback
)
465 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
467 pa_pstream_send_revoke(p
, block_id
);
470 static void prepare_next_write_item(pa_pstream
*p
) {
472 pa_assert(PA_REFCNT_VALUE(p
) > 0);
474 p
->write
.current
= pa_queue_pop(p
->send_queue
);
476 if (!p
->write
.current
)
480 p
->write
.data
= NULL
;
481 pa_memchunk_reset(&p
->write
.memchunk
);
483 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
484 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
485 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
486 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
487 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
489 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
491 pa_assert(p
->write
.current
->packet
);
492 p
->write
.data
= p
->write
.current
->packet
->data
;
493 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
495 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
497 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
498 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
500 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
502 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
503 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
507 int send_payload
= 1;
509 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
510 pa_assert(p
->write
.current
->chunk
.memblock
);
512 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
513 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
514 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
516 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
519 uint32_t block_id
, shm_id
;
520 size_t offset
, length
;
522 pa_assert(p
->export
);
524 if (pa_memexport_put(p
->export
,
525 p
->write
.current
->chunk
.memblock
,
531 flags
|= PA_FLAG_SHMDATA
;
534 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
535 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
536 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
537 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
539 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
540 p
->write
.data
= p
->write
.shm_info
;
543 /* pa_log_warn("Failed to export memory block."); */
547 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
548 p
->write
.memchunk
= p
->write
.current
->chunk
;
549 pa_memblock_ref(p
->write
.memchunk
.memblock
);
550 p
->write
.data
= NULL
;
553 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
557 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
558 p
->write_creds
= p
->write
.current
->creds
;
562 static int do_write(pa_pstream
*p
) {
566 pa_memblock
*release_memblock
= NULL
;
569 pa_assert(PA_REFCNT_VALUE(p
) > 0);
571 if (!p
->write
.current
)
572 prepare_next_write_item(p
);
574 if (!p
->write
.current
)
577 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
578 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
579 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
581 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
586 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
587 release_memblock
= p
->write
.memchunk
.memblock
;
590 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
591 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
597 if (p
->send_creds_now
) {
599 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
602 p
->send_creds_now
= 0;
606 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
609 if (release_memblock
)
610 pa_memblock_release(release_memblock
);
614 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
615 pa_assert(p
->write
.current
);
616 item_free(p
->write
.current
, NULL
);
617 p
->write
.current
= NULL
;
619 if (p
->write
.memchunk
.memblock
)
620 pa_memblock_unref(p
->write
.memchunk
.memblock
);
622 pa_memchunk_reset(&p
->write
.memchunk
);
624 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
625 p
->drain_callback(p
, p
->drain_callback_userdata
);
632 if (release_memblock
)
633 pa_memblock_release(release_memblock
);
638 static int do_read(pa_pstream
*p
) {
642 pa_memblock
*release_memblock
= NULL
;
644 pa_assert(PA_REFCNT_VALUE(p
) > 0);
646 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
647 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
648 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
650 pa_assert(p
->read
.data
|| p
->read
.memblock
);
655 d
= pa_memblock_acquire(p
->read
.memblock
);
656 release_memblock
= p
->read
.memblock
;
659 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
660 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
667 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
670 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
673 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
677 if (release_memblock
)
678 pa_memblock_release(release_memblock
);
682 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
683 uint32_t flags
, length
, channel
;
684 /* Reading of frame descriptor complete */
686 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
688 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
689 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
693 if (flags
== PA_FLAG_SHMRELEASE
) {
695 /* This is a SHM memblock release frame with no payload */
697 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
699 pa_assert(p
->export
);
700 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
704 } else if (flags
== PA_FLAG_SHMREVOKE
) {
706 /* This is a SHM memblock revoke frame with no payload */
708 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
710 pa_assert(p
->import
);
711 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
716 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
718 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
719 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length
);
723 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
725 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
727 if (channel
== (uint32_t) -1) {
730 pa_log_warn("Received packet frame with invalid flags value.");
734 /* Frame is a packet frame */
735 p
->read
.packet
= pa_packet_new(length
);
736 p
->read
.data
= p
->read
.packet
->data
;
740 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
741 pa_log_warn("Received memblock frame with invalid seek mode.");
745 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
747 if (length
!= sizeof(p
->read
.shm_info
)) {
748 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
752 /* Frame is a memblock frame referencing an SHM memblock */
753 p
->read
.data
= p
->read
.shm_info
;
755 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
757 /* Frame is a memblock frame */
759 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
763 pa_log_warn("Recieved memblock frame with invalid flags value.");
768 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
769 /* Frame payload available */
771 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
773 /* Is this memblock data? Then pass it to the user */
774 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
779 chunk
.memblock
= p
->read
.memblock
;
780 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
783 if (p
->recieve_memblock_callback
) {
787 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
788 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
790 p
->recieve_memblock_callback(
792 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
794 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
796 p
->recieve_memblock_callback_userdata
);
799 /* Drop seek info for following callbacks */
800 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
801 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
802 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
807 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
809 if (p
->read
.memblock
) {
811 /* This was a memblock frame. We can unref the memblock now */
812 pa_memblock_unref(p
->read
.memblock
);
814 } else if (p
->read
.packet
) {
816 if (p
->recieve_packet_callback
)
818 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
820 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
823 pa_packet_unref(p
->read
.packet
);
827 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
829 pa_assert(p
->import
);
831 if (!(b
= pa_memimport_get(p
->import
,
832 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
833 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
834 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
835 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
837 pa_log_warn("Failed to import memory block.");
841 if (p
->recieve_memblock_callback
) {
847 chunk
.length
= pa_memblock_get_length(b
);
850 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
851 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
853 p
->recieve_memblock_callback(
855 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
857 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
859 p
->recieve_memblock_callback_userdata
);
862 pa_memblock_unref(b
);
872 p
->read
.memblock
= NULL
;
873 p
->read
.packet
= NULL
;
878 p
->read_creds_valid
= 0;
884 if (release_memblock
)
885 pa_memblock_release(release_memblock
);
890 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
892 pa_assert(PA_REFCNT_VALUE(p
) > 0);
894 p
->die_callback
= cb
;
895 p
->die_callback_userdata
= userdata
;
898 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
900 pa_assert(PA_REFCNT_VALUE(p
) > 0);
902 p
->drain_callback
= cb
;
903 p
->drain_callback_userdata
= userdata
;
906 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
908 pa_assert(PA_REFCNT_VALUE(p
) > 0);
910 p
->recieve_packet_callback
= cb
;
911 p
->recieve_packet_callback_userdata
= userdata
;
914 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
916 pa_assert(PA_REFCNT_VALUE(p
) > 0);
918 p
->recieve_memblock_callback
= cb
;
919 p
->recieve_memblock_callback_userdata
= userdata
;
922 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
924 pa_assert(PA_REFCNT_VALUE(p
) > 0);
926 p
->release_callback
= cb
;
927 p
->release_callback_userdata
= userdata
;
930 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
932 pa_assert(PA_REFCNT_VALUE(p
) > 0);
934 p
->release_callback
= cb
;
935 p
->release_callback_userdata
= userdata
;
938 int pa_pstream_is_pending(pa_pstream
*p
) {
942 pa_assert(PA_REFCNT_VALUE(p
) > 0);
947 b
= p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
952 void pa_pstream_unref(pa_pstream
*p
) {
954 pa_assert(PA_REFCNT_VALUE(p
) > 0);
956 if (PA_REFCNT_DEC(p
) <= 0)
960 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
962 pa_assert(PA_REFCNT_VALUE(p
) > 0);
968 void pa_pstream_unlink(pa_pstream
*p
) {
977 pa_memimport_free(p
->import
);
982 pa_memexport_free(p
->export
);
987 pa_iochannel_free(p
->io
);
991 if (p
->defer_event
) {
992 p
->mainloop
->defer_free(p
->defer_event
);
993 p
->defer_event
= NULL
;
996 p
->die_callback
= NULL
;
997 p
->drain_callback
= NULL
;
998 p
->recieve_packet_callback
= NULL
;
999 p
->recieve_memblock_callback
= NULL
;
1002 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
1004 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1006 p
->use_shm
= enable
;
1011 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1016 pa_memexport_free(p
->export
);