4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
45 #include <pulse/xmalloc.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52 #include <pulsecore/flist.h>
56 /* We piggyback the information whether an audio data block is stored in SHM onto the descriptor field that carries the seek mode */
57 #define PA_FLAG_SHMDATA 0x80000000LU
58 #define PA_FLAG_SHMRELEASE 0x40000000LU
59 #define PA_FLAG_SHMREVOKE 0xC0000000LU
60 #define PA_FLAG_SHMMASK 0xFF000000LU
61 #define PA_FLAG_SEEKMASK 0x000000FFLU
63 /* The sequence descriptor header consists of 5 32bit integers: */
65 PA_PSTREAM_DESCRIPTOR_LENGTH
,
66 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
68 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
69 PA_PSTREAM_DESCRIPTOR_FLAGS
,
70 PA_PSTREAM_DESCRIPTOR_MAX
73 /* If we have an SHM block, this info follows the descriptor */
75 PA_PSTREAM_SHM_BLOCKID
,
78 PA_PSTREAM_SHM_LENGTH
,
82 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
84 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
85 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
86 #define FRAME_SIZE_MAX_USE (1024*64)
88 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
92 PA_PSTREAM_ITEM_PACKET
,
93 PA_PSTREAM_ITEM_MEMBLOCK
,
94 PA_PSTREAM_ITEM_SHMRELEASE
,
95 PA_PSTREAM_ITEM_SHMREVOKE
109 pa_seek_mode_t seek_mode
;
111 /* release/revoke info */
118 pa_mainloop_api
*mainloop
;
119 pa_defer_event
*defer_event
;
122 pa_queue
*send_queue
;
127 pa_pstream_descriptor descriptor
;
128 struct item_info
* current
;
129 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
132 pa_memchunk memchunk
;
136 pa_pstream_descriptor descriptor
;
137 pa_memblock
*memblock
;
139 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
145 pa_memimport
*import
;
146 pa_memexport
*export
;
148 pa_pstream_packet_cb_t recieve_packet_callback
;
149 void *recieve_packet_callback_userdata
;
151 pa_pstream_memblock_cb_t recieve_memblock_callback
;
152 void *recieve_memblock_callback_userdata
;
154 pa_pstream_notify_cb_t drain_callback
;
155 void *drain_callback_userdata
;
157 pa_pstream_notify_cb_t die_callback
;
158 void *die_callback_userdata
;
160 pa_pstream_block_id_cb_t revoke_callback
;
161 void *revoke_callback_userdata
;
163 pa_pstream_block_id_cb_t release_callback
;
164 void *release_callback_userdata
;
169 pa_creds read_creds
, write_creds
;
170 int read_creds_valid
, send_creds_now
;
174 static int do_write(pa_pstream
*p
);
175 static int do_read(pa_pstream
*p
);
/* Services the iochannel of a pstream. The defer event is disabled first;
 * then, as long as the stream is not marked dead, the channel is tested for
 * readable, hung-up and writable state. The tail of the function invokes
 * die_callback and unlinks the stream -- presumably the failure path; the
 * branch bodies and the jump to that tail are not visible in this excerpt. */
177 static void do_something(pa_pstream
*p
) {
179 pa_assert(PA_REFCNT_VALUE(p
) > 0);
183 p
->mainloop
->defer_enable(p
->defer_event
, 0);
185 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
188 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
191 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
202 p
->die_callback(p
, p
->die_callback_userdata
);
204 pa_pstream_unlink(p
);
/* iochannel callback; registered with pa_iochannel_set_callback() in
 * pa_pstream_new(). userdata is the owning pa_pstream, and the channel passed
 * in must be the one stored in p->io. */
208 static void io_callback(pa_iochannel
*io
, void *userdata
) {
209 pa_pstream
*p
= userdata
;
212 pa_assert(PA_REFCNT_VALUE(p
) > 0);
213 pa_assert(p
->io
== io
);
/* Defer-event callback; registered through m->defer_new() in
 * pa_pstream_new(). userdata is the owning pa_pstream; both the event and the
 * mainloop API passed in must match the ones stored in the stream. */
218 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
219 pa_pstream
*p
= userdata
;
222 pa_assert(PA_REFCNT_VALUE(p
) > 0);
223 pa_assert(p
->defer_event
== e
);
224 pa_assert(p
->mainloop
== m
);
229 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
/* Allocates and initialises a new pstream on top of the given iochannel.
 *
 * Visible set-up steps: io_callback is installed on the channel, a defer
 * event is created and immediately disabled, the send queue is created, the
 * read/write state and all user callbacks are reset to NULL, a memimport is
 * created unconditionally, the socket send/receive buffers are set to 8 KiB
 * and the credential flags are cleared.
 *
 * NOTE(review): the assignments of p->io, p->mainloop, p->mempool, p->dead,
 * the reference count and the return statement are not visible in this
 * excerpt. */
231 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
238 p
= pa_xnew(pa_pstream
, 1);
241 pa_iochannel_set_callback(io
, io_callback
, p
);
245 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
246 m
->defer_enable(p
->defer_event
, 0);
248 p
->send_queue
= pa_queue_new();
250 p
->write
.current
= NULL
;
252 pa_memchunk_reset(&p
->write
.memchunk
);
253 p
->read
.memblock
= NULL
;
254 p
->read
.packet
= NULL
;
257 p
->recieve_packet_callback
= NULL
;
258 p
->recieve_packet_callback_userdata
= NULL
;
259 p
->recieve_memblock_callback
= NULL
;
260 p
->recieve_memblock_callback_userdata
= NULL
;
261 p
->drain_callback
= NULL
;
262 p
->drain_callback_userdata
= NULL
;
263 p
->die_callback
= NULL
;
264 p
->die_callback_userdata
= NULL
;
265 p
->revoke_callback
= NULL
;
266 p
->revoke_callback_userdata
= NULL
;
267 p
->release_callback
= NULL
;
268 p
->release_callback_userdata
= NULL
;
275 /* We do importing unconditionally */
276 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
278 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
279 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
282 p
->send_creds_now
= 0;
283 p
->read_creds_valid
= 0;
/* Releases one send-queue item. Drops the reference the item holds on its
 * memblock (MEMBLOCK items) or packet (PACKET items), then tries to recycle
 * the item structure by pushing it onto the static "items" free list. The
 * action taken when the push fails is not visible in this excerpt. The
 * second parameter is unused; the signature lets the function be passed to
 * pa_queue_free(). */
288 static void item_free(void *item
, PA_GCC_UNUSED
void *q
) {
289 struct item_info
*i
= item
;
292 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
293 pa_assert(i
->chunk
.memblock
);
294 pa_memblock_unref(i
->chunk
.memblock
);
295 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
296 pa_assert(i
->packet
);
297 pa_packet_unref(i
->packet
);
300 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
/* Tears down a pstream: unlinks it, frees the send queue together with all
 * queued items, frees the item currently being written, and drops the
 * references held on the write memchunk, the read memblock and the read
 * packet. NOTE(review): the guard around the final pa_packet_unref() and the
 * release of the structure itself are not visible in this excerpt. */
304 static void pstream_free(pa_pstream
*p
) {
307 pa_pstream_unlink(p
);
309 pa_queue_free(p
->send_queue
, item_free
, NULL
);
311 if (p
->write
.current
)
312 item_free(p
->write
.current
, NULL
);
314 if (p
->write
.memchunk
.memblock
)
315 pa_memblock_unref(p
->write
.memchunk
.memblock
);
317 if (p
->read
.memblock
)
318 pa_memblock_unref(p
->read
.memblock
);
321 pa_packet_unref(p
->read
.packet
);
/* Queues a packet for transmission.
 *
 * An item is taken from the static free list (or freshly allocated), takes
 * its own reference on the packet and is appended to the send queue; the
 * defer event is then enabled so the queue gets serviced. with_creds is set
 * when a non-NULL creds pointer is supplied (the copy of the credentials
 * itself is not visible in this excerpt). */
326 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
330 pa_assert(PA_REFCNT_VALUE(p
) > 0);
336 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
337 i
= pa_xnew(struct item_info
, 1);
339 i
->type
= PA_PSTREAM_ITEM_PACKET
;
340 i
->packet
= pa_packet_ref(packet
);
343 if ((i
->with_creds
= !!creds
))
347 pa_queue_push(p
->send_queue
, i
);
349 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queues audio data from a memchunk for transmission on the given channel.
 *
 * channel must not be (uint32_t) -1; prepare_next_write_item() uses that
 * value in the descriptor of non-memblock frames. Each queued item covers at
 * most FRAME_SIZE_MAX_USE bytes, takes its own reference on the memblock
 * and records channel and seek mode. Apparently larger chunks are split
 * across several items in a loop (idx/length bookkeeping is only partly
 * visible in this excerpt). The defer event is enabled afterwards. */
352 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
356 pa_assert(PA_REFCNT_VALUE(p
) > 0);
357 pa_assert(channel
!= (uint32_t) -1);
364 length
= chunk
->length
;
370 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
371 i
= pa_xnew(struct item_info
, 1);
372 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
374 n
= length
< FRAME_SIZE_MAX_USE
? length
: FRAME_SIZE_MAX_USE
;
375 i
->chunk
.index
= chunk
->index
+ idx
;
377 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
379 i
->channel
= channel
;
381 i
->seek_mode
= seek_mode
;
386 pa_queue_push(p
->send_queue
, i
);
392 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queues an SHM "release" frame for the given block id. The item is taken
 * from the static free list (or freshly allocated), carries no credentials,
 * and the defer event is enabled so the send queue gets serviced. */
395 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
396 struct item_info
*item
;
398 pa_assert(PA_REFCNT_VALUE(p
) > 0);
403 /* pa_log("Releasing block %u", block_id); */
405 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
406 item
= pa_xnew(struct item_info
, 1);
407 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
408 item
->block_id
= block_id
;
410 item
->with_creds
= 0;
413 pa_queue_push(p
->send_queue
, item
);
414 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Release callback handed to pa_memimport_new() in pa_pstream_new().
 * userdata is the owning pa_pstream. The user-supplied release_callback (if
 * any) is invoked with the block id, and a release frame is queued via
 * pa_pstream_send_release(). */
417 /* might be called from thread context */
418 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
419 pa_pstream
*p
= userdata
;
422 pa_assert(PA_REFCNT_VALUE(p
) > 0);
427 if (p
->release_callback
)
428 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
430 pa_pstream_send_release(p
, block_id
);
/* Queues an SHM "revoke" frame for the given block id. The item is taken
 * from the static free list (or freshly allocated), carries no credentials,
 * and the defer event is enabled so the send queue gets serviced. */
433 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
434 struct item_info
*item
;
436 pa_assert(PA_REFCNT_VALUE(p
) > 0);
440 /* pa_log("Revoking block %u", block_id); */
442 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
443 item
= pa_xnew(struct item_info
, 1);
444 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
445 item
->block_id
= block_id
;
447 item
->with_creds
= 0;
450 pa_queue_push(p
->send_queue
, item
);
451 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Revoke callback handed to pa_memexport_new() in pa_pstream_use_shm().
 * userdata is the owning pa_pstream. The user-supplied revoke_callback (if
 * any) is invoked with the block id, and a revoke frame is queued via
 * pa_pstream_send_revoke(). */
454 /* might be called from thread context */
455 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
456 pa_pstream
*p
= userdata
;
459 pa_assert(PA_REFCNT_VALUE(p
) > 0);
461 if (p
->revoke_callback
)
462 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
464 pa_pstream_send_revoke(p
, block_id
);
/* Pops the next item off the send queue and builds the frame descriptor for
 * it in p->write. All descriptor words are stored in network byte order.
 *
 * Defaults: length 0, channel (uint32_t) -1, offset 0, flags 0. Then, by
 * item type:
 *   - PACKET:     payload is the packet data, length is the packet length.
 *   - SHMRELEASE: flags = PA_FLAG_SHMRELEASE, block id goes in OFFSET_HI.
 *   - SHMREVOKE:  flags = PA_FLAG_SHMREVOKE,  block id goes in OFFSET_HI.
 *   - MEMBLOCK:   channel, 64 bit offset (split into HI/LO) and the seek
 *                 mode are filled in. If pa_memexport_put() succeeds the
 *                 payload is the shm_info record and PA_FLAG_SHMDATA is
 *                 set; otherwise the chunk data itself is the payload and
 *                 an extra reference on the memblock is taken.
 * Finally the credentials of the item are copied when it carries any.
 *
 * NOTE(review): the conditions that select the SHM path (and the early
 * return when the queue is empty) are not visible in this excerpt. */
467 static void prepare_next_write_item(pa_pstream
*p
) {
469 pa_assert(PA_REFCNT_VALUE(p
) > 0);
471 p
->write
.current
= pa_queue_pop(p
->send_queue
);
473 if (!p
->write
.current
)
477 p
->write
.data
= NULL
;
478 pa_memchunk_reset(&p
->write
.memchunk
);
480 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
481 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
482 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
483 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
484 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
486 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
488 pa_assert(p
->write
.current
->packet
);
489 p
->write
.data
= p
->write
.current
->packet
->data
;
490 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
492 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
494 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
495 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
497 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
499 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
500 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
504 int send_payload
= 1;
506 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
507 pa_assert(p
->write
.current
->chunk
.memblock
);
509 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
510 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
511 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
513 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
516 uint32_t block_id
, shm_id
;
517 size_t offset
, length
;
519 pa_assert(p
->export
);
521 if (pa_memexport_put(p
->export
,
522 p
->write
.current
->chunk
.memblock
,
528 flags
|= PA_FLAG_SHMDATA
;
531 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
532 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
533 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
534 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
536 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
537 p
->write
.data
= p
->write
.shm_info
;
540 /* pa_log_warn("Failed to export memory block."); */
544 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
545 p
->write
.memchunk
= p
->write
.current
->chunk
;
546 pa_memblock_ref(p
->write
.memchunk
.memblock
);
547 p
->write
.data
= NULL
;
550 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
554 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
555 p
->write_creds
= p
->write
.current
->creds
;
/* Writes as much of the current outgoing frame as the iochannel accepts.
 *
 * If no item is in flight, the next one is prepared first. While write.index
 * is still inside the descriptor, the remaining descriptor bytes are sent;
 * afterwards the payload is sent, taken either from write.data or from the
 * acquired write.memchunk memblock. Credentials are attached with
 * pa_iochannel_write_with_creds() when send_creds_now is set, and the flag
 * is cleared afterwards. Once the whole frame (descriptor + payload length)
 * has gone out, the item is freed, the memchunk reference is dropped and
 * drain_callback is invoked if nothing else is pending. An acquired memblock
 * is released again after the write.
 *
 * NOTE(review): return values, the update of write.index and the error path
 * are not visible in this excerpt. */
559 static int do_write(pa_pstream
*p
) {
563 pa_memblock
*release_memblock
= NULL
;
566 pa_assert(PA_REFCNT_VALUE(p
) > 0);
568 if (!p
->write
.current
)
569 prepare_next_write_item(p
);
571 if (!p
->write
.current
)
574 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
575 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
576 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
578 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
583 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
584 release_memblock
= p
->write
.memchunk
.memblock
;
587 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
588 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
594 if (p
->send_creds_now
) {
596 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
599 p
->send_creds_now
= 0;
603 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
606 if (release_memblock
)
607 pa_memblock_release(release_memblock
);
611 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
612 pa_assert(p
->write
.current
);
613 item_free(p
->write
.current
, NULL
);
614 p
->write
.current
= NULL
;
616 if (p
->write
.memchunk
.memblock
)
617 pa_memblock_unref(p
->write
.memchunk
.memblock
);
619 pa_memchunk_reset(&p
->write
.memchunk
);
621 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
622 p
->drain_callback(p
, p
->drain_callback_userdata
);
629 if (release_memblock
)
630 pa_memblock_release(release_memblock
);
/* Reads as much of the current incoming frame as is available and
 * dispatches completed data.
 *
 * Phase 1 (read.index < descriptor size): bytes are read into the descriptor.
 * Phase 2: bytes are read into the payload buffer, which is read.data or the
 * acquired read.memblock.
 *
 * When the descriptor has just been completed it is decoded and validated:
 *   - SHM flags on a stream without SHM enabled are rejected with a warning.
 *   - PA_FLAG_SHMRELEASE / PA_FLAG_SHMREVOKE frames carry a block id in
 *     OFFSET_HI and are handed to the memexport / memimport.
 *   - The length must be non-zero and at most FRAME_SIZE_MAX_ALLOW.
 *   - Channel (uint32_t) -1 marks a packet frame; a packet is allocated.
 *   - Otherwise the seek mode is validated and either the shm_info record
 *     (PA_FLAG_SHMDATA) or a freshly allocated memblock becomes the payload
 *     buffer.
 *
 * While payload arrives for a plain memblock frame, the newly received part
 * is passed to recieve_memblock_callback; the seek/offset words of the
 * descriptor are zeroed afterwards so later partial deliveries do not seek
 * again. When the frame is complete, the memblock is unreferenced, or the
 * packet is delivered through recieve_packet_callback, or the SHM block is
 * imported with pa_memimport_get() and delivered. The read state and
 * read_creds_valid are then reset.
 *
 * NOTE(review): return values, the update of read.index and the error path
 * are not visible in this excerpt. */
635 static int do_read(pa_pstream
*p
) {
639 pa_memblock
*release_memblock
= NULL
;
641 pa_assert(PA_REFCNT_VALUE(p
) > 0);
643 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
644 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
645 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
647 pa_assert(p
->read
.data
|| p
->read
.memblock
);
652 d
= pa_memblock_acquire(p
->read
.memblock
);
653 release_memblock
= p
->read
.memblock
;
656 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
657 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
664 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
667 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
670 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
674 if (release_memblock
)
675 pa_memblock_release(release_memblock
);
679 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
680 uint32_t flags
, length
, channel
;
681 /* Reading of frame descriptor complete */
683 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
685 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
686 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
690 if (flags
== PA_FLAG_SHMRELEASE
) {
692 /* This is a SHM memblock release frame with no payload */
694 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
696 pa_assert(p
->export
);
697 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
701 } else if (flags
== PA_FLAG_SHMREVOKE
) {
703 /* This is a SHM memblock revoke frame with no payload */
705 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
707 pa_assert(p
->import
);
708 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
713 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
715 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
716 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length
);
720 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
722 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
724 if (channel
== (uint32_t) -1) {
727 pa_log_warn("Received packet frame with invalid flags value.");
731 /* Frame is a packet frame */
732 p
->read
.packet
= pa_packet_new(length
);
733 p
->read
.data
= p
->read
.packet
->data
;
737 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
738 pa_log_warn("Received memblock frame with invalid seek mode.");
742 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
744 if (length
!= sizeof(p
->read
.shm_info
)) {
745 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
749 /* Frame is a memblock frame referencing an SHM memblock */
750 p
->read
.data
= p
->read
.shm_info
;
752 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
754 /* Frame is a memblock frame */
756 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
760 pa_log_warn("Recieved memblock frame with invalid flags value.");
765 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
766 /* Frame payload available */
768 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
770 /* Is this memblock data? Then pass it to the user */
771 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
776 chunk
.memblock
= p
->read
.memblock
;
777 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
780 if (p
->recieve_memblock_callback
) {
784 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
785 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
787 p
->recieve_memblock_callback(
789 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
791 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
793 p
->recieve_memblock_callback_userdata
);
796 /* Drop seek info for following callbacks */
797 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
798 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
799 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
804 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
806 if (p
->read
.memblock
) {
808 /* This was a memblock frame. We can unref the memblock now */
809 pa_memblock_unref(p
->read
.memblock
);
811 } else if (p
->read
.packet
) {
813 if (p
->recieve_packet_callback
)
815 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
817 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
820 pa_packet_unref(p
->read
.packet
);
824 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
826 pa_assert(p
->import
);
828 if (!(b
= pa_memimport_get(p
->import
,
829 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
830 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
831 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
832 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
834 pa_log_warn("Failed to import memory block.");
838 if (p
->recieve_memblock_callback
) {
844 chunk
.length
= pa_memblock_get_length(b
);
847 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
848 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
850 p
->recieve_memblock_callback(
852 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
854 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
856 p
->recieve_memblock_callback_userdata
);
859 pa_memblock_unref(b
);
869 p
->read
.memblock
= NULL
;
870 p
->read
.packet
= NULL
;
875 p
->read_creds_valid
= 0;
881 if (release_memblock
)
882 pa_memblock_release(release_memblock
);
/* Installs the callback (and its userdata) that do_something() invokes just
 * before it unlinks the stream. Passing NULL for cb removes the callback
 * pointer. */
887 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
889 pa_assert(PA_REFCNT_VALUE(p
) > 0);
891 p
->die_callback
= cb
;
892 p
->die_callback_userdata
= userdata
;
/* Installs the callback (and its userdata) that do_write() invokes after a
 * frame has been written completely and pa_pstream_is_pending() reports
 * that nothing else is waiting to be sent. */
895 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
897 pa_assert(PA_REFCNT_VALUE(p
) > 0);
899 p
->drain_callback
= cb
;
900 p
->drain_callback_userdata
= userdata
;
/* Installs the callback (and its userdata) that do_read() invokes for every
 * completely received packet frame. The callback receives the packet and,
 * when valid credentials were read, a pointer to them (NULL otherwise). */
903 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
905 pa_assert(PA_REFCNT_VALUE(p
) > 0);
907 p
->recieve_packet_callback
= cb
;
908 p
->recieve_packet_callback_userdata
= userdata
;
/* Installs the callback (and its userdata) that do_read() invokes with
 * received memblock data, both for plain memblock frames and for blocks
 * imported from SHM. */
911 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
913 pa_assert(PA_REFCNT_VALUE(p
) > 0);
915 p
->recieve_memblock_callback
= cb
;
916 p
->recieve_memblock_callback_userdata
= userdata
;
/* Installs the callback (and its userdata) that memimport_release_cb()
 * invokes with the block id before a release frame is queued. */
919 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
921 pa_assert(PA_REFCNT_VALUE(p
) > 0);
923 p
->release_callback
= cb
;
924 p
->release_callback_userdata
= userdata
;
927 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
929 pa_assert(PA_REFCNT_VALUE(p
) > 0);
931 p
->release_callback
= cb
;
932 p
->release_callback_userdata
= userdata
;
/* Reports whether outgoing data is still waiting: either an item is
 * currently being written or the send queue is not empty.
 * NOTE(review): the declaration of b, any special-casing (e.g. for a dead
 * stream) and the return statement are not visible in this excerpt. */
935 int pa_pstream_is_pending(pa_pstream
*p
) {
939 pa_assert(PA_REFCNT_VALUE(p
) > 0);
944 b
= p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
/* Drops one reference on the stream. The action taken when the count
 * reaches zero (presumably pstream_free()) is not visible in this excerpt. */
949 void pa_pstream_unref(pa_pstream
*p
) {
951 pa_assert(PA_REFCNT_VALUE(p
) > 0);
953 if (PA_REFCNT_DEC(p
) <= 0)
/* Takes an additional reference on the stream; the stream must still be
 * referenced. NOTE(review): the increment and the return statement are not
 * visible in this excerpt. */
957 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
959 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* Detaches the stream from its resources: frees the memimport, the
 * memexport and the iochannel, frees the defer event (and clears the
 * pointer), and resets the die, drain and receive callbacks to NULL.
 * NOTE(review): the guards around the free calls and any "dead" handling
 * are not visible in this excerpt; the release and revoke callbacks are not
 * reset by the visible lines. */
965 void pa_pstream_unlink(pa_pstream
*p
) {
974 pa_memimport_free(p
->import
);
979 pa_memexport_free(p
->export
);
984 pa_iochannel_free(p
->io
);
988 if (p
->defer_event
) {
989 p
->mainloop
->defer_free(p
->defer_event
);
990 p
->defer_event
= NULL
;
993 p
->die_callback
= NULL
;
994 p
->drain_callback
= NULL
;
995 p
->recieve_packet_callback
= NULL
;
996 p
->recieve_memblock_callback
= NULL
;
999 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
1001 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1003 p
->use_shm
= enable
;
1008 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1013 pa_memexport_free(p
->export
);