]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
add callbacks for the revoke/release stuff, so that we can make this thing thread...
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42
43 #include "winsock.h"
44
45 #include <pulse/xmalloc.h>
46
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52
53 #include "pstream.h"
54
/*
 * The flags word of the frame descriptor carries the seek mode in its low
 * byte. Information about SHM handling is piggybacked onto the same word,
 * using its high byte.
 */
#define PA_FLAG_SEEKMASK   0x000000FFLU /* bits holding the seek mode */
#define PA_FLAG_SHMMASK    0xFF000000LU /* bits holding the SHM frame kind */
#define PA_FLAG_SHMDATA    0x80000000LU /* memblock frame whose data lives in SHM */
#define PA_FLAG_SHMRELEASE 0x40000000LU /* SHM block release frame */
#define PA_FLAG_SHMREVOKE  0xC0000000LU /* SHM block revoke frame */
61
/* Every frame starts with a descriptor header made of five 32 bit integers.
 * The enumerators are the word indices into that header; the last one is the
 * number of words. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH    = 0,
    PA_PSTREAM_DESCRIPTOR_CHANNEL   = 1,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI = 2,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO = 3,
    PA_PSTREAM_DESCRIPTOR_FLAGS     = 4,
    PA_PSTREAM_DESCRIPTOR_MAX       = 5
};
71
/* Payload of a frame that references an SHM block: these words follow the
 * descriptor. The enumerators are word indices; the last one is the count. */
enum {
    PA_PSTREAM_SHM_BLOCKID = 0,
    PA_PSTREAM_SHM_SHMID   = 1,
    PA_PSTREAM_SHM_INDEX   = 2,
    PA_PSTREAM_SHM_LENGTH  = 3,
    PA_PSTREAM_SHM_MAX     = 4
};
80
81 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
82
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
86
87 struct item_info {
88 enum {
89 PA_PSTREAM_ITEM_PACKET,
90 PA_PSTREAM_ITEM_MEMBLOCK,
91 PA_PSTREAM_ITEM_SHMRELEASE,
92 PA_PSTREAM_ITEM_SHMREVOKE
93 } type;
94
95
96 /* packet info */
97 pa_packet *packet;
98 #ifdef HAVE_CREDS
99 int with_creds;
100 pa_creds creds;
101 #endif
102
103 /* memblock info */
104 pa_memchunk chunk;
105 uint32_t channel;
106 int64_t offset;
107 pa_seek_mode_t seek_mode;
108
109 /* release/revoke info */
110 uint32_t block_id;
111 };
112
113 struct pa_pstream {
114 PA_REFCNT_DECLARE;
115
116 pa_mainloop_api *mainloop;
117 pa_defer_event *defer_event;
118 pa_iochannel *io;
119
120 pa_queue *send_queue;
121
122 int dead;
123
124 struct {
125 pa_pstream_descriptor descriptor;
126 struct item_info* current;
127 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
128 void *data;
129 size_t index;
130 pa_memchunk memchunk;
131 } write;
132
133 struct {
134 pa_pstream_descriptor descriptor;
135 pa_memblock *memblock;
136 pa_packet *packet;
137 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
138 void *data;
139 size_t index;
140 } read;
141
142 int use_shm;
143 pa_memimport *import;
144 pa_memexport *export;
145
146 pa_pstream_packet_cb_t recieve_packet_callback;
147 void *recieve_packet_callback_userdata;
148
149 pa_pstream_memblock_cb_t recieve_memblock_callback;
150 void *recieve_memblock_callback_userdata;
151
152 pa_pstream_notify_cb_t drain_callback;
153 void *drain_callback_userdata;
154
155 pa_pstream_notify_cb_t die_callback;
156 void *die_callback_userdata;
157
158 pa_pstream_block_id_cb_t revoke_callback;
159 void *revoke_callback_userdata;
160
161 pa_pstream_block_id_cb_t release_callback;
162 void *release_callback_userdata;
163
164 pa_mempool *mempool;
165
166 #ifdef HAVE_CREDS
167 pa_creds read_creds, write_creds;
168 int read_creds_valid, send_creds_now;
169 #endif
170 };
171
172 static int do_write(pa_pstream *p);
173 static int do_read(pa_pstream *p);
174
175 static void do_something(pa_pstream *p) {
176 pa_assert(p);
177 pa_assert(PA_REFCNT_VALUE(p) > 0);
178
179 pa_pstream_ref(p);
180
181 p->mainloop->defer_enable(p->defer_event, 0);
182
183 if (!p->dead && pa_iochannel_is_readable(p->io)) {
184 if (do_read(p) < 0)
185 goto fail;
186 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
187 goto fail;
188
189 if (!p->dead && pa_iochannel_is_writable(p->io)) {
190 if (do_write(p) < 0)
191 goto fail;
192 }
193
194 pa_pstream_unref(p);
195 return;
196
197 fail:
198
199 if (p->die_callback)
200 p->die_callback(p, p->die_callback_userdata);
201
202 pa_pstream_unlink(p);
203 pa_pstream_unref(p);
204 }
205
206 static void io_callback(pa_iochannel*io, void *userdata) {
207 pa_pstream *p = userdata;
208
209 pa_assert(p);
210 pa_assert(PA_REFCNT_VALUE(p) > 0);
211 pa_assert(p->io == io);
212
213 do_something(p);
214 }
215
216 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
217 pa_pstream *p = userdata;
218
219 pa_assert(p);
220 pa_assert(PA_REFCNT_VALUE(p) > 0);
221 pa_assert(p->defer_event == e);
222 pa_assert(p->mainloop == m);
223
224 do_something(p);
225 }
226
227 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
228
229 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
230 pa_pstream *p;
231
232 pa_assert(m);
233 pa_assert(io);
234 pa_assert(pool);
235
236 p = pa_xnew(pa_pstream, 1);
237 PA_REFCNT_INIT(p);
238 p->io = io;
239 pa_iochannel_set_callback(io, io_callback, p);
240 p->dead = 0;
241
242 p->mainloop = m;
243 p->defer_event = m->defer_new(m, defer_callback, p);
244 m->defer_enable(p->defer_event, 0);
245
246 p->send_queue = pa_queue_new();
247
248 p->write.current = NULL;
249 p->write.index = 0;
250 pa_memchunk_reset(&p->write.memchunk);
251 p->read.memblock = NULL;
252 p->read.packet = NULL;
253 p->read.index = 0;
254
255 p->recieve_packet_callback = NULL;
256 p->recieve_packet_callback_userdata = NULL;
257 p->recieve_memblock_callback = NULL;
258 p->recieve_memblock_callback_userdata = NULL;
259 p->drain_callback = NULL;
260 p->drain_callback_userdata = NULL;
261 p->die_callback = NULL;
262 p->die_callback_userdata = NULL;
263 p->revoke_callback = NULL;
264 p->revoke_callback_userdata = NULL;
265 p->release_callback = NULL;
266 p->release_callback_userdata = NULL;
267
268 p->mempool = pool;
269
270 p->use_shm = 0;
271 p->export = NULL;
272
273 /* We do importing unconditionally */
274 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
275
276 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
277 pa_iochannel_socket_set_sndbuf(io, 1024*8);
278
279 #ifdef HAVE_CREDS
280 p->send_creds_now = 0;
281 p->read_creds_valid = 0;
282 #endif
283 return p;
284 }
285
286 static void item_free(void *item, PA_GCC_UNUSED void *q) {
287 struct item_info *i = item;
288 pa_assert(i);
289
290 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
291 pa_assert(i->chunk.memblock);
292 pa_memblock_unref(i->chunk.memblock);
293 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
294 pa_assert(i->packet);
295 pa_packet_unref(i->packet);
296 }
297
298 pa_xfree(i);
299 }
300
301 static void pstream_free(pa_pstream *p) {
302 pa_assert(p);
303
304 pa_pstream_unlink(p);
305
306 pa_queue_free(p->send_queue, item_free, NULL);
307
308 if (p->write.current)
309 item_free(p->write.current, NULL);
310
311 if (p->write.memchunk.memblock)
312 pa_memblock_unref(p->write.memchunk.memblock);
313
314 if (p->read.memblock)
315 pa_memblock_unref(p->read.memblock);
316
317 if (p->read.packet)
318 pa_packet_unref(p->read.packet);
319
320 pa_xfree(p);
321 }
322
323 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
324 struct item_info *i;
325
326 pa_assert(p);
327 pa_assert(PA_REFCNT_VALUE(p) > 0);
328 pa_assert(packet);
329
330 if (p->dead)
331 return;
332
333 i = pa_xnew(struct item_info, 1);
334 i->type = PA_PSTREAM_ITEM_PACKET;
335 i->packet = pa_packet_ref(packet);
336
337 #ifdef HAVE_CREDS
338 if ((i->with_creds = !!creds))
339 i->creds = *creds;
340 #endif
341
342 pa_queue_push(p->send_queue, i);
343
344 p->mainloop->defer_enable(p->defer_event, 1);
345 }
346
347 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
348 size_t length, idx;
349
350 pa_assert(p);
351 pa_assert(PA_REFCNT_VALUE(p) > 0);
352 pa_assert(channel != (uint32_t) -1);
353 pa_assert(chunk);
354
355 if (p->dead)
356 return;
357
358 idx = 0;
359 length = chunk->length;
360
361 while (length > 0) {
362 struct item_info *i;
363 size_t n;
364
365 i = pa_xnew(struct item_info, 1);
366 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
367
368 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
369 i->chunk.index = chunk->index + idx;
370 i->chunk.length = n;
371 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
372
373 i->channel = channel;
374 i->offset = offset;
375 i->seek_mode = seek_mode;
376 #ifdef HAVE_CREDS
377 i->with_creds = 0;
378 #endif
379
380 pa_queue_push(p->send_queue, i);
381
382 idx += n;
383 length -= n;
384 }
385
386 p->mainloop->defer_enable(p->defer_event, 1);
387 }
388
389 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
390 struct item_info *item;
391 pa_assert(p);
392 pa_assert(PA_REFCNT_VALUE(p) > 0);
393
394 if (p->dead)
395 return;
396
397 /* pa_log("Releasing block %u", block_id); */
398
399 item = pa_xnew(struct item_info, 1);
400 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
401 item->block_id = block_id;
402 #ifdef HAVE_CREDS
403 item->with_creds = 0;
404 #endif
405
406 pa_queue_push(p->send_queue, item);
407 p->mainloop->defer_enable(p->defer_event, 1);
408 }
409
410 /* might be called from thread context */
411 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
412 pa_pstream *p = userdata;
413
414 pa_assert(p);
415 pa_assert(PA_REFCNT_VALUE(p) > 0);
416
417 if (p->dead)
418 return;
419
420 if (p->release_callback)
421 p->release_callback(p, block_id, p->release_callback_userdata);
422 else
423 pa_pstream_send_release(p, block_id);
424 }
425
426 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
427 struct item_info *item;
428 pa_assert(p);
429 pa_assert(PA_REFCNT_VALUE(p) > 0);
430
431 if (p->dead)
432 return;
433 /* pa_log("Revoking block %u", block_id); */
434
435 item = pa_xnew(struct item_info, 1);
436 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
437 item->block_id = block_id;
438 #ifdef HAVE_CREDS
439 item->with_creds = 0;
440 #endif
441
442 pa_queue_push(p->send_queue, item);
443 p->mainloop->defer_enable(p->defer_event, 1);
444 }
445
446 /* might be called from thread context */
447 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
448 pa_pstream *p = userdata;
449
450 pa_assert(p);
451 pa_assert(PA_REFCNT_VALUE(p) > 0);
452
453 if (p->revoke_callback)
454 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
455 else
456 pa_pstream_send_revoke(p, block_id);
457 }
458
459 static void prepare_next_write_item(pa_pstream *p) {
460 pa_assert(p);
461 pa_assert(PA_REFCNT_VALUE(p) > 0);
462
463 p->write.current = pa_queue_pop(p->send_queue);
464
465 if (!p->write.current)
466 return;
467
468 p->write.index = 0;
469 p->write.data = NULL;
470 pa_memchunk_reset(&p->write.memchunk);
471
472 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
473 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
474 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
475 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
476 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
477
478 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
479
480 pa_assert(p->write.current->packet);
481 p->write.data = p->write.current->packet->data;
482 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
483
484 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
485
486 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
487 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
488
489 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
490
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
492 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
493
494 } else {
495 uint32_t flags;
496 int send_payload = 1;
497
498 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
499 pa_assert(p->write.current->chunk.memblock);
500
501 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
502 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
503 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
504
505 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
506
507 if (p->use_shm) {
508 uint32_t block_id, shm_id;
509 size_t offset, length;
510
511 pa_assert(p->export);
512
513 if (pa_memexport_put(p->export,
514 p->write.current->chunk.memblock,
515 &block_id,
516 &shm_id,
517 &offset,
518 &length) >= 0) {
519
520 flags |= PA_FLAG_SHMDATA;
521 send_payload = 0;
522
523 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
524 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
525 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
526 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
527
528 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
529 p->write.data = p->write.shm_info;
530 }
531 /* else */
532 /* pa_log_warn("Failed to export memory block."); */
533 }
534
535 if (send_payload) {
536 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
537 p->write.memchunk = p->write.current->chunk;
538 pa_memblock_ref(p->write.memchunk.memblock);
539 p->write.data = NULL;
540 }
541
542 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
543 }
544
545 #ifdef HAVE_CREDS
546 if ((p->send_creds_now = p->write.current->with_creds))
547 p->write_creds = p->write.current->creds;
548 #endif
549 }
550
551 static int do_write(pa_pstream *p) {
552 void *d;
553 size_t l;
554 ssize_t r;
555 pa_memblock *release_memblock = NULL;
556
557 pa_assert(p);
558 pa_assert(PA_REFCNT_VALUE(p) > 0);
559
560 if (!p->write.current)
561 prepare_next_write_item(p);
562
563 if (!p->write.current)
564 return 0;
565
566 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
567 d = (uint8_t*) p->write.descriptor + p->write.index;
568 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
569 } else {
570 pa_assert(p->write.data || p->write.memchunk.memblock);
571
572 if (p->write.data)
573 d = p->write.data;
574 else {
575 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
576 release_memblock = p->write.memchunk.memblock;
577 }
578
579 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
580 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
581 }
582
583 pa_assert(l > 0);
584
585 #ifdef HAVE_CREDS
586 if (p->send_creds_now) {
587
588 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
589 goto fail;
590
591 p->send_creds_now = 0;
592 } else
593 #endif
594
595 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
596 goto fail;
597
598 if (release_memblock)
599 pa_memblock_release(release_memblock);
600
601 p->write.index += r;
602
603 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
604 pa_assert(p->write.current);
605 item_free(p->write.current, NULL);
606 p->write.current = NULL;
607
608 if (p->write.memchunk.memblock)
609 pa_memblock_unref(p->write.memchunk.memblock);
610
611 pa_memchunk_reset(&p->write.memchunk);
612
613 if (p->drain_callback && !pa_pstream_is_pending(p))
614 p->drain_callback(p, p->drain_callback_userdata);
615 }
616
617 return 0;
618
619 fail:
620
621 if (release_memblock)
622 pa_memblock_release(release_memblock);
623
624 return -1;
625 }
626
/* Performs a single read on the iochannel and advances the state of the frame
 * that is currently being received (kept in p->read).
 *
 * A frame is a descriptor of PA_PSTREAM_DESCRIPTOR_SIZE bytes, optionally
 * followed by a payload. p->read.index counts the bytes of the frame read so
 * far, descriptor included. Depending on the descriptor the payload is read
 * into a new packet, into a new memblock, or into p->read.shm_info.
 *
 * Returns 0 on success (whether or not the frame is complete yet) and -1 if
 * the read returned <= 0 or the peer sent a frame that fails validation. */
627 static int do_read(pa_pstream *p) {
628 void *d;
629 size_t l;
630 ssize_t r;
631 pa_memblock *release_memblock = NULL;
632 pa_assert(p);
633 pa_assert(PA_REFCNT_VALUE(p) > 0);
634
/* Work out where the next bytes go: into the descriptor while it is still
 * incomplete, afterwards into the payload buffer chosen for this frame */
635 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
636 d = (uint8_t*) p->read.descriptor + p->read.index;
637 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
638 } else {
639 pa_assert(p->read.data || p->read.memblock);
640
641 if (p->read.data)
642 d = p->read.data;
643 else {
/* Reading straight into the memblock; it is released again after the read */
644 d = pa_memblock_acquire(p->read.memblock);
645 release_memblock = p->read.memblock;
646 }
647
648 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
649 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
650 }
651
652 #ifdef HAVE_CREDS
653 {
654 int b = 0;
655
656 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
657 goto fail;
658
/* Remember credentials until the frame is done, even if they arrived with
 * an earlier read of the same frame */
659 p->read_creds_valid = p->read_creds_valid || b;
660 }
661 #else
662 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
663 goto fail;
664 #endif
665
666 if (release_memblock)
667 pa_memblock_release(release_memblock);
668
669 p->read.index += r;
670
671 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
672 uint32_t flags, length, channel;
673 /* Reading of frame descriptor complete */
674
675 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
676
677 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
678 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
679 return -1;
680 }
681
682 if (flags == PA_FLAG_SHMRELEASE) {
683
684 /* This is a SHM memblock release frame with no payload */
685
686 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
687
688 pa_assert(p->export);
689 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
690
691 goto frame_done;
692
693 } else if (flags == PA_FLAG_SHMREVOKE) {
694
695 /* This is a SHM memblock revoke frame with no payload */
696
697 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
698
699 pa_assert(p->import);
700 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
701
702 goto frame_done;
703 }
704
/* Everything else carries a payload, whose size must be sane */
705 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
706
707 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
708 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
709 return -1;
710 }
711
712 pa_assert(!p->read.packet && !p->read.memblock);
713
714 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
715
/* Channel (uint32_t) -1 marks a packet frame, any other value a memblock frame */
716 if (channel == (uint32_t) -1) {
717
718 if (flags != 0) {
719 pa_log_warn("Received packet frame with invalid flags value.");
720 return -1;
721 }
722
723 /* Frame is a packet frame */
724 p->read.packet = pa_packet_new(length);
725 p->read.data = p->read.packet->data;
726
727 } else {
728
729 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
730 pa_log_warn("Received memblock frame with invalid seek mode.");
731 return -1;
732 }
733
734 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
735
736 if (length != sizeof(p->read.shm_info)) {
737 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
738 return -1;
739 }
740
741 /* Frame is a memblock frame referencing an SHM memblock */
742 p->read.data = p->read.shm_info;
743
744 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
745
746 /* Frame is a memblock frame */
747
748 p->read.memblock = pa_memblock_new(p->mempool, length);
749 p->read.data = NULL;
750 } else {
751
752 pa_log_warn("Recieved memblock frame with invalid flags value.");
753 return -1;
754 }
755 }
756
757 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
758 /* Frame payload available */
759
760 if (p->read.memblock && p->recieve_memblock_callback) {
761
762 /* Is this memblock data? Than pass it to the user */
/* l becomes the number of payload bytes delivered by this read */
763 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
764
765 if (l > 0) {
766 pa_memchunk chunk;
767
/* Hand over only the part of the memblock that was just filled */
768 chunk.memblock = p->read.memblock;
769 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
770 chunk.length = l;
771
772 if (p->recieve_memblock_callback) {
773 int64_t offset;
774
/* Reassemble the 64 bit offset from its two descriptor words */
775 offset = (int64_t) (
776 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
777 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
778
779 p->recieve_memblock_callback(
780 p,
781 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
782 offset,
783 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
784 &chunk,
785 p->recieve_memblock_callback_userdata);
786 }
787
788 /* Drop seek info for following callbacks */
789 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
790 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
791 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
792 }
793 }
794
795 /* Frame complete */
796 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
797
798 if (p->read.memblock) {
799
800 /* This was a memblock frame. We can unref the memblock now */
801 pa_memblock_unref(p->read.memblock);
802
803 } else if (p->read.packet) {
804
/* Packets are delivered in one piece, together with any credentials
 * received while reading this frame */
805 if (p->recieve_packet_callback)
806 #ifdef HAVE_CREDS
807 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
808 #else
809 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
810 #endif
811
812 pa_packet_unref(p->read.packet);
813 } else {
814 pa_memblock *b;
815
/* Neither memblock nor packet: the payload was the SHM info, so import
 * the block it describes and pass that on as a whole */
816 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
817
818 pa_assert(p->import);
819
820 if (!(b = pa_memimport_get(p->import,
821 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
822 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
823 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
824 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
825
826 pa_log_warn("Failed to import memory block.");
827 return -1;
828 }
829
830 if (p->recieve_memblock_callback) {
831 int64_t offset;
832 pa_memchunk chunk;
833
834 chunk.memblock = b;
835 chunk.index = 0;
836 chunk.length = pa_memblock_get_length(b);
837
838 offset = (int64_t) (
839 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
840 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
841
842 p->recieve_memblock_callback(
843 p,
844 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
845 offset,
846 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
847 &chunk,
848 p->recieve_memblock_callback_userdata);
849 }
850
851 pa_memblock_unref(b);
852 }
853
854 goto frame_done;
855 }
856 }
857
858 return 0;
859
860 frame_done:
/* Reset the read state so that the next call starts with a fresh descriptor */
861 p->read.memblock = NULL;
862 p->read.packet = NULL;
863 p->read.index = 0;
864 p->read.data = NULL;
865
866 #ifdef HAVE_CREDS
867 p->read_creds_valid = 0;
868 #endif
869
870 return 0;
871
872 fail:
/* Only reached from a failed read, i.e. possibly with the memblock still acquired */
873 if (release_memblock)
874 pa_memblock_release(release_memblock);
875
876 return -1;
877 }
878
879 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
880 pa_assert(p);
881 pa_assert(PA_REFCNT_VALUE(p) > 0);
882
883 p->die_callback = cb;
884 p->die_callback_userdata = userdata;
885 }
886
887 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
888 pa_assert(p);
889 pa_assert(PA_REFCNT_VALUE(p) > 0);
890
891 p->drain_callback = cb;
892 p->drain_callback_userdata = userdata;
893 }
894
895 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
896 pa_assert(p);
897 pa_assert(PA_REFCNT_VALUE(p) > 0);
898
899 p->recieve_packet_callback = cb;
900 p->recieve_packet_callback_userdata = userdata;
901 }
902
903 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
904 pa_assert(p);
905 pa_assert(PA_REFCNT_VALUE(p) > 0);
906
907 p->recieve_memblock_callback = cb;
908 p->recieve_memblock_callback_userdata = userdata;
909 }
910
911 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
912 pa_assert(p);
913 pa_assert(PA_REFCNT_VALUE(p) > 0);
914
915 p->release_callback = cb;
916 p->release_callback_userdata = userdata;
917 }
918
919 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
920 pa_assert(p);
921 pa_assert(PA_REFCNT_VALUE(p) > 0);
922
923 p->release_callback = cb;
924 p->release_callback_userdata = userdata;
925 }
926
927 int pa_pstream_is_pending(pa_pstream *p) {
928 int b;
929
930 pa_assert(p);
931 pa_assert(PA_REFCNT_VALUE(p) > 0);
932
933 if (p->dead)
934 b = 0;
935 else
936 b = p->write.current || !pa_queue_is_empty(p->send_queue);
937
938 return b;
939 }
940
941 void pa_pstream_unref(pa_pstream*p) {
942 pa_assert(p);
943 pa_assert(PA_REFCNT_VALUE(p) > 0);
944
945 if (PA_REFCNT_DEC(p) <= 0)
946 pstream_free(p);
947 }
948
949 pa_pstream* pa_pstream_ref(pa_pstream*p) {
950 pa_assert(p);
951 pa_assert(PA_REFCNT_VALUE(p) > 0);
952
953 PA_REFCNT_INC(p);
954 return p;
955 }
956
957 void pa_pstream_unlink(pa_pstream *p) {
958 pa_assert(p);
959
960 if (p->dead)
961 return;
962
963 p->dead = 1;
964
965 if (p->import) {
966 pa_memimport_free(p->import);
967 p->import = NULL;
968 }
969
970 if (p->export) {
971 pa_memexport_free(p->export);
972 p->export = NULL;
973 }
974
975 if (p->io) {
976 pa_iochannel_free(p->io);
977 p->io = NULL;
978 }
979
980 if (p->defer_event) {
981 p->mainloop->defer_free(p->defer_event);
982 p->defer_event = NULL;
983 }
984
985 p->die_callback = NULL;
986 p->drain_callback = NULL;
987 p->recieve_packet_callback = NULL;
988 p->recieve_memblock_callback = NULL;
989 }
990
991 void pa_pstream_use_shm(pa_pstream *p, int enable) {
992 pa_assert(p);
993 pa_assert(PA_REFCNT_VALUE(p) > 0);
994
995 p->use_shm = enable;
996
997 if (enable) {
998
999 if (!p->export)
1000 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1001
1002 } else {
1003
1004 if (p->export) {
1005 pa_memexport_free(p->export);
1006 p->export = NULL;
1007 }
1008 }
1009 }