]> code.delx.au - pulseaudio/blob - src/pulsecore/asyncq.c
merge 'lennart' branch back into trunk.
[pulseaudio] / src / pulsecore / asyncq.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2006 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
37
38 #include "asyncq.h"
39 #include "fdsem.h"
40
41 #define ASYNCQ_SIZE 128
42
43 /* For debugging purposes we can define _Y to put and extra thread
44 * yield between each operation. */
45
46 /* #define PROFILE */
47
48 #ifdef PROFILE
49 #define _Y pa_thread_yield()
50 #else
51 #define _Y do { } while(0)
52 #endif
53
54 struct pa_asyncq {
55 unsigned size;
56 unsigned read_idx;
57 unsigned write_idx;
58 pa_fdsem *read_fdsem, *write_fdsem;
59 };
60
61 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
62
63 static int is_power_of_two(unsigned size) {
64 return !(size & (size - 1));
65 }
66
67 static int reduce(pa_asyncq *l, int value) {
68 return value & (unsigned) (l->size - 1);
69 }
70
71 pa_asyncq *pa_asyncq_new(unsigned size) {
72 pa_asyncq *l;
73
74 if (!size)
75 size = ASYNCQ_SIZE;
76
77 pa_assert(is_power_of_two(size));
78
79 l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
80
81 l->size = size;
82
83 if (!(l->read_fdsem = pa_fdsem_new())) {
84 pa_xfree(l);
85 return NULL;
86 }
87
88 if (!(l->write_fdsem = pa_fdsem_new())) {
89 pa_fdsem_free(l->read_fdsem);
90 pa_xfree(l);
91 return NULL;
92 }
93
94 return l;
95 }
96
97 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
98 pa_assert(l);
99
100 if (free_cb) {
101 void *p;
102
103 while ((p = pa_asyncq_pop(l, 0)))
104 free_cb(p);
105 }
106
107 pa_fdsem_free(l->read_fdsem);
108 pa_fdsem_free(l->write_fdsem);
109 pa_xfree(l);
110 }
111
112 int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
113 int idx;
114 pa_atomic_ptr_t *cells;
115
116 pa_assert(l);
117 pa_assert(p);
118
119 cells = PA_ASYNCQ_CELLS(l);
120
121 _Y;
122 idx = reduce(l, l->write_idx);
123
124 if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
125
126 if (!wait)
127 return -1;
128
129 /* pa_log("sleeping on push"); */
130
131 do {
132 pa_fdsem_wait(l->read_fdsem);
133 } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
134 }
135
136 _Y;
137 l->write_idx++;
138
139 pa_fdsem_post(l->write_fdsem);
140
141 return 0;
142 }
143
144 void* pa_asyncq_pop(pa_asyncq*l, int wait) {
145 int idx;
146 void *ret;
147 pa_atomic_ptr_t *cells;
148
149 pa_assert(l);
150
151 cells = PA_ASYNCQ_CELLS(l);
152
153 _Y;
154 idx = reduce(l, l->read_idx);
155
156 if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
157
158 if (!wait)
159 return NULL;
160
161 /* pa_log("sleeping on pop"); */
162
163 do {
164 pa_fdsem_wait(l->write_fdsem);
165 } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
166 }
167
168 pa_assert(ret);
169
170 /* Guaranteed to succeed if we only have a single reader */
171 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
172
173 _Y;
174 l->read_idx++;
175
176 pa_fdsem_post(l->read_fdsem);
177
178 return ret;
179 }
180
181 int pa_asyncq_get_fd(pa_asyncq *q) {
182 pa_assert(q);
183
184 return pa_fdsem_get(q->write_fdsem);
185 }
186
187 int pa_asyncq_before_poll(pa_asyncq *l) {
188 int idx;
189 pa_atomic_ptr_t *cells;
190
191 pa_assert(l);
192
193 cells = PA_ASYNCQ_CELLS(l);
194
195 _Y;
196 idx = reduce(l, l->read_idx);
197
198 for (;;) {
199 if (pa_atomic_ptr_load(&cells[idx]))
200 return -1;
201
202 if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
203 return 0;
204 }
205
206 return 0;
207 }
208
209 void pa_asyncq_after_poll(pa_asyncq *l) {
210 pa_assert(l);
211
212 pa_fdsem_after_poll(l->write_fdsem);
213 }