]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
minor update
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42
43 #include "winsock.h"
44
45 #include <pulse/xmalloc.h>
46
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52
53 #include "pstream.h"
54
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
61
62 /* The sequence descriptor header consists of 5 32bit integers: */
63 enum {
64 PA_PSTREAM_DESCRIPTOR_LENGTH,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
68 PA_PSTREAM_DESCRIPTOR_FLAGS,
69 PA_PSTREAM_DESCRIPTOR_MAX
70 };
71
72 /* If we have an SHM block, this info follows the descriptor */
73 enum {
74 PA_PSTREAM_SHM_BLOCKID,
75 PA_PSTREAM_SHM_SHMID,
76 PA_PSTREAM_SHM_INDEX,
77 PA_PSTREAM_SHM_LENGTH,
78 PA_PSTREAM_SHM_MAX
79 };
80
81 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
82
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
86
87 struct item_info {
88 enum {
89 PA_PSTREAM_ITEM_PACKET,
90 PA_PSTREAM_ITEM_MEMBLOCK,
91 PA_PSTREAM_ITEM_SHMRELEASE,
92 PA_PSTREAM_ITEM_SHMREVOKE
93 } type;
94
95
96 /* packet info */
97 pa_packet *packet;
98 #ifdef HAVE_CREDS
99 int with_creds;
100 pa_creds creds;
101 #endif
102
103 /* memblock info */
104 pa_memchunk chunk;
105 uint32_t channel;
106 int64_t offset;
107 pa_seek_mode_t seek_mode;
108
109 /* release/revoke info */
110 uint32_t block_id;
111 };
112
113 struct pa_pstream {
114 PA_REFCNT_DECLARE;
115
116 pa_mainloop_api *mainloop;
117 pa_defer_event *defer_event;
118 pa_iochannel *io;
119
120 pa_queue *send_queue;
121
122 int dead;
123
124 struct {
125 pa_pstream_descriptor descriptor;
126 struct item_info* current;
127 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
128 void *data;
129 size_t index;
130 pa_memchunk memchunk;
131 } write;
132
133 struct {
134 pa_pstream_descriptor descriptor;
135 pa_memblock *memblock;
136 pa_packet *packet;
137 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
138 void *data;
139 size_t index;
140 } read;
141
142 int use_shm;
143 pa_memimport *import;
144 pa_memexport *export;
145
146 pa_pstream_packet_cb_t recieve_packet_callback;
147 void *recieve_packet_callback_userdata;
148
149 pa_pstream_memblock_cb_t recieve_memblock_callback;
150 void *recieve_memblock_callback_userdata;
151
152 pa_pstream_notify_cb_t drain_callback;
153 void *drain_callback_userdata;
154
155 pa_pstream_notify_cb_t die_callback;
156 void *die_callback_userdata;
157
158 pa_mempool *mempool;
159
160 #ifdef HAVE_CREDS
161 pa_creds read_creds, write_creds;
162 int read_creds_valid, send_creds_now;
163 #endif
164 };
165
166 static int do_write(pa_pstream *p);
167 static int do_read(pa_pstream *p);
168
169 static void do_something(pa_pstream *p) {
170 pa_assert(p);
171 pa_assert(PA_REFCNT_VALUE(p) > 0);
172
173 pa_pstream_ref(p);
174
175 p->mainloop->defer_enable(p->defer_event, 0);
176
177 if (!p->dead && pa_iochannel_is_readable(p->io)) {
178 if (do_read(p) < 0)
179 goto fail;
180 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
181 goto fail;
182
183 if (!p->dead && pa_iochannel_is_writable(p->io)) {
184 if (do_write(p) < 0)
185 goto fail;
186 }
187
188 pa_pstream_unref(p);
189 return;
190
191 fail:
192
193 if (p->die_callback)
194 p->die_callback(p, p->die_callback_userdata);
195
196 pa_pstream_unlink(p);
197 pa_pstream_unref(p);
198 }
199
200 static void io_callback(pa_iochannel*io, void *userdata) {
201 pa_pstream *p = userdata;
202
203 pa_assert(p);
204 pa_assert(PA_REFCNT_VALUE(p) > 0);
205 pa_assert(p->io == io);
206
207 do_something(p);
208 }
209
210 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
211 pa_pstream *p = userdata;
212
213 pa_assert(p);
214 pa_assert(PA_REFCNT_VALUE(p) > 0);
215 pa_assert(p->defer_event == e);
216 pa_assert(p->mainloop == m);
217
218 do_something(p);
219 }
220
221 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
222
223 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
224 pa_pstream *p;
225
226 pa_assert(m);
227 pa_assert(io);
228 pa_assert(pool);
229
230 p = pa_xnew(pa_pstream, 1);
231 PA_REFCNT_INIT(p);
232 p->io = io;
233 pa_iochannel_set_callback(io, io_callback, p);
234 p->dead = 0;
235
236 p->mainloop = m;
237 p->defer_event = m->defer_new(m, defer_callback, p);
238 m->defer_enable(p->defer_event, 0);
239
240 p->send_queue = pa_queue_new();
241
242 p->write.current = NULL;
243 p->write.index = 0;
244 pa_memchunk_reset(&p->write.memchunk);
245 p->read.memblock = NULL;
246 p->read.packet = NULL;
247 p->read.index = 0;
248
249 p->recieve_packet_callback = NULL;
250 p->recieve_packet_callback_userdata = NULL;
251 p->recieve_memblock_callback = NULL;
252 p->recieve_memblock_callback_userdata = NULL;
253 p->drain_callback = NULL;
254 p->drain_callback_userdata = NULL;
255 p->die_callback = NULL;
256 p->die_callback_userdata = NULL;
257
258 p->mempool = pool;
259
260 p->use_shm = 0;
261 p->export = NULL;
262
263 /* We do importing unconditionally */
264 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
265
266 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
267 pa_iochannel_socket_set_sndbuf(io, 1024*8);
268
269 #ifdef HAVE_CREDS
270 p->send_creds_now = 0;
271 p->read_creds_valid = 0;
272 #endif
273 return p;
274 }
275
276 static void item_free(void *item, PA_GCC_UNUSED void *q) {
277 struct item_info *i = item;
278 pa_assert(i);
279
280 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
281 pa_assert(i->chunk.memblock);
282 pa_memblock_unref(i->chunk.memblock);
283 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
284 pa_assert(i->packet);
285 pa_packet_unref(i->packet);
286 }
287
288 pa_xfree(i);
289 }
290
291 static void pstream_free(pa_pstream *p) {
292 pa_assert(p);
293
294 pa_pstream_unlink(p);
295
296 pa_queue_free(p->send_queue, item_free, NULL);
297
298 if (p->write.current)
299 item_free(p->write.current, NULL);
300
301 if (p->write.memchunk.memblock)
302 pa_memblock_unref(p->write.memchunk.memblock);
303
304 if (p->read.memblock)
305 pa_memblock_unref(p->read.memblock);
306
307 if (p->read.packet)
308 pa_packet_unref(p->read.packet);
309
310 pa_xfree(p);
311 }
312
313 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
314 struct item_info *i;
315
316 pa_assert(p);
317 pa_assert(PA_REFCNT_VALUE(p) > 0);
318 pa_assert(packet);
319
320 if (p->dead)
321 return;
322
323 i = pa_xnew(struct item_info, 1);
324 i->type = PA_PSTREAM_ITEM_PACKET;
325 i->packet = pa_packet_ref(packet);
326
327 #ifdef HAVE_CREDS
328 if ((i->with_creds = !!creds))
329 i->creds = *creds;
330 #endif
331
332 pa_queue_push(p->send_queue, i);
333
334 p->mainloop->defer_enable(p->defer_event, 1);
335 }
336
337 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
338 size_t length, idx;
339
340 pa_assert(p);
341 pa_assert(PA_REFCNT_VALUE(p) > 0);
342 pa_assert(channel != (uint32_t) -1);
343 pa_assert(chunk);
344
345 if (p->dead)
346 return;
347
348 idx = 0;
349 length = chunk->length;
350
351 while (length > 0) {
352 struct item_info *i;
353 size_t n;
354
355 i = pa_xnew(struct item_info, 1);
356 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
357
358 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
359 i->chunk.index = chunk->index + idx;
360 i->chunk.length = n;
361 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
362
363 i->channel = channel;
364 i->offset = offset;
365 i->seek_mode = seek_mode;
366 #ifdef HAVE_CREDS
367 i->with_creds = 0;
368 #endif
369
370 pa_queue_push(p->send_queue, i);
371
372 idx += n;
373 length -= n;
374 }
375
376 p->mainloop->defer_enable(p->defer_event, 1);
377 }
378
379 /* might be called from thread context */
380 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
381 struct item_info *item;
382 pa_pstream *p = userdata;
383
384 pa_assert(p);
385 pa_assert(PA_REFCNT_VALUE(p) > 0);
386
387 if (p->dead)
388 return;
389
390 /* pa_log("Releasing block %u", block_id); */
391
392 item = pa_xnew(struct item_info, 1);
393 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
394 item->block_id = block_id;
395 #ifdef HAVE_CREDS
396 item->with_creds = 0;
397 #endif
398
399 pa_queue_push(p->send_queue, item);
400 p->mainloop->defer_enable(p->defer_event, 1);
401 }
402
403 /* might be called from thread context */
404 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
405 struct item_info *item;
406 pa_pstream *p = userdata;
407
408 pa_assert(p);
409 pa_assert(PA_REFCNT_VALUE(p) > 0);
410
411 if (p->dead)
412 return;
413 /* pa_log("Revoking block %u", block_id); */
414
415 item = pa_xnew(struct item_info, 1);
416 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
417 item->block_id = block_id;
418 #ifdef HAVE_CREDS
419 item->with_creds = 0;
420 #endif
421
422 pa_queue_push(p->send_queue, item);
423 p->mainloop->defer_enable(p->defer_event, 1);
424 }
425
426 static void prepare_next_write_item(pa_pstream *p) {
427 pa_assert(p);
428 pa_assert(PA_REFCNT_VALUE(p) > 0);
429
430 p->write.current = pa_queue_pop(p->send_queue);
431
432 if (!p->write.current)
433 return;
434
435 p->write.index = 0;
436 p->write.data = NULL;
437 pa_memchunk_reset(&p->write.memchunk);
438
439 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
440 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
441 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
442 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
443 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
444
445 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
446
447 pa_assert(p->write.current->packet);
448 p->write.data = p->write.current->packet->data;
449 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
450
451 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
452
453 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
454 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
455
456 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
457
458 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
459 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
460
461 } else {
462 uint32_t flags;
463 int send_payload = 1;
464
465 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
466 pa_assert(p->write.current->chunk.memblock);
467
468 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
469 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
470 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
471
472 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
473
474 if (p->use_shm) {
475 uint32_t block_id, shm_id;
476 size_t offset, length;
477
478 pa_assert(p->export);
479
480 if (pa_memexport_put(p->export,
481 p->write.current->chunk.memblock,
482 &block_id,
483 &shm_id,
484 &offset,
485 &length) >= 0) {
486
487 flags |= PA_FLAG_SHMDATA;
488 send_payload = 0;
489
490 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
491 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
492 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
493 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
494
495 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
496 p->write.data = p->write.shm_info;
497 }
498 /* else */
499 /* pa_log_warn("Failed to export memory block."); */
500 }
501
502 if (send_payload) {
503 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
504 p->write.memchunk = p->write.current->chunk;
505 pa_memblock_ref(p->write.memchunk.memblock);
506 p->write.data = NULL;
507 }
508
509 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
510 }
511
512 #ifdef HAVE_CREDS
513 if ((p->send_creds_now = p->write.current->with_creds))
514 p->write_creds = p->write.current->creds;
515 #endif
516 }
517
518 static int do_write(pa_pstream *p) {
519 void *d;
520 size_t l;
521 ssize_t r;
522 pa_memblock *release_memblock = NULL;
523
524 pa_assert(p);
525 pa_assert(PA_REFCNT_VALUE(p) > 0);
526
527 if (!p->write.current)
528 prepare_next_write_item(p);
529
530 if (!p->write.current)
531 return 0;
532
533 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
534 d = (uint8_t*) p->write.descriptor + p->write.index;
535 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
536 } else {
537 pa_assert(p->write.data || p->write.memchunk.memblock);
538
539 if (p->write.data)
540 d = p->write.data;
541 else {
542 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
543 release_memblock = p->write.memchunk.memblock;
544 }
545
546 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
547 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
548 }
549
550 pa_assert(l > 0);
551
552 #ifdef HAVE_CREDS
553 if (p->send_creds_now) {
554
555 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
556 goto fail;
557
558 p->send_creds_now = 0;
559 } else
560 #endif
561
562 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
563 goto fail;
564
565 if (release_memblock)
566 pa_memblock_release(release_memblock);
567
568 p->write.index += r;
569
570 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
571 pa_assert(p->write.current);
572 item_free(p->write.current, NULL);
573 p->write.current = NULL;
574
575 if (p->write.memchunk.memblock)
576 pa_memblock_unref(p->write.memchunk.memblock);
577
578 pa_memchunk_reset(&p->write.memchunk);
579
580 if (p->drain_callback && !pa_pstream_is_pending(p))
581 p->drain_callback(p, p->drain_callback_userdata);
582 }
583
584 return 0;
585
586 fail:
587
588 if (release_memblock)
589 pa_memblock_release(release_memblock);
590
591 return -1;
592 }
593
594 static int do_read(pa_pstream *p) {
595 void *d;
596 size_t l;
597 ssize_t r;
598 pa_memblock *release_memblock = NULL;
599 pa_assert(p);
600 pa_assert(PA_REFCNT_VALUE(p) > 0);
601
602 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
603 d = (uint8_t*) p->read.descriptor + p->read.index;
604 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
605 } else {
606 pa_assert(p->read.data || p->read.memblock);
607
608 if (p->read.data)
609 d = p->read.data;
610 else {
611 d = pa_memblock_acquire(p->read.memblock);
612 release_memblock = p->read.memblock;
613 }
614
615 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
616 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
617 }
618
619 #ifdef HAVE_CREDS
620 {
621 int b = 0;
622
623 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
624 goto fail;
625
626 p->read_creds_valid = p->read_creds_valid || b;
627 }
628 #else
629 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
630 goto fail;
631 #endif
632
633 if (release_memblock)
634 pa_memblock_release(release_memblock);
635
636 p->read.index += r;
637
638 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
639 uint32_t flags, length, channel;
640 /* Reading of frame descriptor complete */
641
642 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
643
644 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
645 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
646 return -1;
647 }
648
649 if (flags == PA_FLAG_SHMRELEASE) {
650
651 /* This is a SHM memblock release frame with no payload */
652
653 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
654
655 pa_assert(p->export);
656 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
657
658 goto frame_done;
659
660 } else if (flags == PA_FLAG_SHMREVOKE) {
661
662 /* This is a SHM memblock revoke frame with no payload */
663
664 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
665
666 pa_assert(p->import);
667 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
668
669 goto frame_done;
670 }
671
672 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
673
674 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
675 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
676 return -1;
677 }
678
679 pa_assert(!p->read.packet && !p->read.memblock);
680
681 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
682
683 if (channel == (uint32_t) -1) {
684
685 if (flags != 0) {
686 pa_log_warn("Received packet frame with invalid flags value.");
687 return -1;
688 }
689
690 /* Frame is a packet frame */
691 p->read.packet = pa_packet_new(length);
692 p->read.data = p->read.packet->data;
693
694 } else {
695
696 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
697 pa_log_warn("Received memblock frame with invalid seek mode.");
698 return -1;
699 }
700
701 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
702
703 if (length != sizeof(p->read.shm_info)) {
704 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
705 return -1;
706 }
707
708 /* Frame is a memblock frame referencing an SHM memblock */
709 p->read.data = p->read.shm_info;
710
711 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
712
713 /* Frame is a memblock frame */
714
715 p->read.memblock = pa_memblock_new(p->mempool, length);
716 p->read.data = NULL;
717 } else {
718
719 pa_log_warn("Recieved memblock frame with invalid flags value.");
720 return -1;
721 }
722 }
723
724 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
725 /* Frame payload available */
726
727 if (p->read.memblock && p->recieve_memblock_callback) {
728
729 /* Is this memblock data? Than pass it to the user */
730 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
731
732 if (l > 0) {
733 pa_memchunk chunk;
734
735 chunk.memblock = p->read.memblock;
736 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
737 chunk.length = l;
738
739 if (p->recieve_memblock_callback) {
740 int64_t offset;
741
742 offset = (int64_t) (
743 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
744 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
745
746 p->recieve_memblock_callback(
747 p,
748 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
749 offset,
750 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
751 &chunk,
752 p->recieve_memblock_callback_userdata);
753 }
754
755 /* Drop seek info for following callbacks */
756 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
757 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
758 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
759 }
760 }
761
762 /* Frame complete */
763 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
764
765 if (p->read.memblock) {
766
767 /* This was a memblock frame. We can unref the memblock now */
768 pa_memblock_unref(p->read.memblock);
769
770 } else if (p->read.packet) {
771
772 if (p->recieve_packet_callback)
773 #ifdef HAVE_CREDS
774 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
775 #else
776 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
777 #endif
778
779 pa_packet_unref(p->read.packet);
780 } else {
781 pa_memblock *b;
782
783 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
784
785 pa_assert(p->import);
786
787 if (!(b = pa_memimport_get(p->import,
788 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
789 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
790 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
791 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
792
793 pa_log_warn("Failed to import memory block.");
794 return -1;
795 }
796
797 if (p->recieve_memblock_callback) {
798 int64_t offset;
799 pa_memchunk chunk;
800
801 chunk.memblock = b;
802 chunk.index = 0;
803 chunk.length = pa_memblock_get_length(b);
804
805 offset = (int64_t) (
806 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
807 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
808
809 p->recieve_memblock_callback(
810 p,
811 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
812 offset,
813 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
814 &chunk,
815 p->recieve_memblock_callback_userdata);
816 }
817
818 pa_memblock_unref(b);
819 }
820
821 goto frame_done;
822 }
823 }
824
825 return 0;
826
827 frame_done:
828 p->read.memblock = NULL;
829 p->read.packet = NULL;
830 p->read.index = 0;
831 p->read.data = NULL;
832
833 #ifdef HAVE_CREDS
834 p->read_creds_valid = 0;
835 #endif
836
837 return 0;
838
839 fail:
840 if (release_memblock)
841 pa_memblock_release(release_memblock);
842
843 return -1;
844 }
845
846 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
847 pa_assert(p);
848 pa_assert(PA_REFCNT_VALUE(p) > 0);
849
850 p->die_callback = cb;
851 p->die_callback_userdata = userdata;
852 }
853
854 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
855 pa_assert(p);
856 pa_assert(PA_REFCNT_VALUE(p) > 0);
857
858 p->drain_callback = cb;
859 p->drain_callback_userdata = userdata;
860 }
861
862 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
863 pa_assert(p);
864 pa_assert(PA_REFCNT_VALUE(p) > 0);
865
866 p->recieve_packet_callback = cb;
867 p->recieve_packet_callback_userdata = userdata;
868 }
869
870 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
871 pa_assert(p);
872 pa_assert(PA_REFCNT_VALUE(p) > 0);
873
874 p->recieve_memblock_callback = cb;
875 p->recieve_memblock_callback_userdata = userdata;
876 }
877
878 int pa_pstream_is_pending(pa_pstream *p) {
879 int b;
880
881 pa_assert(p);
882 pa_assert(PA_REFCNT_VALUE(p) > 0);
883
884 if (p->dead)
885 b = 0;
886 else
887 b = p->write.current || !pa_queue_is_empty(p->send_queue);
888
889 return b;
890 }
891
892 void pa_pstream_unref(pa_pstream*p) {
893 pa_assert(p);
894 pa_assert(PA_REFCNT_VALUE(p) > 0);
895
896 if (PA_REFCNT_DEC(p) <= 0)
897 pstream_free(p);
898 }
899
900 pa_pstream* pa_pstream_ref(pa_pstream*p) {
901 pa_assert(p);
902 pa_assert(PA_REFCNT_VALUE(p) > 0);
903
904 PA_REFCNT_INC(p);
905 return p;
906 }
907
908 void pa_pstream_unlink(pa_pstream *p) {
909 pa_assert(p);
910
911 if (p->dead)
912 return;
913
914 p->dead = 1;
915
916 if (p->import) {
917 pa_memimport_free(p->import);
918 p->import = NULL;
919 }
920
921 if (p->export) {
922 pa_memexport_free(p->export);
923 p->export = NULL;
924 }
925
926 if (p->io) {
927 pa_iochannel_free(p->io);
928 p->io = NULL;
929 }
930
931 if (p->defer_event) {
932 p->mainloop->defer_free(p->defer_event);
933 p->defer_event = NULL;
934 }
935
936 p->die_callback = NULL;
937 p->drain_callback = NULL;
938 p->recieve_packet_callback = NULL;
939 p->recieve_memblock_callback = NULL;
940 }
941
942 void pa_pstream_use_shm(pa_pstream *p, int enable) {
943 pa_assert(p);
944 pa_assert(PA_REFCNT_VALUE(p) > 0);
945
946 p->use_shm = enable;
947
948 if (enable) {
949
950 if (!p->export)
951 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
952
953 } else {
954
955 if (p->export) {
956 pa_memexport_free(p->export);
957 p->export = NULL;
958 }
959 }
960 }