]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
remove all occurences of
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40
41 #include "winsock.h"
42
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49
50 #include "pstream.h"
51
52 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
53 #define PA_FLAG_SHMDATA 0x80000000LU
54 #define PA_FLAG_SHMRELEASE 0x40000000LU
55 #define PA_FLAG_SHMREVOKE 0xC0000000LU
56 #define PA_FLAG_SHMMASK 0xFF000000LU
57 #define PA_FLAG_SEEKMASK 0x000000FFLU
58
59 /* The sequence descriptor header consists of 5 32bit integers: */
60 enum {
61 PA_PSTREAM_DESCRIPTOR_LENGTH,
62 PA_PSTREAM_DESCRIPTOR_CHANNEL,
63 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
64 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
65 PA_PSTREAM_DESCRIPTOR_FLAGS,
66 PA_PSTREAM_DESCRIPTOR_MAX
67 };
68
69 /* If we have an SHM block, this info follows the descriptor */
70 enum {
71 PA_PSTREAM_SHM_BLOCKID,
72 PA_PSTREAM_SHM_SHMID,
73 PA_PSTREAM_SHM_INDEX,
74 PA_PSTREAM_SHM_LENGTH,
75 PA_PSTREAM_SHM_MAX
76 };
77
78 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
79
80 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
81 #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
82
83 struct item_info {
84 enum {
85 PA_PSTREAM_ITEM_PACKET,
86 PA_PSTREAM_ITEM_MEMBLOCK,
87 PA_PSTREAM_ITEM_SHMRELEASE,
88 PA_PSTREAM_ITEM_SHMREVOKE
89 } type;
90
91
92 /* packet info */
93 pa_packet *packet;
94 #ifdef HAVE_CREDS
95 int with_creds;
96 pa_creds creds;
97 #endif
98
99 /* memblock info */
100 pa_memchunk chunk;
101 uint32_t channel;
102 int64_t offset;
103 pa_seek_mode_t seek_mode;
104
105 /* release/revoke info */
106 uint32_t block_id;
107 };
108
109 struct pa_pstream {
110 int ref;
111
112 pa_mainloop_api *mainloop;
113 pa_defer_event *defer_event;
114 pa_iochannel *io;
115 pa_queue *send_queue;
116
117 int dead;
118
119 struct {
120 pa_pstream_descriptor descriptor;
121 struct item_info* current;
122 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
123 void *data;
124 size_t index;
125 } write;
126
127 struct {
128 pa_pstream_descriptor descriptor;
129 pa_memblock *memblock;
130 pa_packet *packet;
131 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
132 void *data;
133 size_t index;
134 } read;
135
136 int use_shm;
137 pa_memimport *import;
138 pa_memexport *export;
139
140 pa_pstream_packet_cb_t recieve_packet_callback;
141 void *recieve_packet_callback_userdata;
142
143 pa_pstream_memblock_cb_t recieve_memblock_callback;
144 void *recieve_memblock_callback_userdata;
145
146 pa_pstream_notify_cb_t drain_callback;
147 void *drain_callback_userdata;
148
149 pa_pstream_notify_cb_t die_callback;
150 void *die_callback_userdata;
151
152 pa_mempool *mempool;
153
154 #ifdef HAVE_CREDS
155 pa_creds read_creds, write_creds;
156 int read_creds_valid, send_creds_now;
157 #endif
158 };
159
160 static int do_write(pa_pstream *p);
161 static int do_read(pa_pstream *p);
162
163 static void do_something(pa_pstream *p) {
164 assert(p);
165
166 p->mainloop->defer_enable(p->defer_event, 0);
167
168 pa_pstream_ref(p);
169
170 if (!p->dead && pa_iochannel_is_readable(p->io)) {
171 if (do_read(p) < 0)
172 goto fail;
173 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
174 goto fail;
175
176 if (!p->dead && pa_iochannel_is_writable(p->io)) {
177 if (do_write(p) < 0)
178 goto fail;
179 }
180
181 pa_pstream_unref(p);
182 return;
183
184 fail:
185
186 p->dead = 1;
187
188 if (p->die_callback)
189 p->die_callback(p, p->die_callback_userdata);
190
191 pa_pstream_unref(p);
192 }
193
194 static void io_callback(pa_iochannel*io, void *userdata) {
195 pa_pstream *p = userdata;
196
197 assert(p);
198 assert(p->io == io);
199
200 do_something(p);
201 }
202
203 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
204 pa_pstream *p = userdata;
205
206 assert(p);
207 assert(p->defer_event == e);
208 assert(p->mainloop == m);
209
210 do_something(p);
211 }
212
213 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
214
215 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
216 pa_pstream *p;
217
218 assert(m);
219 assert(io);
220 assert(pool);
221
222 p = pa_xnew(pa_pstream, 1);
223 p->ref = 1;
224 p->io = io;
225 pa_iochannel_set_callback(io, io_callback, p);
226 p->dead = 0;
227
228 p->mainloop = m;
229 p->defer_event = m->defer_new(m, defer_callback, p);
230 m->defer_enable(p->defer_event, 0);
231
232 p->send_queue = pa_queue_new();
233 assert(p->send_queue);
234
235 p->write.current = NULL;
236 p->write.index = 0;
237 p->read.memblock = NULL;
238 p->read.packet = NULL;
239 p->read.index = 0;
240
241 p->recieve_packet_callback = NULL;
242 p->recieve_packet_callback_userdata = NULL;
243 p->recieve_memblock_callback = NULL;
244 p->recieve_memblock_callback_userdata = NULL;
245 p->drain_callback = NULL;
246 p->drain_callback_userdata = NULL;
247 p->die_callback = NULL;
248 p->die_callback_userdata = NULL;
249
250 p->mempool = pool;
251
252 p->use_shm = 0;
253 p->export = NULL;
254 p->import = NULL;
255
256 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
257 pa_iochannel_socket_set_sndbuf(io, 1024*8);
258
259 #ifdef HAVE_CREDS
260 p->send_creds_now = 0;
261 p->read_creds_valid = 0;
262 #endif
263 return p;
264 }
265
266 static void item_free(void *item, PA_GCC_UNUSED void *p) {
267 struct item_info *i = item;
268 assert(i);
269
270 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
271 assert(i->chunk.memblock);
272 pa_memblock_unref(i->chunk.memblock);
273 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
274 assert(i->packet);
275 pa_packet_unref(i->packet);
276 }
277
278 pa_xfree(i);
279 }
280
281 static void pstream_free(pa_pstream *p) {
282 assert(p);
283
284 pa_pstream_close(p);
285
286 pa_queue_free(p->send_queue, item_free, NULL);
287
288 if (p->write.current)
289 item_free(p->write.current, NULL);
290
291 if (p->read.memblock)
292 pa_memblock_unref(p->read.memblock);
293
294 if (p->read.packet)
295 pa_packet_unref(p->read.packet);
296
297 pa_xfree(p);
298 }
299
300 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
301 struct item_info *i;
302
303 assert(p);
304 assert(p->ref >= 1);
305 assert(packet);
306
307 if (p->dead)
308 return;
309
310 i = pa_xnew(struct item_info, 1);
311 i->type = PA_PSTREAM_ITEM_PACKET;
312 i->packet = pa_packet_ref(packet);
313
314 #ifdef HAVE_CREDS
315 if ((i->with_creds = !!creds))
316 i->creds = *creds;
317 #endif
318
319 pa_queue_push(p->send_queue, i);
320 p->mainloop->defer_enable(p->defer_event, 1);
321 }
322
323 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
324 struct item_info *i;
325
326 assert(p);
327 assert(p->ref >= 1);
328 assert(channel != (uint32_t) -1);
329 assert(chunk);
330
331 if (p->dead)
332 return;
333
334 i = pa_xnew(struct item_info, 1);
335 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
336 i->chunk = *chunk;
337 i->channel = channel;
338 i->offset = offset;
339 i->seek_mode = seek_mode;
340 #ifdef HAVE_CREDS
341 i->with_creds = 0;
342 #endif
343
344 pa_memblock_ref(i->chunk.memblock);
345
346 pa_queue_push(p->send_queue, i);
347 p->mainloop->defer_enable(p->defer_event, 1);
348 }
349
350 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
351 struct item_info *item;
352 pa_pstream *p = userdata;
353
354 assert(p);
355 assert(p->ref >= 1);
356
357 if (p->dead)
358 return;
359
360 /* pa_log("Releasing block %u", block_id); */
361
362 item = pa_xnew(struct item_info, 1);
363 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
364 item->block_id = block_id;
365 #ifdef HAVE_CREDS
366 item->with_creds = 0;
367 #endif
368
369 pa_queue_push(p->send_queue, item);
370 p->mainloop->defer_enable(p->defer_event, 1);
371 }
372
373 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
374 struct item_info *item;
375 pa_pstream *p = userdata;
376
377 assert(p);
378 assert(p->ref >= 1);
379
380 if (p->dead)
381 return;
382
383 /* pa_log("Revoking block %u", block_id); */
384
385 item = pa_xnew(struct item_info, 1);
386 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
387 item->block_id = block_id;
388 #ifdef HAVE_CREDS
389 item->with_creds = 0;
390 #endif
391
392 pa_queue_push(p->send_queue, item);
393 p->mainloop->defer_enable(p->defer_event, 1);
394 }
395
396 static void prepare_next_write_item(pa_pstream *p) {
397 assert(p);
398
399 if (!(p->write.current = pa_queue_pop(p->send_queue)))
400 return;
401
402 p->write.index = 0;
403 p->write.data = NULL;
404
405 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
406 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
407 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
408 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
409 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
410
411 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
412
413 assert(p->write.current->packet);
414 p->write.data = p->write.current->packet->data;
415 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
416
417 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
418
419 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
420 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
421
422 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
423
424 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
425 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
426
427 } else {
428 uint32_t flags;
429 int send_payload = 1;
430
431 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
432 assert(p->write.current->chunk.memblock);
433
434 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
435 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
436 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
437
438 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
439
440 if (p->use_shm) {
441 uint32_t block_id, shm_id;
442 size_t offset, length;
443
444 assert(p->export);
445
446 if (pa_memexport_put(p->export,
447 p->write.current->chunk.memblock,
448 &block_id,
449 &shm_id,
450 &offset,
451 &length) >= 0) {
452
453 flags |= PA_FLAG_SHMDATA;
454 send_payload = 0;
455
456 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
457 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
458 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
459 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
460
461 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
462 p->write.data = p->write.shm_info;
463 }
464 /* else */
465 /* pa_log_warn("Failed to export memory block."); */
466 }
467
468 if (send_payload) {
469 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
470 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
471 }
472
473 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
474 }
475
476 #ifdef HAVE_CREDS
477 if ((p->send_creds_now = p->write.current->with_creds))
478 p->write_creds = p->write.current->creds;
479
480 #endif
481 }
482
483 static int do_write(pa_pstream *p) {
484 void *d;
485 size_t l;
486 ssize_t r;
487 assert(p);
488
489 if (!p->write.current)
490 prepare_next_write_item(p);
491
492 if (!p->write.current)
493 return 0;
494
495 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
496 d = (uint8_t*) p->write.descriptor + p->write.index;
497 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
498 } else {
499 assert(p->write.data);
500
501 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
502 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
503 }
504
505 assert(l > 0);
506
507 #ifdef HAVE_CREDS
508 if (p->send_creds_now) {
509
510 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
511 return -1;
512
513 p->send_creds_now = 0;
514 } else
515 #endif
516
517 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
518 return -1;
519
520 p->write.index += r;
521
522 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
523 assert(p->write.current);
524 item_free(p->write.current, (void *) 1);
525 p->write.current = NULL;
526
527 if (p->drain_callback && !pa_pstream_is_pending(p))
528 p->drain_callback(p, p->drain_callback_userdata);
529 }
530
531 return 0;
532 }
533
534 static int do_read(pa_pstream *p) {
535 void *d;
536 size_t l;
537 ssize_t r;
538 assert(p);
539
540 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
541 d = (uint8_t*) p->read.descriptor + p->read.index;
542 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
543 } else {
544 assert(p->read.data);
545 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
546 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
547 }
548
549 #ifdef HAVE_CREDS
550 {
551 int b = 0;
552
553 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
554 return -1;
555
556 p->read_creds_valid = p->read_creds_valid || b;
557 }
558 #else
559 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
560 return -1;
561 #endif
562
563 p->read.index += r;
564
565 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
566 uint32_t flags, length, channel;
567 /* Reading of frame descriptor complete */
568
569 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
570
571 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
572 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
573 return -1;
574 }
575
576 if (flags == PA_FLAG_SHMRELEASE) {
577
578 /* This is a SHM memblock release frame with no payload */
579
580 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
581
582 assert(p->export);
583 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
584
585 goto frame_done;
586
587 } else if (flags == PA_FLAG_SHMREVOKE) {
588
589 /* This is a SHM memblock revoke frame with no payload */
590
591 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
592
593 assert(p->import);
594 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
595
596 goto frame_done;
597 }
598
599 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
600
601 if (length > FRAME_SIZE_MAX) {
602 pa_log_warn("Recieved invalid frame size : %lu", (unsigned long) length);
603 return -1;
604 }
605
606 assert(!p->read.packet && !p->read.memblock);
607
608 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
609
610 if (channel == (uint32_t) -1) {
611
612 if (flags != 0) {
613 pa_log_warn("Received packet frame with invalid flags value.");
614 return -1;
615 }
616
617 /* Frame is a packet frame */
618 p->read.packet = pa_packet_new(length);
619 p->read.data = p->read.packet->data;
620
621 } else {
622
623 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
624 pa_log_warn("Received memblock frame with invalid seek mode.");
625 return -1;
626 }
627
628 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
629
630 if (length != sizeof(p->read.shm_info)) {
631 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
632 return -1;
633 }
634
635 /* Frame is a memblock frame referencing an SHM memblock */
636 p->read.data = p->read.shm_info;
637
638 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
639
640 /* Frame is a memblock frame */
641
642 p->read.memblock = pa_memblock_new(p->mempool, length);
643 p->read.data = p->read.memblock->data;
644 } else {
645
646 pa_log_warn("Recieved memblock frame with invalid flags value.");
647 return -1;
648 }
649 }
650
651 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
652 /* Frame payload available */
653
654 if (p->read.memblock && p->recieve_memblock_callback) {
655
656 /* Is this memblock data? Than pass it to the user */
657 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
658
659 if (l > 0) {
660 pa_memchunk chunk;
661
662 chunk.memblock = p->read.memblock;
663 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
664 chunk.length = l;
665
666 if (p->recieve_memblock_callback) {
667 int64_t offset;
668
669 offset = (int64_t) (
670 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
671 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
672
673 p->recieve_memblock_callback(
674 p,
675 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
676 offset,
677 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
678 &chunk,
679 p->recieve_memblock_callback_userdata);
680 }
681
682 /* Drop seek info for following callbacks */
683 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
684 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
685 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
686 }
687 }
688
689 /* Frame complete */
690 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
691
692 if (p->read.memblock) {
693
694 /* This was a memblock frame. We can unref the memblock now */
695 pa_memblock_unref(p->read.memblock);
696
697 } else if (p->read.packet) {
698
699 if (p->recieve_packet_callback)
700 #ifdef HAVE_CREDS
701 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
702 #else
703 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
704 #endif
705
706 pa_packet_unref(p->read.packet);
707 } else {
708 pa_memblock *b;
709
710 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
711
712 assert(p->import);
713
714 if (!(b = pa_memimport_get(p->import,
715 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
716 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
717 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
718 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
719
720 pa_log_warn("Failed to import memory block.");
721 return -1;
722 }
723
724 if (p->recieve_memblock_callback) {
725 int64_t offset;
726 pa_memchunk chunk;
727
728 chunk.memblock = b;
729 chunk.index = 0;
730 chunk.length = b->length;
731
732 offset = (int64_t) (
733 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
734 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
735
736 p->recieve_memblock_callback(
737 p,
738 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
739 offset,
740 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
741 &chunk,
742 p->recieve_memblock_callback_userdata);
743 }
744
745 pa_memblock_unref(b);
746 }
747
748 goto frame_done;
749 }
750 }
751
752 return 0;
753
754 frame_done:
755 p->read.memblock = NULL;
756 p->read.packet = NULL;
757 p->read.index = 0;
758
759 #ifdef HAVE_CREDS
760 p->read_creds_valid = 0;
761 #endif
762
763 return 0;
764 }
765
766 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
767 assert(p);
768 assert(p->ref >= 1);
769
770 p->die_callback = cb;
771 p->die_callback_userdata = userdata;
772 }
773
774
775 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
776 assert(p);
777 assert(p->ref >= 1);
778
779 p->drain_callback = cb;
780 p->drain_callback_userdata = userdata;
781 }
782
783 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
784 assert(p);
785 assert(p->ref >= 1);
786
787 p->recieve_packet_callback = cb;
788 p->recieve_packet_callback_userdata = userdata;
789 }
790
791 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
792 assert(p);
793 assert(p->ref >= 1);
794
795 p->recieve_memblock_callback = cb;
796 p->recieve_memblock_callback_userdata = userdata;
797 }
798
799 int pa_pstream_is_pending(pa_pstream *p) {
800 assert(p);
801
802 if (p->dead)
803 return 0;
804
805 return p->write.current || !pa_queue_is_empty(p->send_queue);
806 }
807
808 void pa_pstream_unref(pa_pstream*p) {
809 assert(p);
810 assert(p->ref >= 1);
811
812 if (--p->ref == 0)
813 pstream_free(p);
814 }
815
816 pa_pstream* pa_pstream_ref(pa_pstream*p) {
817 assert(p);
818 assert(p->ref >= 1);
819
820 p->ref++;
821 return p;
822 }
823
824 void pa_pstream_close(pa_pstream *p) {
825 assert(p);
826
827 p->dead = 1;
828
829 if (p->import) {
830 pa_memimport_free(p->import);
831 p->import = NULL;
832 }
833
834 if (p->export) {
835 pa_memexport_free(p->export);
836 p->export = NULL;
837 }
838
839 if (p->io) {
840 pa_iochannel_free(p->io);
841 p->io = NULL;
842 }
843
844 if (p->defer_event) {
845 p->mainloop->defer_free(p->defer_event);
846 p->defer_event = NULL;
847 }
848
849 p->die_callback = NULL;
850 p->drain_callback = NULL;
851 p->recieve_packet_callback = NULL;
852 p->recieve_memblock_callback = NULL;
853
854
855 }
856
857 void pa_pstream_use_shm(pa_pstream *p, int enable) {
858 assert(p);
859 assert(p->ref >= 1);
860
861 p->use_shm = enable;
862
863 if (!p->import)
864 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
865
866 if (!p->export)
867 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
868 }