/* $Id$ */

/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>

#include <pulse/xmalloc.h>
#include <pulse/def.h>

#include <pulsecore/shm.h>
#include <pulsecore/log.h>
#include <pulsecore/hashmap.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/macro.h>
#include <pulsecore/flist.h>
#include <pulsecore/core-util.h>

#include "memblock.h"

#define PA_MEMPOOL_SLOTS_MAX 512
#define PA_MEMPOOL_SLOT_SIZE (32*1024)

#define PA_MEMEXPORT_SLOTS_MAX 128

#define PA_MEMIMPORT_SLOTS_MAX 128
#define PA_MEMIMPORT_SEGMENTS_MAX 16

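/* A memory block. Depending on its type the payload either lives in the
 * shared memory pool, directly after this header (appended), in
 * caller-provided memory (fixed/user) or in a segment imported from
 * another process. Reference counted; accessed lock-free where possible. */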
struct pa_memblock {
    PA_REFCNT_DECLARE; /* the reference counter */
    pa_mempool *pool;

    pa_memblock_type_t type;

    pa_bool_t read_only:1;
    pa_bool_t is_silence:1;

    pa_atomic_ptr_t data;
    size_t length;

    pa_atomic_t n_acquired;
    pa_atomic_t please_signal;

    union {
        struct {
            /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
            pa_free_cb_t free_cb;
        } user;

        struct {
            uint32_t id;
            pa_memimport_segment *segment;
        } imported;
    } per_type;
};

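/* One SHM segment attached from a remote process. A segment stays mapped
 * as long as at least one imported block still references it. */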
struct pa_memimport_segment {
    pa_memimport *import;
    pa_shm memory;
    unsigned n_blocks;
};

struct pa_memimport {
    pa_mutex *mutex;

    pa_mempool *pool;
    pa_hashmap *segments;
    pa_hashmap *blocks;

    /* Called whenever an imported memory block is no longer
     * needed. */
    pa_memimport_release_cb_t release_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memimport);
};

struct memexport_slot {
    PA_LLIST_FIELDS(struct memexport_slot);
    pa_memblock *block;
};

struct pa_memexport {
    pa_mutex *mutex;
    pa_mempool *pool;

    struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];

    PA_LLIST_HEAD(struct memexport_slot, free_slots);
    PA_LLIST_HEAD(struct memexport_slot, used_slots);
    unsigned n_init;

    /* Called whenever a client dies from which we imported a memory
     * block that we in turn exported to another client, so that we
     * can revoke the memory block accordingly */
    pa_memexport_revoke_cb_t revoke_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memexport);
};

struct mempool_slot {
    PA_LLIST_FIELDS(struct mempool_slot);
    /* the actual data follows immediately hereafter */
};

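/* The memory pool: one big (optionally SHM-backed) region split into
 * n_blocks slots of block_size bytes each. Slots are handed out lock-free
 * via the atomic init counter and recycled through the free list. */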
struct pa_mempool {
    pa_semaphore *semaphore;
    pa_mutex *mutex;

    pa_shm memory;
    size_t block_size;
    unsigned n_blocks;

    pa_atomic_t n_init;

    PA_LLIST_HEAD(pa_memimport, imports);
    PA_LLIST_HEAD(pa_memexport, exports);

    /* A list of free slots that may be reused */
    pa_flist *free_slots;

    pa_mempool_stat stat;
};

static void segment_detach(pa_memimport_segment *seg);

PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree);

/* No lock necessary */
static void stat_add(pa_memblock*b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_atomic_inc(&b->pool->stat.n_allocated);
    pa_atomic_add(&b->pool->stat.allocated_size, b->length);

    pa_atomic_inc(&b->pool->stat.n_accumulated);
    pa_atomic_add(&b->pool->stat.accumulated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_atomic_inc(&b->pool->stat.n_imported);
        pa_atomic_add(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
}

/* No lock necessary */
static void stat_remove(pa_memblock *b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);

    pa_atomic_dec(&b->pool->stat.n_allocated);
    pa_atomic_sub(&b->pool->stat.allocated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
        pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);

        pa_atomic_dec(&b->pool->stat.n_imported);
        pa_atomic_sub(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
}

static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);

/* No lock necessary */
pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    if (!(b = pa_memblock_new_pool(p, length)))
        b = memblock_new_appended(p, length);

    return b;
}

/* No lock necessary */
static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    /* If -1 is passed as length we choose the size for the caller. */

    if (length == (size_t) -1)
        length = p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) - PA_ALIGN(sizeof(pa_memblock));

    b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_APPENDED;
    b->read_only = b->is_silence = FALSE;
    pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

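/* Grab a free slot: first try the lock-free free list, then carve a fresh
 * slot out of the pool by bumping the atomic init counter. Returns NULL
 * (and counts a pool-full event) if neither succeeds. */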
/* No lock necessary */
static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_assert(p);

    if (!(slot = pa_flist_pop(p->free_slots))) {
        int idx;

        /* The free list was empty, we have to allocate a new entry */

        if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
            pa_atomic_dec(&p->n_init);
        else
            slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));

        if (!slot) {
            pa_log_info("Pool full");
            pa_atomic_inc(&p->stat.n_pool_full);
            return NULL;
        }
    }

    return slot;
}

/* No lock necessary */
static void* mempool_slot_data(struct mempool_slot *slot) {
    pa_assert(slot);

    return (uint8_t*) slot + PA_ALIGN(sizeof(struct mempool_slot));
}

/* No lock necessary */
static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
    pa_assert(p);

    pa_assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
    pa_assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);

    return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
}

/* No lock necessary */
static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
    unsigned idx;

    if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
        return NULL;

    return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
}

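/* Allocate a block from the pool. If header plus payload fit into one slot
 * the pa_memblock lives at the start of the slot (PA_MEMBLOCK_POOL);
 * otherwise the header is allocated separately and only the payload uses
 * the slot (PA_MEMBLOCK_POOL_EXTERNAL). */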
/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
    pa_memblock *b = NULL;
    struct mempool_slot *slot;

    pa_assert(p);
    pa_assert(length > 0);

    /* If -1 is passed as length we choose the size for the caller: we
     * take the largest size that fits in one of our slots. */

    if (length == (size_t) -1)
        length = pa_mempool_block_size_max(p);

    if (p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) >= PA_ALIGN(sizeof(pa_memblock)) + length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        b = mempool_slot_data(slot);
        b->type = PA_MEMBLOCK_POOL;
        pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));

    } else if (p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) >= length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
            b = pa_xnew(pa_memblock, 1);

        b->type = PA_MEMBLOCK_POOL_EXTERNAL;
        pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));

    } else {
        pa_log_debug("Memory block too large for pool: %lu > %lu", (unsigned long) length, (unsigned long) (p->block_size - PA_ALIGN(sizeof(struct mempool_slot))));
        pa_atomic_inc(&p->stat.n_too_large_for_pool);
        return NULL;
    }

    PA_REFCNT_INIT(b);
    b->pool = p;
    b->read_only = b->is_silence = FALSE;
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, pa_bool_t read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length != (size_t) -1);
    pa_assert(length > 0);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_FIXED;
    b->read_only = read_only;
    b->is_silence = FALSE;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, pa_free_cb_t free_cb, pa_bool_t read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length > 0);
    pa_assert(length != (size_t) -1);
    pa_assert(free_cb);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_USER;
    b->read_only = read_only;
    b->is_silence = FALSE;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    b->per_type.user.free_cb = free_cb;

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_bool_t pa_memblock_is_read_only(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->read_only && PA_REFCNT_VALUE(b) == 1;
}

/* No lock necessary */
pa_bool_t pa_memblock_is_silence(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->is_silence;
}

/* No lock necessary */
void pa_memblock_set_is_silence(pa_memblock *b, pa_bool_t v) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    b->is_silence = v;
}

/* No lock necessary */
pa_bool_t pa_memblock_ref_is_one(pa_memblock *b) {
    int r;

    pa_assert(b);

    pa_assert_se((r = PA_REFCNT_VALUE(b)) > 0);

    return r == 1;
}

/* No lock necessary */
void* pa_memblock_acquire(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    pa_atomic_inc(&b->n_acquired);

    return pa_atomic_ptr_load(&b->data);
}

/* No lock necessary, in corner cases locks on its own */
void pa_memblock_release(pa_memblock *b) {
    int r;
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    r = pa_atomic_dec(&b->n_acquired);
    pa_assert(r >= 1);

    /* Signal a waiting thread that this memblock is no longer used */
    if (r == 1 && pa_atomic_load(&b->please_signal))
        pa_semaphore_post(b->pool->semaphore);
}

size_t pa_memblock_get_length(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->length;
}

pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->pool;
}

/* No lock necessary */
pa_memblock* pa_memblock_ref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    PA_REFCNT_INC(b);
    return b;
}

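/* Release the resources of a block whose reference count has dropped to
 * zero. What happens depends on the block type: user blocks invoke their
 * free callback, imported blocks notify the import, pool blocks return
 * their slot to the pool's free list. */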
static void memblock_free(pa_memblock *b) {
    pa_assert(b);

    pa_assert(pa_atomic_load(&b->n_acquired) == 0);

    stat_remove(b);

    switch (b->type) {
        case PA_MEMBLOCK_USER:
            pa_assert(b->per_type.user.free_cb);
            b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));

            /* Fall through */

        case PA_MEMBLOCK_FIXED:
        case PA_MEMBLOCK_APPENDED:
            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);

            break;

        case PA_MEMBLOCK_IMPORTED: {
            pa_memimport_segment *segment;
            pa_memimport *import;

            /* FIXME! This should be implemented lock-free */

            segment = b->per_type.imported.segment;
            pa_assert(segment);
            import = segment->import;
            pa_assert(import);

            pa_mutex_lock(import->mutex);
            pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
            if (-- segment->n_blocks <= 0)
                segment_detach(segment);

            pa_mutex_unlock(import->mutex);

            import->release_cb(import, b->per_type.imported.id, import->userdata);

            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);
            break;
        }

        case PA_MEMBLOCK_POOL_EXTERNAL:
        case PA_MEMBLOCK_POOL: {
            struct mempool_slot *slot;
            int call_free;

            slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
            pa_assert(slot);

            call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;

            /* The free list dimensions should easily allow all slots
             * to fit in, hence try harder if pushing this slot into
             * the free list fails */
            while (pa_flist_push(b->pool->free_slots, slot) < 0)
                ;

            if (call_free)
                if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                    pa_xfree(b);

            break;
        }

        case PA_MEMBLOCK_TYPE_MAX:
        default:
            pa_assert_not_reached();
    }
}

/* No lock necessary */
void pa_memblock_unref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    if (PA_REFCNT_DEC(b) > 0)
        return;

    memblock_free(b);
}

/* Self locked */
static void memblock_wait(pa_memblock *b) {
    pa_assert(b);

    if (pa_atomic_load(&b->n_acquired) > 0) {
        /* We need to wait until all threads gave up access to the
         * memory block before we can go on. Unfortunately this means
         * that we have to lock and wait here. Sniff! */

        pa_atomic_inc(&b->please_signal);

        while (pa_atomic_load(&b->n_acquired) > 0)
            pa_semaphore_wait(b->pool->semaphore);

        pa_atomic_dec(&b->please_signal);
    }
}

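/* Turn a block that references foreign memory (fixed or imported) into one
 * that owns a private copy: preferably a pool slot, otherwise plain
 * malloc'd memory. Afterwards wait until no other thread still has the old
 * data acquired. */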
/* No lock necessary. This function is not multiple caller safe! */
static void memblock_make_local(pa_memblock *b) {
    pa_assert(b);

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);

    if (b->length <= b->pool->block_size - PA_ALIGN(sizeof(struct mempool_slot))) {
        struct mempool_slot *slot;

        if ((slot = mempool_allocate_slot(b->pool))) {
            void *new_data;
            /* We can move it into a local pool, perfect! */

            new_data = mempool_slot_data(slot);
            memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
            pa_atomic_ptr_store(&b->data, new_data);

            b->type = PA_MEMBLOCK_POOL_EXTERNAL;
            b->read_only = FALSE;

            goto finish;
        }
    }

    /* Hmm, not enough space in the pool, so let's allocate the memory with malloc() */
    b->per_type.user.free_cb = pa_xfree;
    pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));

    b->type = PA_MEMBLOCK_USER;
    b->read_only = FALSE;

finish:
    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
    memblock_wait(b);
}

/* No lock necessary. This function is not multiple caller safe! */
void pa_memblock_unref_fixed(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);
    pa_assert(b->type == PA_MEMBLOCK_FIXED);

    if (PA_REFCNT_VALUE(b) > 1)
        memblock_make_local(b);

    pa_memblock_unref(b);
}

/* No lock necessary. */
pa_memblock *pa_memblock_will_need(pa_memblock *b) {
    void *p;

    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    p = pa_memblock_acquire(b);
    pa_will_need(p, b->length);
    pa_memblock_release(b);

    return b;
}

/* Self-locked. This function is not multiple-caller safe */
static void memblock_replace_import(pa_memblock *b) {
    pa_memimport_segment *seg;

    pa_assert(b);
    pa_assert(b->type == PA_MEMBLOCK_IMPORTED);

    pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
    pa_atomic_dec(&b->pool->stat.n_imported);
    pa_atomic_sub(&b->pool->stat.imported_size, b->length);

    seg = b->per_type.imported.segment;
    pa_assert(seg);
    pa_assert(seg->import);

    pa_mutex_lock(seg->import->mutex);

    pa_hashmap_remove(
            seg->import->blocks,
            PA_UINT32_TO_PTR(b->per_type.imported.id));

    memblock_make_local(b);

    if (-- seg->n_blocks <= 0) {
        pa_mutex_unlock(seg->import->mutex);
        segment_detach(seg);
    } else
        pa_mutex_unlock(seg->import->mutex);
}

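/* Create a new memory pool of PA_MEMPOOL_SLOTS_MAX slots. If 'shared' is
 * non-zero the backing memory is created with pa_shm_create_rw() as a
 * shared segment (required for pa_memexport_new()); otherwise it stays
 * private to this process. */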
pa_mempool* pa_mempool_new(int shared) {
    pa_mempool *p;

    p = pa_xnew(pa_mempool, 1);

    p->mutex = pa_mutex_new(TRUE, TRUE);
    p->semaphore = pa_semaphore_new(0);

    p->block_size = PA_PAGE_ALIGN(PA_MEMPOOL_SLOT_SIZE);
    if (p->block_size < PA_PAGE_SIZE)
        p->block_size = PA_PAGE_SIZE;

    p->n_blocks = PA_MEMPOOL_SLOTS_MAX;

    pa_assert(p->block_size > PA_ALIGN(sizeof(struct mempool_slot)));

    if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
        pa_xfree(p);
        return NULL;
    }

    memset(&p->stat, 0, sizeof(p->stat));
    pa_atomic_store(&p->n_init, 0);

    PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
    PA_LLIST_HEAD_INIT(pa_memexport, p->exports);

    p->free_slots = pa_flist_new(p->n_blocks*2);

    return p;
}

void pa_mempool_free(pa_mempool *p) {
    pa_assert(p);

    pa_mutex_lock(p->mutex);

    while (p->imports)
        pa_memimport_free(p->imports);

    while (p->exports)
        pa_memexport_free(p->exports);

    pa_mutex_unlock(p->mutex);

    pa_flist_free(p->free_slots, NULL);

    if (pa_atomic_load(&p->stat.n_allocated) > 0) {
        /* raise(SIGTRAP); */
        pa_log_warn("Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated));
    }

    pa_shm_free(&p->memory);

    pa_mutex_free(p->mutex);
    pa_semaphore_free(p->semaphore);

    pa_xfree(p);
}

/* No lock necessary */
const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
    pa_assert(p);

    return &p->stat;
}

/* No lock necessary */
size_t pa_mempool_block_size_max(pa_mempool *p) {
    pa_assert(p);

    return p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) - PA_ALIGN(sizeof(pa_memblock));
}

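/* For every slot currently sitting in the free list, punch its payload
 * area out of the backing memory with pa_shm_punch() so the pages can be
 * reclaimed, then put the slot back onto the free list. */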
/* No lock necessary */
void pa_mempool_vacuum(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_flist *list;

    pa_assert(p);

    list = pa_flist_new(p->n_blocks*2);

    while ((slot = pa_flist_pop(p->free_slots)))
        while (pa_flist_push(list, slot) < 0)
            ;

    while ((slot = pa_flist_pop(list))) {
        pa_shm_punch(&p->memory,
                     (uint8_t*) slot - (uint8_t*) p->memory.ptr + PA_ALIGN(sizeof(struct mempool_slot)),
                     p->block_size - PA_ALIGN(sizeof(struct mempool_slot)));

        while (pa_flist_push(p->free_slots, slot))
            ;
    }

    pa_flist_free(list, NULL);
}

/* No lock necessary */
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
    pa_assert(p);

    if (!p->memory.shared)
        return -1;

    *id = p->memory.id;

    return 0;
}

/* No lock necessary */
pa_bool_t pa_mempool_is_shared(pa_mempool *p) {
    pa_assert(p);

    return !!p->memory.shared;
}

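/* Memory import: the receiving side of the SHM transport. A pa_memimport
 * tracks which remote segments are attached and which of their blocks are
 * currently in use, so they can be released or revoked cleanly. */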
/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
    pa_memimport *i;

    pa_assert(p);
    pa_assert(cb);

    i = pa_xnew(pa_memimport, 1);
    i->mutex = pa_mutex_new(TRUE, TRUE);
    i->pool = p;
    i->segments = pa_hashmap_new(NULL, NULL);
    i->blocks = pa_hashmap_new(NULL, NULL);
    i->release_cb = cb;
    i->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memimport, p->imports, i);
    pa_mutex_unlock(p->mutex);

    return i;
}

static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);

/* Should be called locked */
static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
    pa_memimport_segment* seg;

    if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
        return NULL;

    seg = pa_xnew(pa_memimport_segment, 1);

    if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
        pa_xfree(seg);
        return NULL;
    }

    seg->import = i;
    seg->n_blocks = 0;

    pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
    return seg;
}

/* Should be called locked */
static void segment_detach(pa_memimport_segment *seg) {
    pa_assert(seg);

    pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
    pa_shm_free(&seg->memory);
    pa_xfree(seg);
}

/* Self-locked. Not multiple-caller safe */
void pa_memimport_free(pa_memimport *i) {
    pa_memexport *e;
    pa_memblock *b;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    while ((b = pa_hashmap_get_first(i->blocks)))
        memblock_replace_import(b);

    pa_assert(pa_hashmap_size(i->segments) == 0);

    pa_mutex_unlock(i->mutex);

    pa_mutex_lock(i->pool->mutex);

    /* If we've further exported any of these blocks we need to revoke those exports */
    for (e = i->pool->exports; e; e = e->next)
        memexport_revoke_blocks(e, i);

    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);

    pa_mutex_unlock(i->pool->mutex);

    pa_hashmap_free(i->blocks, NULL, NULL);
    pa_hashmap_free(i->segments, NULL, NULL);

    pa_mutex_free(i->mutex);

    pa_xfree(i);
}

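/* Create a local memblock for the block the peer announced as 'block_id',
 * living at the given offset inside its SHM segment 'shm_id'. Attaches the
 * segment on first use and fails if limits or bounds are exceeded. */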
/* Self-locked */
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
    pa_memblock *b = NULL;
    pa_memimport_segment *seg;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
        goto finish;

    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
        if (!(seg = segment_attach(i, shm_id)))
            goto finish;

    if (offset+size > seg->memory.size)
        goto finish;

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);

    PA_REFCNT_INIT(b);
    b->pool = i->pool;
    b->type = PA_MEMBLOCK_IMPORTED;
    b->read_only = TRUE;
    b->is_silence = FALSE;
    pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
    b->length = size;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);
    b->per_type.imported.id = block_id;
    b->per_type.imported.segment = seg;

    pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);

    seg->n_blocks++;

finish:
    pa_mutex_unlock(i->mutex);

    if (b)
        stat_add(b);

    return b;
}

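/* Self-locked. Called when the exporting peer revokes block 'id': the block
 * is detached from the import and turned into a local copy so that existing
 * references stay valid. Returns -1 if the block is unknown. */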
int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
    pa_memblock *b;
    int ret = 0;
    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id)))) {
        ret = -1;
        goto finish;
    }

    memblock_replace_import(b);

finish:
    pa_mutex_unlock(i->mutex);

    return ret;
}

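/* Memory export: the sending side of the SHM transport. Exported blocks are
 * pinned in a fixed slot table; the slot index serves as the block id that
 * is handed back to the caller. */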
/* For sending blocks to other nodes */
pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
    pa_memexport *e;

    pa_assert(p);
    pa_assert(cb);

    if (!p->memory.shared)
        return NULL;

    e = pa_xnew(pa_memexport, 1);
    e->mutex = pa_mutex_new(TRUE, TRUE);
    e->pool = p;
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
    e->n_init = 0;
    e->revoke_cb = cb;
    e->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memexport, p->exports, e);
    pa_mutex_unlock(p->mutex);
    return e;
}

void pa_memexport_free(pa_memexport *e) {
    pa_assert(e);

    pa_mutex_lock(e->mutex);
    while (e->used_slots)
        pa_memexport_process_release(e, e->used_slots - e->slots);
    pa_mutex_unlock(e->mutex);

    pa_mutex_lock(e->pool->mutex);
    PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
    pa_mutex_unlock(e->pool->mutex);

    pa_mutex_free(e->mutex);
    pa_xfree(e);
}

/* Self-locked */
int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
    pa_memblock *b;

    pa_assert(e);

    pa_mutex_lock(e->mutex);

    if (id >= e->n_init)
        goto fail;

    if (!e->slots[id].block)
        goto fail;

    b = e->slots[id].block;
    e->slots[id].block = NULL;

    PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
    PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);

    pa_mutex_unlock(e->mutex);

    /* pa_log("Processing release for %u", id); */

    pa_assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
    pa_assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);

    pa_atomic_dec(&e->pool->stat.n_exported);
    pa_atomic_sub(&e->pool->stat.exported_size, b->length);

    pa_memblock_unref(b);

    return 0;

fail:
    pa_mutex_unlock(e->mutex);

    return -1;
}

/* Self-locked */
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
    struct memexport_slot *slot, *next;
    pa_assert(e);
    pa_assert(i);

    pa_mutex_lock(e->mutex);

    for (slot = e->used_slots; slot; slot = next) {
        uint32_t idx;
        next = slot->next;

        if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
            slot->block->per_type.imported.segment->import != i)
            continue;

        idx = slot - e->slots;
        e->revoke_cb(e, idx, e->userdata);
        pa_memexport_process_release(e, idx);
    }

    pa_mutex_unlock(e->mutex);
}

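/* Make sure a block lives in memory the peer can see: blocks that already
 * sit in the pool or in an imported segment are simply referenced again,
 * anything else is copied into a fresh pool block. */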
/* No lock necessary */
static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
    pa_memblock *n;

    pa_assert(p);
    pa_assert(b);

    if (b->type == PA_MEMBLOCK_IMPORTED ||
        b->type == PA_MEMBLOCK_POOL ||
        b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
        pa_assert(b->pool == p);
        return pa_memblock_ref(b);
    }

    if (!(n = pa_memblock_new_pool(p, b->length)))
        return NULL;

    memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
    return n;
}

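/* Export a block to the peer: grab (or initialize) a slot, keep a reference
 * to the block in it, and report the slot index plus the block's SHM
 * coordinates (segment id, offset, length) back to the caller. */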
/* Self-locked */
int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size) {
    pa_shm *memory;
    struct memexport_slot *slot;
    void *data;

    pa_assert(e);
    pa_assert(b);
    pa_assert(block_id);
    pa_assert(shm_id);
    pa_assert(offset);
    pa_assert(size);
    pa_assert(b->pool == e->pool);

    if (!(b = memblock_shared_copy(e->pool, b)))
        return -1;

    pa_mutex_lock(e->mutex);

    if (e->free_slots) {
        slot = e->free_slots;
        PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
        slot = &e->slots[e->n_init++];
    else {
        pa_mutex_unlock(e->mutex);
        pa_memblock_unref(b);
        return -1;
    }

    PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
    slot->block = b;
    *block_id = slot - e->slots;

    pa_mutex_unlock(e->mutex);
    /* pa_log("Got block id %u", *block_id); */

    data = pa_memblock_acquire(b);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(b->per_type.imported.segment);
        memory = &b->per_type.imported.segment->memory;
    } else {
        pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
        pa_assert(b->pool);
        memory = &b->pool->memory;
    }

    pa_assert(data >= memory->ptr);
    pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);

    *shm_id = memory->id;
    *offset = (uint8_t*) data - (uint8_t*) memory->ptr;
    *size = b->length;

    pa_memblock_release(b);

    pa_atomic_inc(&e->pool->stat.n_exported);
    pa_atomic_add(&e->pool->stat.exported_size, b->length);

    return 0;
}