]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
when transferring large memory chunks of a pa_pstream, split them up
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40
41 #include "winsock.h"
42
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49
50 #include "pstream.h"
51
/* The flags word of the frame descriptor does double duty: the low byte
 * carries the seek mode, the high byte tells whether (and how) the frame
 * refers to audio data stored in SHM. */
#define PA_FLAG_SHMDATA    0x80000000UL  /* payload is an SHM block reference */
#define PA_FLAG_SHMRELEASE 0x40000000UL  /* payload-less frame: release an SHM block */
#define PA_FLAG_SHMREVOKE  0xC0000000UL  /* payload-less frame: revoke an SHM block */
#define PA_FLAG_SHMMASK    0xFF000000UL  /* bits reserved for the SHM information */
#define PA_FLAG_SEEKMASK   0x000000FFUL  /* bits reserved for the seek mode */
58
/* Every frame starts with a descriptor made up of five 32 bit integers.
 * These are the indexes of the individual words; the last enumerator is
 * the number of words. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH    = 0,
    PA_PSTREAM_DESCRIPTOR_CHANNEL   = 1,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI = 2,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO = 3,
    PA_PSTREAM_DESCRIPTOR_FLAGS     = 4,
    PA_PSTREAM_DESCRIPTOR_MAX       = 5
};
68
/* For a frame that references an SHM block the payload following the
 * descriptor consists of these 32 bit words; the last enumerator is the
 * number of words. */
enum {
    PA_PSTREAM_SHM_BLOCKID = 0,
    PA_PSTREAM_SHM_SHMID   = 1,
    PA_PSTREAM_SHM_INDEX   = 2,
    PA_PSTREAM_SHM_LENGTH  = 3,
    PA_PSTREAM_SHM_MAX     = 4
};
77
78 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
79
80 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
81 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
82 #define FRAME_SIZE_MAX_USE (1024*64)
83
84 struct item_info {
85 enum {
86 PA_PSTREAM_ITEM_PACKET,
87 PA_PSTREAM_ITEM_MEMBLOCK,
88 PA_PSTREAM_ITEM_SHMRELEASE,
89 PA_PSTREAM_ITEM_SHMREVOKE
90 } type;
91
92
93 /* packet info */
94 pa_packet *packet;
95 #ifdef HAVE_CREDS
96 int with_creds;
97 pa_creds creds;
98 #endif
99
100 /* memblock info */
101 pa_memchunk chunk;
102 uint32_t channel;
103 int64_t offset;
104 pa_seek_mode_t seek_mode;
105
106 /* release/revoke info */
107 uint32_t block_id;
108 };
109
110 struct pa_pstream {
111 int ref;
112
113 pa_mainloop_api *mainloop;
114 pa_defer_event *defer_event;
115 pa_iochannel *io;
116 pa_queue *send_queue;
117
118 int dead;
119
120 struct {
121 pa_pstream_descriptor descriptor;
122 struct item_info* current;
123 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
124 void *data;
125 size_t index;
126 } write;
127
128 struct {
129 pa_pstream_descriptor descriptor;
130 pa_memblock *memblock;
131 pa_packet *packet;
132 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
133 void *data;
134 size_t index;
135 } read;
136
137 int use_shm;
138 pa_memimport *import;
139 pa_memexport *export;
140
141 pa_pstream_packet_cb_t recieve_packet_callback;
142 void *recieve_packet_callback_userdata;
143
144 pa_pstream_memblock_cb_t recieve_memblock_callback;
145 void *recieve_memblock_callback_userdata;
146
147 pa_pstream_notify_cb_t drain_callback;
148 void *drain_callback_userdata;
149
150 pa_pstream_notify_cb_t die_callback;
151 void *die_callback_userdata;
152
153 pa_mempool *mempool;
154
155 #ifdef HAVE_CREDS
156 pa_creds read_creds, write_creds;
157 int read_creds_valid, send_creds_now;
158 #endif
159 };
160
161 static int do_write(pa_pstream *p);
162 static int do_read(pa_pstream *p);
163
164 static void do_something(pa_pstream *p) {
165 assert(p);
166
167 p->mainloop->defer_enable(p->defer_event, 0);
168
169 pa_pstream_ref(p);
170
171 if (!p->dead && pa_iochannel_is_readable(p->io)) {
172 if (do_read(p) < 0)
173 goto fail;
174 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
175 goto fail;
176
177 if (!p->dead && pa_iochannel_is_writable(p->io)) {
178 if (do_write(p) < 0)
179 goto fail;
180 }
181
182 pa_pstream_unref(p);
183 return;
184
185 fail:
186
187 p->dead = 1;
188
189 if (p->die_callback)
190 p->die_callback(p, p->die_callback_userdata);
191
192 pa_pstream_unref(p);
193 }
194
195 static void io_callback(pa_iochannel*io, void *userdata) {
196 pa_pstream *p = userdata;
197
198 assert(p);
199 assert(p->io == io);
200
201 do_something(p);
202 }
203
204 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
205 pa_pstream *p = userdata;
206
207 assert(p);
208 assert(p->defer_event == e);
209 assert(p->mainloop == m);
210
211 do_something(p);
212 }
213
214 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
215
216 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
217 pa_pstream *p;
218
219 assert(m);
220 assert(io);
221 assert(pool);
222
223 p = pa_xnew(pa_pstream, 1);
224 p->ref = 1;
225 p->io = io;
226 pa_iochannel_set_callback(io, io_callback, p);
227 p->dead = 0;
228
229 p->mainloop = m;
230 p->defer_event = m->defer_new(m, defer_callback, p);
231 m->defer_enable(p->defer_event, 0);
232
233 p->send_queue = pa_queue_new();
234 assert(p->send_queue);
235
236 p->write.current = NULL;
237 p->write.index = 0;
238 p->read.memblock = NULL;
239 p->read.packet = NULL;
240 p->read.index = 0;
241
242 p->recieve_packet_callback = NULL;
243 p->recieve_packet_callback_userdata = NULL;
244 p->recieve_memblock_callback = NULL;
245 p->recieve_memblock_callback_userdata = NULL;
246 p->drain_callback = NULL;
247 p->drain_callback_userdata = NULL;
248 p->die_callback = NULL;
249 p->die_callback_userdata = NULL;
250
251 p->mempool = pool;
252
253 p->use_shm = 0;
254 p->export = NULL;
255
256 /* We do importing unconditionally */
257 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
258
259 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
260 pa_iochannel_socket_set_sndbuf(io, 1024*8);
261
262 #ifdef HAVE_CREDS
263 p->send_creds_now = 0;
264 p->read_creds_valid = 0;
265 #endif
266 return p;
267 }
268
269 static void item_free(void *item, PA_GCC_UNUSED void *p) {
270 struct item_info *i = item;
271 assert(i);
272
273 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
274 assert(i->chunk.memblock);
275 pa_memblock_unref(i->chunk.memblock);
276 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
277 assert(i->packet);
278 pa_packet_unref(i->packet);
279 }
280
281 pa_xfree(i);
282 }
283
284 static void pstream_free(pa_pstream *p) {
285 assert(p);
286
287 pa_pstream_close(p);
288
289 pa_queue_free(p->send_queue, item_free, NULL);
290
291 if (p->write.current)
292 item_free(p->write.current, NULL);
293
294 if (p->read.memblock)
295 pa_memblock_unref(p->read.memblock);
296
297 if (p->read.packet)
298 pa_packet_unref(p->read.packet);
299
300 pa_xfree(p);
301 }
302
303 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
304 struct item_info *i;
305
306 assert(p);
307 assert(p->ref >= 1);
308 assert(packet);
309
310 if (p->dead)
311 return;
312
313 i = pa_xnew(struct item_info, 1);
314 i->type = PA_PSTREAM_ITEM_PACKET;
315 i->packet = pa_packet_ref(packet);
316
317 #ifdef HAVE_CREDS
318 if ((i->with_creds = !!creds))
319 i->creds = *creds;
320 #endif
321
322 pa_queue_push(p->send_queue, i);
323 p->mainloop->defer_enable(p->defer_event, 1);
324 }
325
326 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
327 size_t length, idx;
328
329 assert(p);
330 assert(p->ref >= 1);
331 assert(channel != (uint32_t) -1);
332 assert(chunk);
333
334 if (p->dead)
335 return;
336
337 length = chunk->length;
338 idx = 0;
339
340 while (length > 0) {
341 struct item_info *i;
342 size_t n;
343
344 i = pa_xnew(struct item_info, 1);
345 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
346
347 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
348 i->chunk.index = chunk->index + idx;
349 i->chunk.length = n;
350 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
351
352 i->channel = channel;
353 i->offset = offset;
354 i->seek_mode = seek_mode;
355 #ifdef HAVE_CREDS
356 i->with_creds = 0;
357 #endif
358
359 pa_queue_push(p->send_queue, i);
360
361 idx += n;
362 length -= n;
363 }
364
365 p->mainloop->defer_enable(p->defer_event, 1);
366 }
367
368 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
369 struct item_info *item;
370 pa_pstream *p = userdata;
371
372 assert(p);
373 assert(p->ref >= 1);
374
375 if (p->dead)
376 return;
377
378 /* pa_log("Releasing block %u", block_id); */
379
380 item = pa_xnew(struct item_info, 1);
381 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
382 item->block_id = block_id;
383 #ifdef HAVE_CREDS
384 item->with_creds = 0;
385 #endif
386
387 pa_queue_push(p->send_queue, item);
388 p->mainloop->defer_enable(p->defer_event, 1);
389 }
390
391 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
392 struct item_info *item;
393 pa_pstream *p = userdata;
394
395 assert(p);
396 assert(p->ref >= 1);
397
398 if (p->dead)
399 return;
400
401 /* pa_log("Revoking block %u", block_id); */
402
403 item = pa_xnew(struct item_info, 1);
404 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
405 item->block_id = block_id;
406 #ifdef HAVE_CREDS
407 item->with_creds = 0;
408 #endif
409
410 pa_queue_push(p->send_queue, item);
411 p->mainloop->defer_enable(p->defer_event, 1);
412 }
413
414 static void prepare_next_write_item(pa_pstream *p) {
415 assert(p);
416
417 if (!(p->write.current = pa_queue_pop(p->send_queue)))
418 return;
419
420 p->write.index = 0;
421 p->write.data = NULL;
422
423 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
424 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
425 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
426 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
427 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
428
429 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
430
431 assert(p->write.current->packet);
432 p->write.data = p->write.current->packet->data;
433 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
434
435 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
436
437 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
438 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
439
440 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
441
442 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
443 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
444
445 } else {
446 uint32_t flags;
447 int send_payload = 1;
448
449 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
450 assert(p->write.current->chunk.memblock);
451
452 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
453 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
454 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
455
456 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
457
458 if (p->use_shm) {
459 uint32_t block_id, shm_id;
460 size_t offset, length;
461
462 assert(p->export);
463
464 if (pa_memexport_put(p->export,
465 p->write.current->chunk.memblock,
466 &block_id,
467 &shm_id,
468 &offset,
469 &length) >= 0) {
470
471 flags |= PA_FLAG_SHMDATA;
472 send_payload = 0;
473
474 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
475 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
476 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
477 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
478
479 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
480 p->write.data = p->write.shm_info;
481 }
482 /* else */
483 /* pa_log_warn("Failed to export memory block."); */
484 }
485
486 if (send_payload) {
487 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
488 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
489 }
490
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
492 }
493
494 #ifdef HAVE_CREDS
495 if ((p->send_creds_now = p->write.current->with_creds))
496 p->write_creds = p->write.current->creds;
497 #endif
498 }
499
500 static int do_write(pa_pstream *p) {
501 void *d;
502 size_t l;
503 ssize_t r;
504 assert(p);
505
506 if (!p->write.current)
507 prepare_next_write_item(p);
508
509 if (!p->write.current)
510 return 0;
511
512 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
513 d = (uint8_t*) p->write.descriptor + p->write.index;
514 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
515 } else {
516 assert(p->write.data);
517
518 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
519 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
520 }
521
522 assert(l > 0);
523
524 #ifdef HAVE_CREDS
525 if (p->send_creds_now) {
526
527 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
528 return -1;
529
530 p->send_creds_now = 0;
531 } else
532 #endif
533
534 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
535 return -1;
536
537 p->write.index += r;
538
539 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
540 assert(p->write.current);
541 item_free(p->write.current, (void *) 1);
542 p->write.current = NULL;
543
544 if (p->drain_callback && !pa_pstream_is_pending(p))
545 p->drain_callback(p, p->drain_callback_userdata);
546 }
547
548 return 0;
549 }
550
551 static int do_read(pa_pstream *p) {
552 void *d;
553 size_t l;
554 ssize_t r;
555 assert(p);
556
557 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
558 d = (uint8_t*) p->read.descriptor + p->read.index;
559 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
560 } else {
561 assert(p->read.data);
562 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
563 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
564 }
565
566 #ifdef HAVE_CREDS
567 {
568 int b = 0;
569
570 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
571 return -1;
572
573 p->read_creds_valid = p->read_creds_valid || b;
574 }
575 #else
576 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
577 return -1;
578 #endif
579
580 p->read.index += r;
581
582 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
583 uint32_t flags, length, channel;
584 /* Reading of frame descriptor complete */
585
586 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
587
588 if (!p->import && (flags & PA_FLAG_SHMMASK) != 0) {
589 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
590 return -1;
591 }
592
593 if (flags == PA_FLAG_SHMRELEASE) {
594
595 /* This is a SHM memblock release frame with no payload */
596
597 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
598
599 assert(p->export);
600 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
601
602 goto frame_done;
603
604 } else if (flags == PA_FLAG_SHMREVOKE) {
605
606 /* This is a SHM memblock revoke frame with no payload */
607
608 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
609
610 assert(p->import);
611 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
612
613 goto frame_done;
614 }
615
616 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
617
618 if (length > FRAME_SIZE_MAX_ALLOW) {
619 pa_log_warn("Recieved invalid frame size : %lu", (unsigned long) length);
620 return -1;
621 }
622
623 assert(!p->read.packet && !p->read.memblock);
624
625 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
626
627 if (channel == (uint32_t) -1) {
628
629 if (flags != 0) {
630 pa_log_warn("Received packet frame with invalid flags value.");
631 return -1;
632 }
633
634 /* Frame is a packet frame */
635 p->read.packet = pa_packet_new(length);
636 p->read.data = p->read.packet->data;
637
638 } else {
639
640 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
641 pa_log_warn("Received memblock frame with invalid seek mode.");
642 return -1;
643 }
644
645 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
646
647 if (length != sizeof(p->read.shm_info)) {
648 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
649 return -1;
650 }
651
652 /* Frame is a memblock frame referencing an SHM memblock */
653 p->read.data = p->read.shm_info;
654
655 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
656
657 /* Frame is a memblock frame */
658
659 p->read.memblock = pa_memblock_new(p->mempool, length);
660 p->read.data = p->read.memblock->data;
661 } else {
662
663 pa_log_warn("Recieved memblock frame with invalid flags value.");
664 return -1;
665 }
666 }
667
668 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
669 /* Frame payload available */
670
671 if (p->read.memblock && p->recieve_memblock_callback) {
672
673 /* Is this memblock data? Than pass it to the user */
674 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
675
676 if (l > 0) {
677 pa_memchunk chunk;
678
679 chunk.memblock = p->read.memblock;
680 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
681 chunk.length = l;
682
683 if (p->recieve_memblock_callback) {
684 int64_t offset;
685
686 offset = (int64_t) (
687 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
688 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
689
690 p->recieve_memblock_callback(
691 p,
692 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
693 offset,
694 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
695 &chunk,
696 p->recieve_memblock_callback_userdata);
697 }
698
699 /* Drop seek info for following callbacks */
700 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
701 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
702 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
703 }
704 }
705
706 /* Frame complete */
707 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
708
709 if (p->read.memblock) {
710
711 /* This was a memblock frame. We can unref the memblock now */
712 pa_memblock_unref(p->read.memblock);
713
714 } else if (p->read.packet) {
715
716 if (p->recieve_packet_callback)
717 #ifdef HAVE_CREDS
718 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
719 #else
720 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
721 #endif
722
723 pa_packet_unref(p->read.packet);
724 } else {
725 pa_memblock *b;
726
727 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
728
729 assert(p->import);
730
731 if (!(b = pa_memimport_get(p->import,
732 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
733 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
734 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
735 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
736
737 pa_log_warn("Failed to import memory block.");
738 return -1;
739 }
740
741 if (p->recieve_memblock_callback) {
742 int64_t offset;
743 pa_memchunk chunk;
744
745 chunk.memblock = b;
746 chunk.index = 0;
747 chunk.length = b->length;
748
749 offset = (int64_t) (
750 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
751 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
752
753 p->recieve_memblock_callback(
754 p,
755 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
756 offset,
757 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
758 &chunk,
759 p->recieve_memblock_callback_userdata);
760 }
761
762 pa_memblock_unref(b);
763 }
764
765 goto frame_done;
766 }
767 }
768
769 return 0;
770
771 frame_done:
772 p->read.memblock = NULL;
773 p->read.packet = NULL;
774 p->read.index = 0;
775
776 #ifdef HAVE_CREDS
777 p->read_creds_valid = 0;
778 #endif
779
780 return 0;
781 }
782
783 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
784 assert(p);
785 assert(p->ref >= 1);
786
787 p->die_callback = cb;
788 p->die_callback_userdata = userdata;
789 }
790
791
792 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
793 assert(p);
794 assert(p->ref >= 1);
795
796 p->drain_callback = cb;
797 p->drain_callback_userdata = userdata;
798 }
799
800 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
801 assert(p);
802 assert(p->ref >= 1);
803
804 p->recieve_packet_callback = cb;
805 p->recieve_packet_callback_userdata = userdata;
806 }
807
808 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
809 assert(p);
810 assert(p->ref >= 1);
811
812 p->recieve_memblock_callback = cb;
813 p->recieve_memblock_callback_userdata = userdata;
814 }
815
816 int pa_pstream_is_pending(pa_pstream *p) {
817 assert(p);
818
819 if (p->dead)
820 return 0;
821
822 return p->write.current || !pa_queue_is_empty(p->send_queue);
823 }
824
825 void pa_pstream_unref(pa_pstream*p) {
826 assert(p);
827 assert(p->ref >= 1);
828
829 if (--p->ref == 0)
830 pstream_free(p);
831 }
832
833 pa_pstream* pa_pstream_ref(pa_pstream*p) {
834 assert(p);
835 assert(p->ref >= 1);
836
837 p->ref++;
838 return p;
839 }
840
841 void pa_pstream_close(pa_pstream *p) {
842 assert(p);
843
844 p->dead = 1;
845
846 if (p->import) {
847 pa_memimport_free(p->import);
848 p->import = NULL;
849 }
850
851 if (p->export) {
852 pa_memexport_free(p->export);
853 p->export = NULL;
854 }
855
856 if (p->io) {
857 pa_iochannel_free(p->io);
858 p->io = NULL;
859 }
860
861 if (p->defer_event) {
862 p->mainloop->defer_free(p->defer_event);
863 p->defer_event = NULL;
864 }
865
866 p->die_callback = NULL;
867 p->drain_callback = NULL;
868 p->recieve_packet_callback = NULL;
869 p->recieve_memblock_callback = NULL;
870
871
872 }
873
874 void pa_pstream_use_shm(pa_pstream *p, int enable) {
875 assert(p);
876 assert(p->ref >= 1);
877
878 p->use_shm = enable;
879
880 if (enable) {
881
882 if (!p->export)
883 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
884
885 } else {
886
887 if (p->export) {
888 pa_memexport_free(p->export);
889 p->export = NULL;
890 }
891
892 }
893 }