]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
rework memory block management to be thread-safe and mostly lock-free.
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40
41 #include "winsock.h"
42
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49 #include <pulsecore/mutex.h>
50 #include <pulsecore/refcnt.h>
51 #include <pulsecore/anotify.h>
52
53 #include "pstream.h"
54
/* The FLAGS word of a frame descriptor does double duty: the low byte
 * carries the seek mode of a memblock frame, the top byte says whether the
 * frame refers to data kept in shared memory (SHM) or is an SHM
 * release/revoke notification. */
#define PA_FLAG_SHMDATA    0x80000000UL /* payload is a reference to an SHM block */
#define PA_FLAG_SHMRELEASE 0x40000000UL /* release notification for a block id */
#define PA_FLAG_SHMREVOKE  0xC0000000UL /* revoke notification for a block id */
#define PA_FLAG_SHMMASK    0xFF000000UL /* selects the SHM related bits */
#define PA_FLAG_SEEKMASK   0x000000FFUL /* selects the seek mode */
61
/* Each frame starts with a descriptor made up of five 32 bit integers.
 * These constants index the individual words of that descriptor. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH    = 0,
    PA_PSTREAM_DESCRIPTOR_CHANNEL   = 1,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI = 2,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO = 3,
    PA_PSTREAM_DESCRIPTOR_FLAGS     = 4,
    PA_PSTREAM_DESCRIPTOR_MAX       = 5  /* number of words, not an index */
};
71
/* For a frame that references an SHM block the payload following the
 * descriptor consists of these 32 bit words. */
enum {
    PA_PSTREAM_SHM_BLOCKID = 0,
    PA_PSTREAM_SHM_SHMID   = 1,
    PA_PSTREAM_SHM_INDEX   = 2,
    PA_PSTREAM_SHM_LENGTH  = 3,
    PA_PSTREAM_SHM_MAX     = 4  /* number of words, not an index */
};
80
81 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
82
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
86
87 struct item_info {
88 enum {
89 PA_PSTREAM_ITEM_PACKET,
90 PA_PSTREAM_ITEM_MEMBLOCK,
91 PA_PSTREAM_ITEM_SHMRELEASE,
92 PA_PSTREAM_ITEM_SHMREVOKE
93 } type;
94
95
96 /* packet info */
97 pa_packet *packet;
98 #ifdef HAVE_CREDS
99 int with_creds;
100 pa_creds creds;
101 #endif
102
103 /* memblock info */
104 pa_memchunk chunk;
105 uint32_t channel;
106 int64_t offset;
107 pa_seek_mode_t seek_mode;
108
109 /* release/revoke info */
110 uint32_t block_id;
111 };
112
113 struct pa_pstream {
114 PA_REFCNT_DECLARE;
115
116 pa_mainloop_api *mainloop;
117 pa_iochannel *io;
118
119 pa_queue *send_queue;
120 pa_mutex *mutex; /* only for access to the queue */
121 pa_anotify *anotify;
122
123 int dead;
124
125 struct {
126 pa_pstream_descriptor descriptor;
127 struct item_info* current;
128 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
129 void *data;
130 size_t index;
131 pa_memchunk memchunk;
132 } write;
133
134 struct {
135 pa_pstream_descriptor descriptor;
136 pa_memblock *memblock;
137 pa_packet *packet;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139 void *data;
140 size_t index;
141 } read;
142
143 int use_shm;
144 pa_memimport *import;
145 pa_memexport *export;
146
147 pa_pstream_packet_cb_t recieve_packet_callback;
148 void *recieve_packet_callback_userdata;
149
150 pa_pstream_memblock_cb_t recieve_memblock_callback;
151 void *recieve_memblock_callback_userdata;
152
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
155
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
158
159 pa_mempool *mempool;
160
161 #ifdef HAVE_CREDS
162 pa_creds read_creds, write_creds;
163 int read_creds_valid, send_creds_now;
164 #endif
165 };
166
167 static int do_write(pa_pstream *p);
168 static int do_read(pa_pstream *p);
169
170 static void do_something(pa_pstream *p) {
171 assert(p);
172 assert(PA_REFCNT_VALUE(p) > 0);
173
174 pa_pstream_ref(p);
175
176 if (!p->dead && pa_iochannel_is_readable(p->io)) {
177 if (do_read(p) < 0)
178 goto fail;
179 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
180 goto fail;
181
182 if (!p->dead && pa_iochannel_is_writable(p->io)) {
183 if (do_write(p) < 0)
184 goto fail;
185 }
186
187 pa_pstream_unref(p);
188 return;
189
190 fail:
191
192 p->dead = 1;
193
194 if (p->die_callback)
195 p->die_callback(p, p->die_callback_userdata);
196
197 pa_pstream_unref(p);
198 }
199
200 static void io_callback(pa_iochannel*io, void *userdata) {
201 pa_pstream *p = userdata;
202
203 assert(p);
204 assert(p->io == io);
205
206 do_something(p);
207 }
208
209 static void anotify_callback(uint8_t event, void *userdata) {
210 pa_pstream *p = userdata;
211
212 assert(p);
213 do_something(p);
214 }
215
216 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
217
218 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
219 pa_pstream *p;
220
221 assert(m);
222 assert(io);
223 assert(pool);
224
225 p = pa_xnew(pa_pstream, 1);
226 PA_REFCNT_INIT(p);
227 p->io = io;
228 pa_iochannel_set_callback(io, io_callback, p);
229 p->dead = 0;
230
231 p->mutex = pa_mutex_new(1);
232 p->anotify = pa_anotify_new(m, anotify_callback, p);
233
234 p->mainloop = m;
235
236 p->send_queue = pa_queue_new();
237 assert(p->send_queue);
238
239 p->write.current = NULL;
240 p->write.index = 0;
241 pa_memchunk_reset(&p->write.memchunk);
242 p->read.memblock = NULL;
243 p->read.packet = NULL;
244 p->read.index = 0;
245
246 p->recieve_packet_callback = NULL;
247 p->recieve_packet_callback_userdata = NULL;
248 p->recieve_memblock_callback = NULL;
249 p->recieve_memblock_callback_userdata = NULL;
250 p->drain_callback = NULL;
251 p->drain_callback_userdata = NULL;
252 p->die_callback = NULL;
253 p->die_callback_userdata = NULL;
254
255 p->mempool = pool;
256
257 p->use_shm = 0;
258 p->export = NULL;
259
260 /* We do importing unconditionally */
261 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
262
263 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
264 pa_iochannel_socket_set_sndbuf(io, 1024*8);
265
266 #ifdef HAVE_CREDS
267 p->send_creds_now = 0;
268 p->read_creds_valid = 0;
269 #endif
270 return p;
271 }
272
273 static void item_free(void *item, PA_GCC_UNUSED void *p) {
274 struct item_info *i = item;
275 assert(i);
276
277 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
278 assert(i->chunk.memblock);
279 pa_memblock_unref(i->chunk.memblock);
280 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
281 assert(i->packet);
282 pa_packet_unref(i->packet);
283 }
284
285 pa_xfree(i);
286 }
287
288 static void pstream_free(pa_pstream *p) {
289 assert(p);
290
291 pa_pstream_close(p);
292
293 pa_queue_free(p->send_queue, item_free, NULL);
294
295 if (p->write.current)
296 item_free(p->write.current, NULL);
297
298 if (p->read.memblock)
299 pa_memblock_unref(p->read.memblock);
300
301 if (p->read.packet)
302 pa_packet_unref(p->read.packet);
303
304 if (p->write.memchunk.memblock)
305 pa_memblock_unref(p->write.memchunk.memblock);
306
307 if (p->mutex)
308 pa_mutex_free(p->mutex);
309
310 if (p->anotify)
311 pa_anotify_free(p->anotify);
312
313 pa_xfree(p);
314 }
315
316 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
317 struct item_info *i;
318
319 assert(p);
320 assert(PA_REFCNT_VALUE(p) > 0);
321 assert(packet);
322
323 i = pa_xnew(struct item_info, 1);
324 i->type = PA_PSTREAM_ITEM_PACKET;
325 i->packet = pa_packet_ref(packet);
326
327 #ifdef HAVE_CREDS
328 if ((i->with_creds = !!creds))
329 i->creds = *creds;
330 #endif
331
332 pa_mutex_lock(p->mutex);
333 pa_queue_push(p->send_queue, i);
334 pa_mutex_unlock(p->mutex);
335
336 pa_anotify_signal(p->anotify, 0);
337 }
338
339 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
340 size_t length, idx;
341
342 assert(p);
343 assert(PA_REFCNT_VALUE(p) > 0);
344 assert(channel != (uint32_t) -1);
345 assert(chunk);
346
347 idx = 0;
348
349 while (length > 0) {
350 struct item_info *i;
351 size_t n;
352
353 i = pa_xnew(struct item_info, 1);
354 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
355
356 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
357 i->chunk.index = chunk->index + idx;
358 i->chunk.length = n;
359 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
360
361 i->channel = channel;
362 i->offset = offset;
363 i->seek_mode = seek_mode;
364 #ifdef HAVE_CREDS
365 i->with_creds = 0;
366 #endif
367
368 pa_mutex_lock(p->mutex);
369 pa_queue_push(p->send_queue, i);
370 pa_mutex_unlock(p->mutex);
371
372 idx += n;
373 length -= n;
374 }
375
376 pa_anotify_signal(p->anotify, 0);
377 }
378
379 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
380 struct item_info *item;
381 pa_pstream *p = userdata;
382
383 assert(p);
384 assert(PA_REFCNT_VALUE(p) > 0);
385
386 /* pa_log("Releasing block %u", block_id); */
387
388 item = pa_xnew(struct item_info, 1);
389 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
390 item->block_id = block_id;
391 #ifdef HAVE_CREDS
392 item->with_creds = 0;
393 #endif
394
395 pa_mutex_lock(p->mutex);
396 pa_queue_push(p->send_queue, item);
397 pa_mutex_unlock(p->mutex);
398
399 pa_anotify_signal(p->anotify, 0);
400 }
401
402 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
403 struct item_info *item;
404 pa_pstream *p = userdata;
405
406 assert(p);
407 assert(PA_REFCNT_VALUE(p) > 0);
408
409 /* pa_log("Revoking block %u", block_id); */
410
411 item = pa_xnew(struct item_info, 1);
412 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
413 item->block_id = block_id;
414 #ifdef HAVE_CREDS
415 item->with_creds = 0;
416 #endif
417
418 pa_mutex_lock(p->mutex);
419 pa_queue_push(p->send_queue, item);
420 pa_mutex_unlock(p->mutex);
421
422 pa_anotify_signal(p->anotify, 0);
423 }
424
425 static void prepare_next_write_item(pa_pstream *p) {
426 assert(p);
427 assert(PA_REFCNT_VALUE(p) > 0);
428
429 pa_mutex_lock(p->mutex);
430 p->write.current = pa_queue_pop(p->send_queue);
431 pa_mutex_unlock(p->mutex);
432
433 if (!p->write.current)
434 return;
435
436 p->write.index = 0;
437 p->write.data = NULL;
438 pa_memchunk_reset(&p->write.memchunk);
439
440 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
441 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
442 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
443 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
444 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
445
446 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
447
448 assert(p->write.current->packet);
449 p->write.data = p->write.current->packet->data;
450 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
451
452 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
453
454 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
455 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
456
457 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
458
459 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
460 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
461
462 } else {
463 uint32_t flags;
464 int send_payload = 1;
465
466 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
467 assert(p->write.current->chunk.memblock);
468
469 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
470 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
471 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
472
473 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
474
475 if (p->use_shm) {
476 uint32_t block_id, shm_id;
477 size_t offset, length;
478
479 assert(p->export);
480
481 if (pa_memexport_put(p->export,
482 p->write.current->chunk.memblock,
483 &block_id,
484 &shm_id,
485 &offset,
486 &length) >= 0) {
487
488 flags |= PA_FLAG_SHMDATA;
489 send_payload = 0;
490
491 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
492 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
493 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
494 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
495
496 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
497 p->write.data = p->write.shm_info;
498 }
499 /* else */
500 /* pa_log_warn("Failed to export memory block."); */
501 }
502
503 if (send_payload) {
504 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
505 p->write.memchunk = p->write.current->chunk;
506 pa_memblock_ref(p->write.memchunk.memblock);
507 p->write.data = NULL;
508 }
509
510 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
511 }
512
513 #ifdef HAVE_CREDS
514 if ((p->send_creds_now = p->write.current->with_creds))
515 p->write_creds = p->write.current->creds;
516 #endif
517 }
518
519 static int do_write(pa_pstream *p) {
520 void *d;
521 size_t l;
522 ssize_t r;
523 pa_memblock *release_memblock = NULL;
524
525 assert(p);
526 assert(PA_REFCNT_VALUE(p) > 0);
527
528 if (!p->write.current)
529 prepare_next_write_item(p);
530
531 if (!p->write.current)
532 return 0;
533
534 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
535 d = (uint8_t*) p->write.descriptor + p->write.index;
536 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
537 } else {
538 assert(p->write.data || p->write.memchunk.memblock);
539
540 if (p->write.data)
541 d = p->write.data;
542 else {
543 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
544 release_memblock = p->write.memchunk.memblock;
545 }
546
547 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
548 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
549 }
550
551 assert(l > 0);
552
553 #ifdef HAVE_CREDS
554 if (p->send_creds_now) {
555
556 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
557 goto fail;
558
559 p->send_creds_now = 0;
560 } else
561 #endif
562
563 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
564 goto fail;
565
566 if (release_memblock)
567 pa_memblock_release(release_memblock);
568
569 p->write.index += r;
570
571 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
572 assert(p->write.current);
573 item_free(p->write.current, (void *) 1);
574 p->write.current = NULL;
575
576 if (p->drain_callback && !pa_pstream_is_pending(p))
577 p->drain_callback(p, p->drain_callback_userdata);
578 }
579
580 return 0;
581
582 fail:
583
584 if (release_memblock)
585 pa_memblock_release(release_memblock);
586
587 return -1;
588 }
589
590 static int do_read(pa_pstream *p) {
591 void *d;
592 size_t l;
593 ssize_t r;
594 pa_memblock *release_memblock = NULL;
595
596 assert(p);
597 assert(PA_REFCNT_VALUE(p) > 0);
598
599 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
600 d = (uint8_t*) p->read.descriptor + p->read.index;
601 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
602 } else {
603 assert(p->read.data || p->read.memblock);
604
605 if (p->read.data)
606 d = p->read.data;
607 else {
608 d = pa_memblock_acquire(p->read.memblock);
609 release_memblock = p->read.memblock;
610 }
611
612 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
613 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
614 }
615
616 #ifdef HAVE_CREDS
617 {
618 int b = 0;
619
620 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
621 goto fail;
622
623 p->read_creds_valid = p->read_creds_valid || b;
624 }
625 #else
626 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
627 goto fail;
628 #endif
629
630 if (release_memblock)
631 pa_memblock_release(release_memblock);
632
633 p->read.index += r;
634
635 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
636 uint32_t flags, length, channel;
637 /* Reading of frame descriptor complete */
638
639 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
640
641 if (!p->import && (flags & PA_FLAG_SHMMASK) != 0) {
642 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
643 return -1;
644 }
645
646 if (flags == PA_FLAG_SHMRELEASE) {
647
648 /* This is a SHM memblock release frame with no payload */
649
650 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
651
652 assert(p->export);
653 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
654
655 goto frame_done;
656
657 } else if (flags == PA_FLAG_SHMREVOKE) {
658
659 /* This is a SHM memblock revoke frame with no payload */
660
661 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
662
663 assert(p->import);
664 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
665
666 goto frame_done;
667 }
668
669 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
670
671 if (length > FRAME_SIZE_MAX_ALLOW) {
672 pa_log_warn("Recieved invalid frame size : %lu", (unsigned long) length);
673 return -1;
674 }
675
676 assert(!p->read.packet && !p->read.memblock);
677
678 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
679
680 if (channel == (uint32_t) -1) {
681
682 if (flags != 0) {
683 pa_log_warn("Received packet frame with invalid flags value.");
684 return -1;
685 }
686
687 /* Frame is a packet frame */
688 p->read.packet = pa_packet_new(length);
689 p->read.data = p->read.packet->data;
690
691 } else {
692
693 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
694 pa_log_warn("Received memblock frame with invalid seek mode.");
695 return -1;
696 }
697
698 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
699
700 if (length != sizeof(p->read.shm_info)) {
701 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
702 return -1;
703 }
704
705 /* Frame is a memblock frame referencing an SHM memblock */
706 p->read.data = p->read.shm_info;
707
708 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
709
710 /* Frame is a memblock frame */
711
712 p->read.memblock = pa_memblock_new(p->mempool, length);
713 p->read.data = NULL;
714 } else {
715
716 pa_log_warn("Recieved memblock frame with invalid flags value.");
717 return -1;
718 }
719 }
720
721 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
722 /* Frame payload available */
723
724 if (p->read.memblock && p->recieve_memblock_callback) {
725
726 /* Is this memblock data? Than pass it to the user */
727 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
728
729 if (l > 0) {
730 pa_memchunk chunk;
731
732 chunk.memblock = p->read.memblock;
733 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
734 chunk.length = l;
735
736 if (p->recieve_memblock_callback) {
737 int64_t offset;
738
739 offset = (int64_t) (
740 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
741 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
742
743 p->recieve_memblock_callback(
744 p,
745 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
746 offset,
747 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
748 &chunk,
749 p->recieve_memblock_callback_userdata);
750 }
751
752 /* Drop seek info for following callbacks */
753 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
754 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
755 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
756 }
757 }
758
759 /* Frame complete */
760 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
761
762 if (p->read.memblock) {
763
764 /* This was a memblock frame. We can unref the memblock now */
765 pa_memblock_unref(p->read.memblock);
766
767 } else if (p->read.packet) {
768
769 if (p->recieve_packet_callback)
770 #ifdef HAVE_CREDS
771 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
772 #else
773 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
774 #endif
775
776 pa_packet_unref(p->read.packet);
777 } else {
778 pa_memblock *b;
779
780 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
781
782 assert(p->import);
783
784 if (!(b = pa_memimport_get(p->import,
785 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
786 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
787 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
788 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
789
790 pa_log_warn("Failed to import memory block.");
791 return -1;
792 }
793
794 if (p->recieve_memblock_callback) {
795 int64_t offset;
796 pa_memchunk chunk;
797
798 chunk.memblock = b;
799 chunk.index = 0;
800 chunk.length = pa_memblock_get_length(b);
801
802 offset = (int64_t) (
803 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
804 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
805
806 p->recieve_memblock_callback(
807 p,
808 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
809 offset,
810 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
811 &chunk,
812 p->recieve_memblock_callback_userdata);
813 }
814
815 pa_memblock_unref(b);
816 }
817
818 goto frame_done;
819 }
820 }
821
822 return 0;
823
824 frame_done:
825 p->read.memblock = NULL;
826 p->read.packet = NULL;
827 p->read.index = 0;
828 p->read.data = NULL;
829
830 #ifdef HAVE_CREDS
831 p->read_creds_valid = 0;
832 #endif
833
834 return 0;
835
836 fail:
837 if (release_memblock)
838 pa_memblock_release(release_memblock);
839
840 return -1;
841 }
842
843 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
844 assert(p);
845 assert(PA_REFCNT_VALUE(p) > 0);
846
847 p->die_callback = cb;
848 p->die_callback_userdata = userdata;
849 }
850
851 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
852 assert(p);
853 assert(PA_REFCNT_VALUE(p) > 0);
854
855 p->drain_callback = cb;
856 p->drain_callback_userdata = userdata;
857 }
858
859 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
860 assert(p);
861 assert(PA_REFCNT_VALUE(p) > 0);
862
863 p->recieve_packet_callback = cb;
864 p->recieve_packet_callback_userdata = userdata;
865 }
866
867 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
868 assert(p);
869 assert(PA_REFCNT_VALUE(p) > 0);
870
871 p->recieve_memblock_callback = cb;
872 p->recieve_memblock_callback_userdata = userdata;
873 }
874
875 int pa_pstream_is_pending(pa_pstream *p) {
876 int b;
877
878 assert(p);
879 assert(PA_REFCNT_VALUE(p) > 0);
880
881 pa_mutex_lock(p->mutex);
882
883 if (p->dead)
884 b = 0;
885 else
886 b = p->write.current || !pa_queue_is_empty(p->send_queue);
887
888 pa_mutex_unlock(p->mutex);
889
890 return b;
891 }
892
893 void pa_pstream_unref(pa_pstream*p) {
894 assert(p);
895 assert(PA_REFCNT_VALUE(p) > 0);
896
897 if (PA_REFCNT_DEC(p) <= 0)
898 pstream_free(p);
899 }
900
901 pa_pstream* pa_pstream_ref(pa_pstream*p) {
902 assert(p);
903 assert(PA_REFCNT_VALUE(p) > 0);
904
905 PA_REFCNT_INC(p);
906 return p;
907 }
908
909 void pa_pstream_close(pa_pstream *p) {
910 assert(p);
911
912 p->dead = 1;
913
914 if (p->import) {
915 pa_memimport_free(p->import);
916 p->import = NULL;
917 }
918
919 if (p->export) {
920 pa_memexport_free(p->export);
921 p->export = NULL;
922 }
923
924 if (p->io) {
925 pa_iochannel_free(p->io);
926 p->io = NULL;
927 }
928
929 p->die_callback = NULL;
930 p->drain_callback = NULL;
931 p->recieve_packet_callback = NULL;
932 p->recieve_memblock_callback = NULL;
933 }
934
935 void pa_pstream_use_shm(pa_pstream *p, int enable) {
936 assert(p);
937 assert(PA_REFCNT_VALUE(p) > 0);
938
939 p->use_shm = enable;
940
941 if (enable) {
942
943 if (!p->export)
944 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
945
946 } else {
947
948 if (p->export) {
949 pa_memexport_free(p->export);
950 p->export = NULL;
951 }
952 }
953 }