]>
code.delx.au - pulseaudio/blob - src/pulsecore/asyncq.c
4 This file is part of PulseAudio.
6 Copyright 2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
41 #define ASYNCQ_SIZE 128
43 /* For debugging purposes we can define _Y to put and extra thread
44 * yield between each operation. */
49 #define _Y pa_thread_yield()
51 #define _Y do { } while(0)
58 pa_fdsem
*read_fdsem
, *write_fdsem
;
61 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
63 static int is_power_of_two(unsigned size
) {
64 return !(size
& (size
- 1));
67 static int reduce(pa_asyncq
*l
, int value
) {
68 return value
& (unsigned) (l
->size
- 1);
71 pa_asyncq
*pa_asyncq_new(unsigned size
) {
77 pa_assert(is_power_of_two(size
));
79 l
= pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq
)) + (sizeof(pa_atomic_ptr_t
) * size
));
83 if (!(l
->read_fdsem
= pa_fdsem_new())) {
88 if (!(l
->write_fdsem
= pa_fdsem_new())) {
89 pa_fdsem_free(l
->read_fdsem
);
97 void pa_asyncq_free(pa_asyncq
*l
, pa_free_cb_t free_cb
) {
103 while ((p
= pa_asyncq_pop(l
, 0)))
107 pa_fdsem_free(l
->read_fdsem
);
108 pa_fdsem_free(l
->write_fdsem
);
112 int pa_asyncq_push(pa_asyncq
*l
, void *p
, int wait
) {
114 pa_atomic_ptr_t
*cells
;
119 cells
= PA_ASYNCQ_CELLS(l
);
122 idx
= reduce(l
, l
->write_idx
);
124 if (!pa_atomic_ptr_cmpxchg(&cells
[idx
], NULL
, p
)) {
129 /* pa_log("sleeping on push"); */
132 pa_fdsem_wait(l
->read_fdsem
);
133 } while (!pa_atomic_ptr_cmpxchg(&cells
[idx
], NULL
, p
));
139 pa_fdsem_post(l
->write_fdsem
);
144 void* pa_asyncq_pop(pa_asyncq
*l
, int wait
) {
147 pa_atomic_ptr_t
*cells
;
151 cells
= PA_ASYNCQ_CELLS(l
);
154 idx
= reduce(l
, l
->read_idx
);
156 if (!(ret
= pa_atomic_ptr_load(&cells
[idx
]))) {
161 /* pa_log("sleeping on pop"); */
164 pa_fdsem_wait(l
->write_fdsem
);
165 } while (!(ret
= pa_atomic_ptr_load(&cells
[idx
])));
170 /* Guaranteed to succeed if we only have a single reader */
171 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells
[idx
], ret
, NULL
));
176 pa_fdsem_post(l
->read_fdsem
);
181 int pa_asyncq_get_fd(pa_asyncq
*q
) {
184 return pa_fdsem_get(q
->write_fdsem
);
187 int pa_asyncq_before_poll(pa_asyncq
*l
) {
189 pa_atomic_ptr_t
*cells
;
193 cells
= PA_ASYNCQ_CELLS(l
);
196 idx
= reduce(l
, l
->read_idx
);
199 if (pa_atomic_ptr_load(&cells
[idx
]))
202 if (pa_fdsem_before_poll(l
->write_fdsem
) >= 0)
209 void pa_asyncq_after_poll(pa_asyncq
*l
) {
212 pa_fdsem_after_poll(l
->write_fdsem
);