]> code.delx.au - pulseaudio/blob - src/pulsecore/shmasyncq.c
merge glitch-free branch back into trunk
[pulseaudio] / src / pulsecore / shmasyncq.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2006 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
37
38 #include "fdsem.h"
39
40 /* For debugging purposes we can define PROFILE to make _Y put an extra
41 * thread yield between each operation. */
42
43 /* #define PROFILE */
44
/* _Y is sprinkled between queue operations. Without PROFILE it is an
 * empty statement; with PROFILE it expands to pa_thread_yield(). */
#ifndef PROFILE
#define _Y do { } while(0)
#else
#define _Y pa_thread_yield()
#endif
50
51
52 struct pa_shmasyncq {
53 pa_fdsem *read_fdsem, *write_fdsem;
54 pa_shmasyncq_data *data;
55 };
56
/* Returns non-zero if size is a power of two (1, 2, 4, ...), zero
 * otherwise. 0 is explicitly rejected: the bit trick below alone would
 * accept it because 0 & (0 - 1) is 0 as well. */
static int is_power_of_two(unsigned size) {
    return size != 0 && (size & (size - 1)) == 0;
}
60
61 static int reduce(pa_shmasyncq *l, int value) {
62 return value & (unsigned) (l->n_elements - 1);
63 }
64
65 static pa_atomic_t* get_cell(pa_shmasyncq *l, unsigned i) {
66 pa_assert(i < l->data->n_elements);
67
68 return (pa_atomic_t*) ((uint8*t) l->data + PA_ALIGN(sizeof(pa_shmasyncq_data)) + i * (PA_ALIGN(sizeof(pa_atomic_t)) + PA_ALIGN(element_size)))
69 }
70
71 static void *get_cell_data(pa_atomic_t *a) {
72 return (uint8_t*) a + PA_ALIGN(sizeof(atomic_t));
73 }
74
75 pa_shmasyncq *pa_shmasyncq_new(unsigned n_elements, size_t element_size, void *data, int fd[2]) {
76 pa_shmasyncq *l;
77
78 pa_assert(n_elements > 0);
79 pa_assert(is_power_of_two(n_elements));
80 pa_assert(element_size > 0);
81 pa_assert(data);
82 pa_assert(fd);
83
84 l = pa_xnew(pa_shmasyncq, 1);
85
86 l->data = data;
87 memset(data, 0, PA_SHMASYNCQ_SIZE(n_elements, element_size));
88
89 l->data->n_elements = n_elements;
90 l->data->element_size = element_size;
91
92 if (!(l->read_fdsem = pa_fdsem_new_shm(&d->read_fdsem_data, &fd[0]))) {
93 pa_xfree(l);
94 return NULL;
95 }
96
97 if (!(l->write_fdsem = pa_fdsem_new(&d->write_fdsem_data, &fd[1]))) {
98 pa_fdsem_free(l->read_fdsem);
99 pa_xfree(l);
100 return NULL;
101 }
102
103 return l;
104 }
105
106 void pa_shmasyncq_free(pa_shmasyncq *l, pa_free_cb_t free_cb) {
107 pa_assert(l);
108
109 if (free_cb) {
110 void *p;
111
112 while ((p = pa_shmasyncq_pop(l, 0)))
113 free_cb(p);
114 }
115
116 pa_fdsem_free(l->read_fdsem);
117 pa_fdsem_free(l->write_fdsem);
118 pa_xfree(l);
119 }
120
121 int pa_shmasyncq_push(pa_shmasyncq*l, void *p, int wait) {
122 int idx;
123 pa_atomic_ptr_t *cells;
124
125 pa_assert(l);
126 pa_assert(p);
127
128 cells = PA_SHMASYNCQ_CELLS(l);
129
130 _Y;
131 idx = reduce(l, l->write_idx);
132
133 if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
134
135 if (!wait)
136 return -1;
137
138 /* pa_log("sleeping on push"); */
139
140 do {
141 pa_fdsem_wait(l->read_fdsem);
142 } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
143 }
144
145 _Y;
146 l->write_idx++;
147
148 pa_fdsem_post(l->write_fdsem);
149
150 return 0;
151 }
152
153 void* pa_shmasyncq_pop(pa_shmasyncq*l, int wait) {
154 int idx;
155 void *ret;
156 pa_atomic_ptr_t *cells;
157
158 pa_assert(l);
159
160 cells = PA_SHMASYNCQ_CELLS(l);
161
162 _Y;
163 idx = reduce(l, l->read_idx);
164
165 if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
166
167 if (!wait)
168 return NULL;
169
170 /* pa_log("sleeping on pop"); */
171
172 do {
173 pa_fdsem_wait(l->write_fdsem);
174 } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
175 }
176
177 pa_assert(ret);
178
179 /* Guaranteed to succeed if we only have a single reader */
180 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
181
182 _Y;
183 l->read_idx++;
184
185 pa_fdsem_post(l->read_fdsem);
186
187 return ret;
188 }
189
190 int pa_shmasyncq_get_fd(pa_shmasyncq *q) {
191 pa_assert(q);
192
193 return pa_fdsem_get(q->write_fdsem);
194 }
195
196 int pa_shmasyncq_before_poll(pa_shmasyncq *l) {
197 int idx;
198 pa_atomic_ptr_t *cells;
199
200 pa_assert(l);
201
202 cells = PA_SHMASYNCQ_CELLS(l);
203
204 _Y;
205 idx = reduce(l, l->read_idx);
206
207 for (;;) {
208 if (pa_atomic_ptr_load(&cells[idx]))
209 return -1;
210
211 if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
212 return 0;
213 }
214
215 return 0;
216 }
217
218 void pa_shmasyncq_after_poll(pa_shmasyncq *l) {
219 pa_assert(l);
220
221 pa_fdsem_after_poll(l->write_fdsem);
222 }