src/pulsecore/asyncq.c
/* $Id$ */

/***
  This file is part of PulseAudio.

  Copyright 2006 Lennart Poettering

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <unistd.h>
#include <errno.h>

#include <pulsecore/atomic.h>
#include <pulsecore/log.h>
#include <pulsecore/thread.h>
#include <pulsecore/macro.h>
#include <pulsecore/core-util.h>
#include <pulsecore/llist.h>
#include <pulsecore/flist.h>
#include <pulse/xmalloc.h>

#include "asyncq.h"
#include "fdsem.h"

#define ASYNCQ_SIZE 128

/* For debugging purposes we can define PROFILE to put an extra thread
 * yield (_Y) between each operation. */

/* #define PROFILE */

#ifdef PROFILE
#define _Y pa_thread_yield()
#else
#define _Y do { } while(0)
#endif

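/* An overflow entry: pa_asyncq_post() parks items here when the
 * lock-free ring buffer is full, so that posting never blocks. */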
struct localq {
    void *data;
    PA_LLIST_FIELDS(struct localq);
};

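/* The queue proper. The ring buffer cells (an array of 'size'
 * pa_atomic_ptr_t, see PA_ASYNCQ_CELLS below) live in the same
 * allocation, directly after this struct. read_idx is only touched by
 * the single reader, write_idx only by the single writer; the two
 * fdsems are used to put the respective side to sleep and wake it. */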
struct pa_asyncq {
    unsigned size;
    unsigned read_idx;
    unsigned write_idx;
    pa_fdsem *read_fdsem, *write_fdsem;

    PA_LLIST_HEAD(struct localq, localq);
    struct localq *last_localq;
    pa_bool_t waiting_for_post;
};

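/* A static lock-free free list used to recycle struct localq entries
 * instead of hitting the allocator on every overflow. */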
PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);

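/* Returns the array of atomic pointer cells stored right after the
 * pa_asyncq structure in the same memory block. */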
#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))

static int is_power_of_two(unsigned size) {
    return !(size & (size - 1));
}

static int reduce(pa_asyncq *l, int value) {
    return value & (unsigned) (l->size - 1);
}

pa_asyncq *pa_asyncq_new(unsigned size) {
    pa_asyncq *l;

    if (!size)
        size = ASYNCQ_SIZE;

    pa_assert(is_power_of_two(size));

    l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));

    l->size = size;

    PA_LLIST_HEAD_INIT(struct localq, l->localq);
    l->last_localq = NULL;
    l->waiting_for_post = FALSE;

    if (!(l->read_fdsem = pa_fdsem_new())) {
        pa_xfree(l);
        return NULL;
    }

    if (!(l->write_fdsem = pa_fdsem_new())) {
        pa_fdsem_free(l->read_fdsem);
        pa_xfree(l);
        return NULL;
    }

    return l;
}

void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
    struct localq *q;
    pa_assert(l);

    if (free_cb) {
        void *p;

        while ((p = pa_asyncq_pop(l, 0)))
            free_cb(p);
    }

    while ((q = l->localq)) {
        if (free_cb)
            free_cb(q->data);

        PA_LLIST_REMOVE(struct localq, l->localq, q);

        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
            pa_xfree(q);
    }

    pa_fdsem_free(l->read_fdsem);
    pa_fdsem_free(l->write_fdsem);
    pa_xfree(l);
}

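/* Try to store p in the cell at write_idx. If the cell is still
 * occupied the reader hasn't caught up yet; either fail (wait=FALSE)
 * or sleep on read_fdsem until the reader frees the cell. On success
 * advance write_idx and wake the reader via write_fdsem. */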
static int push(pa_asyncq*l, void *p, pa_bool_t wait) {
    int idx;
    pa_atomic_ptr_t *cells;

    pa_assert(l);
    pa_assert(p);

    cells = PA_ASYNCQ_CELLS(l);

    _Y;
    idx = reduce(l, l->write_idx);

    if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {

        if (!wait)
            return -1;

        /* pa_log("sleeping on push"); */

        do {
            pa_fdsem_wait(l->read_fdsem);
        } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
    }

    _Y;
    l->write_idx++;

    pa_fdsem_post(l->write_fdsem);

    return 0;
}

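/* Move entries that pa_asyncq_post() queued locally into the ring
 * buffer, oldest first (last_localq points at the oldest entry; its
 * prev pointers lead towards newer ones). Returns FALSE if the ring
 * buffer filled up before everything could be flushed. */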
static pa_bool_t flush_postq(pa_asyncq *l) {
    struct localq *q;

    pa_assert(l);

    while ((q = l->last_localq)) {

        if (push(l, q->data, FALSE) < 0)
            return FALSE;

        l->last_localq = q->prev;

        PA_LLIST_REMOVE(struct localq, l->localq, q);

        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
            pa_xfree(q);
    }

    return TRUE;
}

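/* Writer API: first drain anything still queued locally so ordering
 * is preserved, then attempt the actual push. */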
int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) {
    pa_assert(l);

    if (!flush_postq(l))
        return -1;

    return push(l, p, wait);
}

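/* Like pa_asyncq_push(), but never blocks: if the ring buffer is full
 * the item is stashed on the local list and flushed later by
 * pa_asyncq_push() or pa_asyncq_write_before_poll(). */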
void pa_asyncq_post(pa_asyncq*l, void *p) {
    struct localq *q;

    pa_assert(l);
    pa_assert(p);

    if (pa_asyncq_push(l, p, FALSE) >= 0)
        return;

    /* OK, we couldn't push anything in the queue. So let's queue it
     * locally and push it later */

    pa_log("q overrun, queuing locally");

    if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
        q = pa_xnew(struct localq, 1);

    q->data = p;
    PA_LLIST_PREPEND(struct localq, l->localq, q);

    if (!l->last_localq)
        l->last_localq = q;

    return;
}

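/* Reader API: load the cell at read_idx; if it is empty either return
 * NULL (wait=FALSE) or sleep on write_fdsem until the writer fills it.
 * Then clear the cell, advance read_idx and wake a possibly blocked
 * writer via read_fdsem. */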
void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) {
    int idx;
    void *ret;
    pa_atomic_ptr_t *cells;

    pa_assert(l);

    cells = PA_ASYNCQ_CELLS(l);

    _Y;
    idx = reduce(l, l->read_idx);

    if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {

        if (!wait)
            return NULL;

        /* pa_log("sleeping on pop"); */

        do {
            pa_fdsem_wait(l->write_fdsem);
        } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
    }

    pa_assert(ret);

    /* Guaranteed to succeed if we only have a single reader */
    pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));

    _Y;
    l->read_idx++;

    pa_fdsem_post(l->read_fdsem);

    return ret;
}

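/* File descriptor the reader can poll() on; it becomes readable when
 * the writer posts write_fdsem. */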
int pa_asyncq_read_fd(pa_asyncq *q) {
    pa_assert(q);

    return pa_fdsem_get(q->write_fdsem);
}

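/* Prepare the reader side for poll(): returns -1 if data is already
 * available (no need to poll at all), 0 once write_fdsem is armed. */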
int pa_asyncq_read_before_poll(pa_asyncq *l) {
    int idx;
    pa_atomic_ptr_t *cells;

    pa_assert(l);

    cells = PA_ASYNCQ_CELLS(l);

    _Y;
    idx = reduce(l, l->read_idx);

    for (;;) {
        if (pa_atomic_ptr_load(&cells[idx]))
            return -1;

        if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
            return 0;
    }

    return 0;
}

void pa_asyncq_read_after_poll(pa_asyncq *l) {
    pa_assert(l);

    pa_fdsem_after_poll(l->write_fdsem);
}

int pa_asyncq_write_fd(pa_asyncq *q) {
    pa_assert(q);

    return pa_fdsem_get(q->read_fdsem);
}

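/* Prepare the writer side for poll(): try to flush the local overflow
 * queue; if the ring buffer is still full, arm read_fdsem so poll()
 * wakes us when the reader frees cells, and remember this in
 * waiting_for_post for pa_asyncq_write_after_poll(). */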
void pa_asyncq_write_before_poll(pa_asyncq *l) {
    pa_assert(l);

    for (;;) {

        if (flush_postq(l))
            break;

        if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
            l->waiting_for_post = TRUE;
            break;
        }
    }
}

void pa_asyncq_write_after_poll(pa_asyncq *l) {
    pa_assert(l);

    if (l->waiting_for_post) {
        pa_fdsem_after_poll(l->read_fdsem);
        l->waiting_for_post = FALSE;
    }
}
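
/* Usage sketch (illustrative only): normally the writer calls
 * pa_asyncq_push()/pa_asyncq_post() from one thread and the reader
 * calls pa_asyncq_pop() from another. The single-threaded snippet
 * below, using only the API defined above, just shows the call
 * sequence; the function name asyncq_example is hypothetical. */
#if 0
static void asyncq_example(void) {
    pa_asyncq *q;
    void *item;

    q = pa_asyncq_new(0);                       /* 0 selects the default size (128 cells) */

    pa_assert_se(pa_asyncq_push(q, pa_xstrdup("hello"), TRUE) == 0);  /* writer side */

    while ((item = pa_asyncq_pop(q, FALSE)))    /* reader side; FALSE = don't block */
        pa_xfree(item);

    pa_asyncq_free(q, pa_xfree);                /* frees anything still queued */
}
#endif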