4 This file is part of PulseAudio.
6 Copyright 2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/llist.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
43 #define ASYNCQ_SIZE 128
45 /* For debugging purposes we can define _Y to put and extra thread
46 * yield between each operation. */
51 #define _Y pa_thread_yield()
53 #define _Y do { } while(0)
58 PA_LLIST_FIELDS(struct localq
);
65 pa_fdsem
*read_fdsem
, *write_fdsem
;
67 PA_LLIST_HEAD(struct localq
, localq
);
68 struct localq
*last_localq
;
69 pa_bool_t waiting_for_post
;
72 PA_STATIC_FLIST_DECLARE(localq
, 0, pa_xfree
);
74 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
76 static int is_power_of_two(unsigned size
) {
77 return !(size
& (size
- 1));
80 static int reduce(pa_asyncq
*l
, int value
) {
81 return value
& (unsigned) (l
->size
- 1);
84 pa_asyncq
*pa_asyncq_new(unsigned size
) {
90 pa_assert(is_power_of_two(size
));
92 l
= pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq
)) + (sizeof(pa_atomic_ptr_t
) * size
));
96 PA_LLIST_HEAD_INIT(struct localq
, l
->localq
);
97 l
->last_localq
= NULL
;
98 l
->waiting_for_post
= FALSE
;
100 if (!(l
->read_fdsem
= pa_fdsem_new())) {
105 if (!(l
->write_fdsem
= pa_fdsem_new())) {
106 pa_fdsem_free(l
->read_fdsem
);
114 void pa_asyncq_free(pa_asyncq
*l
, pa_free_cb_t free_cb
) {
121 while ((p
= pa_asyncq_pop(l
, 0)))
125 while ((q
= l
->localq
)) {
129 PA_LLIST_REMOVE(struct localq
, l
->localq
, q
);
131 if (pa_flist_push(PA_STATIC_FLIST_GET(localq
), q
) < 0)
135 pa_fdsem_free(l
->read_fdsem
);
136 pa_fdsem_free(l
->write_fdsem
);
140 static int push(pa_asyncq
*l
, void *p
, pa_bool_t wait
) {
142 pa_atomic_ptr_t
*cells
;
147 cells
= PA_ASYNCQ_CELLS(l
);
150 idx
= reduce(l
, l
->write_idx
);
152 if (!pa_atomic_ptr_cmpxchg(&cells
[idx
], NULL
, p
)) {
157 /* pa_log("sleeping on push"); */
160 pa_fdsem_wait(l
->read_fdsem
);
161 } while (!pa_atomic_ptr_cmpxchg(&cells
[idx
], NULL
, p
));
167 pa_fdsem_post(l
->write_fdsem
);
172 static pa_bool_t
flush_postq(pa_asyncq
*l
) {
177 while ((q
= l
->last_localq
)) {
179 if (push(l
, q
->data
, FALSE
) < 0)
182 l
->last_localq
= q
->prev
;
184 PA_LLIST_REMOVE(struct localq
, l
->localq
, q
);
186 if (pa_flist_push(PA_STATIC_FLIST_GET(localq
), q
) < 0)
193 int pa_asyncq_push(pa_asyncq
*l
, void *p
, pa_bool_t wait
) {
199 return push(l
, p
, wait
);
202 void pa_asyncq_post(pa_asyncq
*l
, void *p
) {
208 if (pa_asyncq_push(l
, p
, FALSE
) >= 0)
211 /* OK, we couldn't push anything in the queue. So let's queue it
212 * locally and push it later */
214 pa_log("q overrun, queuing locally");
216 if (!(q
= pa_flist_pop(PA_STATIC_FLIST_GET(localq
))))
217 q
= pa_xnew(struct localq
, 1);
220 PA_LLIST_PREPEND(struct localq
, l
->localq
, q
);
228 void* pa_asyncq_pop(pa_asyncq
*l
, pa_bool_t wait
) {
231 pa_atomic_ptr_t
*cells
;
235 cells
= PA_ASYNCQ_CELLS(l
);
238 idx
= reduce(l
, l
->read_idx
);
240 if (!(ret
= pa_atomic_ptr_load(&cells
[idx
]))) {
245 /* pa_log("sleeping on pop"); */
248 pa_fdsem_wait(l
->write_fdsem
);
249 } while (!(ret
= pa_atomic_ptr_load(&cells
[idx
])));
254 /* Guaranteed to succeed if we only have a single reader */
255 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells
[idx
], ret
, NULL
));
260 pa_fdsem_post(l
->read_fdsem
);
265 int pa_asyncq_read_fd(pa_asyncq
*q
) {
268 return pa_fdsem_get(q
->write_fdsem
);
271 int pa_asyncq_read_before_poll(pa_asyncq
*l
) {
273 pa_atomic_ptr_t
*cells
;
277 cells
= PA_ASYNCQ_CELLS(l
);
280 idx
= reduce(l
, l
->read_idx
);
283 if (pa_atomic_ptr_load(&cells
[idx
]))
286 if (pa_fdsem_before_poll(l
->write_fdsem
) >= 0)
293 void pa_asyncq_read_after_poll(pa_asyncq
*l
) {
296 pa_fdsem_after_poll(l
->write_fdsem
);
299 int pa_asyncq_write_fd(pa_asyncq
*q
) {
302 return pa_fdsem_get(q
->read_fdsem
);
305 void pa_asyncq_write_before_poll(pa_asyncq
*l
) {
313 if (pa_fdsem_before_poll(l
->read_fdsem
) >= 0) {
314 l
->waiting_for_post
= TRUE
;
320 void pa_asyncq_write_after_poll(pa_asyncq
*l
) {
323 if (l
->waiting_for_post
) {
324 pa_fdsem_after_poll(l
->read_fdsem
);
325 l
->waiting_for_post
= FALSE
;