]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
initialize 'length' properly
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42
43 #include "winsock.h"
44
45 #include <pulse/xmalloc.h>
46
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52
53 #include "pstream.h"
54
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
61
62 /* The sequence descriptor header consists of 5 32bit integers: */
63 enum {
64 PA_PSTREAM_DESCRIPTOR_LENGTH,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
68 PA_PSTREAM_DESCRIPTOR_FLAGS,
69 PA_PSTREAM_DESCRIPTOR_MAX
70 };
71
72 /* If we have an SHM block, this info follows the descriptor */
73 enum {
74 PA_PSTREAM_SHM_BLOCKID,
75 PA_PSTREAM_SHM_SHMID,
76 PA_PSTREAM_SHM_INDEX,
77 PA_PSTREAM_SHM_LENGTH,
78 PA_PSTREAM_SHM_MAX
79 };
80
81 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
82
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
86
87 struct item_info {
88 enum {
89 PA_PSTREAM_ITEM_PACKET,
90 PA_PSTREAM_ITEM_MEMBLOCK,
91 PA_PSTREAM_ITEM_SHMRELEASE,
92 PA_PSTREAM_ITEM_SHMREVOKE
93 } type;
94
95
96 /* packet info */
97 pa_packet *packet;
98 #ifdef HAVE_CREDS
99 int with_creds;
100 pa_creds creds;
101 #endif
102
103 /* memblock info */
104 pa_memchunk chunk;
105 uint32_t channel;
106 int64_t offset;
107 pa_seek_mode_t seek_mode;
108
109 /* release/revoke info */
110 uint32_t block_id;
111 };
112
113 struct pa_pstream {
114 PA_REFCNT_DECLARE;
115
116 pa_mainloop_api *mainloop;
117 pa_defer_event *defer_event;
118 pa_iochannel *io;
119
120 pa_queue *send_queue;
121
122 int dead;
123
124 struct {
125 pa_pstream_descriptor descriptor;
126 struct item_info* current;
127 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
128 void *data;
129 size_t index;
130 pa_memchunk memchunk;
131 } write;
132
133 struct {
134 pa_pstream_descriptor descriptor;
135 pa_memblock *memblock;
136 pa_packet *packet;
137 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
138 void *data;
139 size_t index;
140 } read;
141
142 int use_shm;
143 pa_memimport *import;
144 pa_memexport *export;
145
146 pa_pstream_packet_cb_t recieve_packet_callback;
147 void *recieve_packet_callback_userdata;
148
149 pa_pstream_memblock_cb_t recieve_memblock_callback;
150 void *recieve_memblock_callback_userdata;
151
152 pa_pstream_notify_cb_t drain_callback;
153 void *drain_callback_userdata;
154
155 pa_pstream_notify_cb_t die_callback;
156 void *die_callback_userdata;
157
158 pa_mempool *mempool;
159
160 #ifdef HAVE_CREDS
161 pa_creds read_creds, write_creds;
162 int read_creds_valid, send_creds_now;
163 #endif
164 };
165
166 static int do_write(pa_pstream *p);
167 static int do_read(pa_pstream *p);
168
169 static void do_something(pa_pstream *p) {
170 pa_assert(p);
171 pa_assert(PA_REFCNT_VALUE(p) > 0);
172
173 pa_pstream_ref(p);
174
175 p->mainloop->defer_enable(p->defer_event, 0);
176
177 if (!p->dead && pa_iochannel_is_readable(p->io)) {
178 if (do_read(p) < 0)
179 goto fail;
180 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
181 goto fail;
182
183 if (!p->dead && pa_iochannel_is_writable(p->io)) {
184 if (do_write(p) < 0)
185 goto fail;
186 }
187
188 pa_pstream_unref(p);
189 return;
190
191 fail:
192
193 if (p->die_callback)
194 p->die_callback(p, p->die_callback_userdata);
195
196 pa_pstream_unlink(p);
197 pa_pstream_unref(p);
198 }
199
200 static void io_callback(pa_iochannel*io, void *userdata) {
201 pa_pstream *p = userdata;
202
203 pa_assert(p);
204 pa_assert(PA_REFCNT_VALUE(p) > 0);
205 pa_assert(p->io == io);
206
207 do_something(p);
208 }
209
210 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
211 pa_pstream *p = userdata;
212
213 pa_assert(p);
214 pa_assert(PA_REFCNT_VALUE(p) > 0);
215 pa_assert(p->defer_event == e);
216 pa_assert(p->mainloop == m);
217
218 do_something(p);
219 }
220
221 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
222
223 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
224 pa_pstream *p;
225
226 pa_assert(m);
227 pa_assert(io);
228 pa_assert(pool);
229
230 p = pa_xnew(pa_pstream, 1);
231 PA_REFCNT_INIT(p);
232 p->io = io;
233 pa_iochannel_set_callback(io, io_callback, p);
234 p->dead = 0;
235
236 p->mainloop = m;
237 p->defer_event = m->defer_new(m, defer_callback, p);
238 m->defer_enable(p->defer_event, 0);
239
240 p->send_queue = pa_queue_new();
241 pa_assert(p->send_queue);
242
243 p->write.current = NULL;
244 p->write.index = 0;
245 pa_memchunk_reset(&p->write.memchunk);
246 p->read.memblock = NULL;
247 p->read.packet = NULL;
248 p->read.index = 0;
249
250 p->recieve_packet_callback = NULL;
251 p->recieve_packet_callback_userdata = NULL;
252 p->recieve_memblock_callback = NULL;
253 p->recieve_memblock_callback_userdata = NULL;
254 p->drain_callback = NULL;
255 p->drain_callback_userdata = NULL;
256 p->die_callback = NULL;
257 p->die_callback_userdata = NULL;
258
259 p->mempool = pool;
260
261 p->use_shm = 0;
262 p->export = NULL;
263
264 /* We do importing unconditionally */
265 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
266
267 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
268 pa_iochannel_socket_set_sndbuf(io, 1024*8);
269
270 #ifdef HAVE_CREDS
271 p->send_creds_now = 0;
272 p->read_creds_valid = 0;
273 #endif
274 return p;
275 }
276
277 static void item_free(void *item, PA_GCC_UNUSED void *p) {
278 struct item_info *i = item;
279 pa_assert(i);
280
281 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
282 pa_assert(i->chunk.memblock);
283 pa_memblock_unref(i->chunk.memblock);
284 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
285 pa_assert(i->packet);
286 pa_packet_unref(i->packet);
287 }
288
289 pa_xfree(i);
290 }
291
292 static void pstream_free(pa_pstream *p) {
293 pa_assert(p);
294
295 pa_pstream_unlink(p);
296
297 pa_queue_free(p->send_queue, item_free, NULL);
298
299 if (p->write.current)
300 item_free(p->write.current, NULL);
301
302 if (p->read.memblock)
303 pa_memblock_unref(p->read.memblock);
304
305 if (p->read.packet)
306 pa_packet_unref(p->read.packet);
307
308 if (p->write.memchunk.memblock)
309 pa_memblock_unref(p->write.memchunk.memblock);
310
311 pa_xfree(p);
312 }
313
314 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
315 struct item_info *i;
316
317 pa_assert(p);
318 pa_assert(PA_REFCNT_VALUE(p) > 0);
319 pa_assert(packet);
320
321 if (p->dead)
322 return;
323
324 i = pa_xnew(struct item_info, 1);
325 i->type = PA_PSTREAM_ITEM_PACKET;
326 i->packet = pa_packet_ref(packet);
327
328 #ifdef HAVE_CREDS
329 if ((i->with_creds = !!creds))
330 i->creds = *creds;
331 #endif
332
333 pa_queue_push(p->send_queue, i);
334
335 p->mainloop->defer_enable(p->defer_event, 1);
336 }
337
338 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
339 size_t length, idx;
340
341 pa_assert(p);
342 pa_assert(PA_REFCNT_VALUE(p) > 0);
343 pa_assert(channel != (uint32_t) -1);
344 pa_assert(chunk);
345
346 if (p->dead)
347 return;
348
349 idx = 0;
350 length = chunk->length;
351
352 while (length > 0) {
353 struct item_info *i;
354 size_t n;
355
356 i = pa_xnew(struct item_info, 1);
357 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
358
359 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
360 i->chunk.index = chunk->index + idx;
361 i->chunk.length = n;
362 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
363
364 i->channel = channel;
365 i->offset = offset;
366 i->seek_mode = seek_mode;
367 #ifdef HAVE_CREDS
368 i->with_creds = 0;
369 #endif
370
371 pa_queue_push(p->send_queue, i);
372
373 idx += n;
374 length -= n;
375 }
376
377 p->mainloop->defer_enable(p->defer_event, 1);
378 }
379
380 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
381 struct item_info *item;
382 pa_pstream *p = userdata;
383
384 pa_assert(p);
385 pa_assert(PA_REFCNT_VALUE(p) > 0);
386
387 if (p->dead)
388 return;
389
390 /* pa_log("Releasing block %u", block_id); */
391
392 item = pa_xnew(struct item_info, 1);
393 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
394 item->block_id = block_id;
395 #ifdef HAVE_CREDS
396 item->with_creds = 0;
397 #endif
398
399 pa_queue_push(p->send_queue, item);
400 p->mainloop->defer_enable(p->defer_event, 1);
401 }
402
403 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
404 struct item_info *item;
405 pa_pstream *p = userdata;
406
407 pa_assert(p);
408 pa_assert(PA_REFCNT_VALUE(p) > 0);
409
410 if (p->dead)
411 return;
412 /* pa_log("Revoking block %u", block_id); */
413
414 item = pa_xnew(struct item_info, 1);
415 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
416 item->block_id = block_id;
417 #ifdef HAVE_CREDS
418 item->with_creds = 0;
419 #endif
420
421 pa_queue_push(p->send_queue, item);
422 p->mainloop->defer_enable(p->defer_event, 1);
423 }
424
425 static void prepare_next_write_item(pa_pstream *p) {
426 pa_assert(p);
427 pa_assert(PA_REFCNT_VALUE(p) > 0);
428
429 p->write.current = pa_queue_pop(p->send_queue);
430
431 if (!p->write.current)
432 return;
433
434 p->write.index = 0;
435 p->write.data = NULL;
436 pa_memchunk_reset(&p->write.memchunk);
437
438 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
439 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
440 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
441 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
442 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
443
444 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
445
446 pa_assert(p->write.current->packet);
447 p->write.data = p->write.current->packet->data;
448 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
449
450 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
451
452 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
453 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
454
455 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
456
457 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
458 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
459
460 } else {
461 uint32_t flags;
462 int send_payload = 1;
463
464 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
465 pa_assert(p->write.current->chunk.memblock);
466
467 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
468 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
469 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
470
471 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
472
473 if (p->use_shm) {
474 uint32_t block_id, shm_id;
475 size_t offset, length;
476
477 pa_assert(p->export);
478
479 if (pa_memexport_put(p->export,
480 p->write.current->chunk.memblock,
481 &block_id,
482 &shm_id,
483 &offset,
484 &length) >= 0) {
485
486 flags |= PA_FLAG_SHMDATA;
487 send_payload = 0;
488
489 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
490 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
491 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
492 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
493
494 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
495 p->write.data = p->write.shm_info;
496 }
497 /* else */
498 /* pa_log_warn("Failed to export memory block."); */
499 }
500
501 if (send_payload) {
502 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
503 p->write.memchunk = p->write.current->chunk;
504 pa_memblock_ref(p->write.memchunk.memblock);
505 p->write.data = NULL;
506 }
507
508 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
509 }
510
511 #ifdef HAVE_CREDS
512 if ((p->send_creds_now = p->write.current->with_creds))
513 p->write_creds = p->write.current->creds;
514 #endif
515 }
516
517 static int do_write(pa_pstream *p) {
518 void *d;
519 size_t l;
520 ssize_t r;
521 pa_memblock *release_memblock = NULL;
522
523 pa_assert(p);
524 pa_assert(PA_REFCNT_VALUE(p) > 0);
525
526 if (!p->write.current)
527 prepare_next_write_item(p);
528
529 if (!p->write.current)
530 return 0;
531
532 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
533 d = (uint8_t*) p->write.descriptor + p->write.index;
534 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
535 } else {
536 pa_assert(p->write.data || p->write.memchunk.memblock);
537
538 if (p->write.data)
539 d = p->write.data;
540 else {
541 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
542 release_memblock = p->write.memchunk.memblock;
543 }
544
545 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
546 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
547 }
548
549 pa_assert(l > 0);
550
551 #ifdef HAVE_CREDS
552 if (p->send_creds_now) {
553
554 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
555 goto fail;
556
557 p->send_creds_now = 0;
558 } else
559 #endif
560
561 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
562 goto fail;
563
564 if (release_memblock)
565 pa_memblock_release(release_memblock);
566
567 p->write.index += r;
568
569 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
570 pa_assert(p->write.current);
571 item_free(p->write.current, (void *) 1);
572 p->write.current = NULL;
573
574 if (p->drain_callback && !pa_pstream_is_pending(p))
575 p->drain_callback(p, p->drain_callback_userdata);
576 }
577
578 return 0;
579
580 fail:
581
582 if (release_memblock)
583 pa_memblock_release(release_memblock);
584
585 return -1;
586 }
587
588 static int do_read(pa_pstream *p) {
589 void *d;
590 size_t l;
591 ssize_t r;
592 pa_memblock *release_memblock = NULL;
593 pa_assert(p);
594 pa_assert(PA_REFCNT_VALUE(p) > 0);
595
596 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
597 d = (uint8_t*) p->read.descriptor + p->read.index;
598 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
599 } else {
600 pa_assert(p->read.data || p->read.memblock);
601
602 if (p->read.data)
603 d = p->read.data;
604 else {
605 d = pa_memblock_acquire(p->read.memblock);
606 release_memblock = p->read.memblock;
607 }
608
609 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
610 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
611 }
612
613 #ifdef HAVE_CREDS
614 {
615 int b = 0;
616
617 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
618 goto fail;
619
620 p->read_creds_valid = p->read_creds_valid || b;
621 }
622 #else
623 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
624 goto fail;
625 #endif
626
627 if (release_memblock)
628 pa_memblock_release(release_memblock);
629
630 p->read.index += r;
631
632 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
633 uint32_t flags, length, channel;
634 /* Reading of frame descriptor complete */
635
636 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
637
638 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
639 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
640 return -1;
641 }
642
643 if (flags == PA_FLAG_SHMRELEASE) {
644
645 /* This is a SHM memblock release frame with no payload */
646
647 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
648
649 pa_assert(p->export);
650 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
651
652 goto frame_done;
653
654 } else if (flags == PA_FLAG_SHMREVOKE) {
655
656 /* This is a SHM memblock revoke frame with no payload */
657
658 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
659
660 pa_assert(p->import);
661 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
662
663 goto frame_done;
664 }
665
666 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
667
668 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
669 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
670 return -1;
671 }
672
673 pa_assert(!p->read.packet && !p->read.memblock);
674
675 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
676
677 if (channel == (uint32_t) -1) {
678
679 if (flags != 0) {
680 pa_log_warn("Received packet frame with invalid flags value.");
681 return -1;
682 }
683
684 /* Frame is a packet frame */
685 p->read.packet = pa_packet_new(length);
686 p->read.data = p->read.packet->data;
687
688 } else {
689
690 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
691 pa_log_warn("Received memblock frame with invalid seek mode.");
692 return -1;
693 }
694
695 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
696
697 if (length != sizeof(p->read.shm_info)) {
698 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
699 return -1;
700 }
701
702 /* Frame is a memblock frame referencing an SHM memblock */
703 p->read.data = p->read.shm_info;
704
705 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
706
707 /* Frame is a memblock frame */
708
709 p->read.memblock = pa_memblock_new(p->mempool, length);
710 p->read.data = NULL;
711 } else {
712
713 pa_log_warn("Recieved memblock frame with invalid flags value.");
714 return -1;
715 }
716 }
717
718 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
719 /* Frame payload available */
720
721 if (p->read.memblock && p->recieve_memblock_callback) {
722
723 /* Is this memblock data? Than pass it to the user */
724 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
725
726 if (l > 0) {
727 pa_memchunk chunk;
728
729 chunk.memblock = p->read.memblock;
730 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
731 chunk.length = l;
732
733 if (p->recieve_memblock_callback) {
734 int64_t offset;
735
736 offset = (int64_t) (
737 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
738 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
739
740 p->recieve_memblock_callback(
741 p,
742 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
743 offset,
744 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
745 &chunk,
746 p->recieve_memblock_callback_userdata);
747 }
748
749 /* Drop seek info for following callbacks */
750 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
751 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
752 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
753 }
754 }
755
756 /* Frame complete */
757 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
758
759 if (p->read.memblock) {
760
761 /* This was a memblock frame. We can unref the memblock now */
762 pa_memblock_unref(p->read.memblock);
763
764 } else if (p->read.packet) {
765
766 if (p->recieve_packet_callback)
767 #ifdef HAVE_CREDS
768 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
769 #else
770 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
771 #endif
772
773 pa_packet_unref(p->read.packet);
774 } else {
775 pa_memblock *b;
776
777 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
778
779 pa_assert(p->import);
780
781 if (!(b = pa_memimport_get(p->import,
782 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
783 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
784 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
785 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
786
787 pa_log_warn("Failed to import memory block.");
788 return -1;
789 }
790
791 if (p->recieve_memblock_callback) {
792 int64_t offset;
793 pa_memchunk chunk;
794
795 chunk.memblock = b;
796 chunk.index = 0;
797 chunk.length = pa_memblock_get_length(b);
798
799 offset = (int64_t) (
800 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
801 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
802
803 p->recieve_memblock_callback(
804 p,
805 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
806 offset,
807 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
808 &chunk,
809 p->recieve_memblock_callback_userdata);
810 }
811
812 pa_memblock_unref(b);
813 }
814
815 goto frame_done;
816 }
817 }
818
819 return 0;
820
821 frame_done:
822 p->read.memblock = NULL;
823 p->read.packet = NULL;
824 p->read.index = 0;
825 p->read.data = NULL;
826
827 #ifdef HAVE_CREDS
828 p->read_creds_valid = 0;
829 #endif
830
831 return 0;
832
833 fail:
834 if (release_memblock)
835 pa_memblock_release(release_memblock);
836
837 return -1;
838 }
839
840 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
841 pa_assert(p);
842 pa_assert(PA_REFCNT_VALUE(p) > 0);
843
844 p->die_callback = cb;
845 p->die_callback_userdata = userdata;
846 }
847
848 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
849 pa_assert(p);
850 pa_assert(PA_REFCNT_VALUE(p) > 0);
851
852 p->drain_callback = cb;
853 p->drain_callback_userdata = userdata;
854 }
855
856 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
857 pa_assert(p);
858 pa_assert(PA_REFCNT_VALUE(p) > 0);
859
860 p->recieve_packet_callback = cb;
861 p->recieve_packet_callback_userdata = userdata;
862 }
863
864 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
865 pa_assert(p);
866 pa_assert(PA_REFCNT_VALUE(p) > 0);
867
868 p->recieve_memblock_callback = cb;
869 p->recieve_memblock_callback_userdata = userdata;
870 }
871
872 int pa_pstream_is_pending(pa_pstream *p) {
873 int b;
874
875 pa_assert(p);
876 pa_assert(PA_REFCNT_VALUE(p) > 0);
877
878 if (p->dead)
879 b = 0;
880 else
881 b = p->write.current || !pa_queue_is_empty(p->send_queue);
882
883 return b;
884 }
885
886 void pa_pstream_unref(pa_pstream*p) {
887 pa_assert(p);
888 pa_assert(PA_REFCNT_VALUE(p) > 0);
889
890 if (PA_REFCNT_DEC(p) <= 0)
891 pstream_free(p);
892 }
893
894 pa_pstream* pa_pstream_ref(pa_pstream*p) {
895 pa_assert(p);
896 pa_assert(PA_REFCNT_VALUE(p) > 0);
897
898 PA_REFCNT_INC(p);
899 return p;
900 }
901
902 void pa_pstream_unlink(pa_pstream *p) {
903 pa_assert(p);
904
905 if (p->dead)
906 return;
907
908 p->dead = 1;
909
910 if (p->import) {
911 pa_memimport_free(p->import);
912 p->import = NULL;
913 }
914
915 if (p->export) {
916 pa_memexport_free(p->export);
917 p->export = NULL;
918 }
919
920 if (p->io) {
921 pa_iochannel_free(p->io);
922 p->io = NULL;
923 }
924
925 if (p->defer_event) {
926 p->mainloop->defer_free(p->defer_event);
927 p->defer_event = NULL;
928 }
929
930 p->die_callback = NULL;
931 p->drain_callback = NULL;
932 p->recieve_packet_callback = NULL;
933 p->recieve_memblock_callback = NULL;
934 }
935
936 void pa_pstream_use_shm(pa_pstream *p, int enable) {
937 pa_assert(p);
938 pa_assert(PA_REFCNT_VALUE(p) > 0);
939
940 p->use_shm = enable;
941
942 if (enable) {
943
944 if (!p->export)
945 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
946
947 } else {
948
949 if (p->export) {
950 pa_memexport_free(p->export);
951 p->export = NULL;
952 }
953 }
954 }