]> code.delx.au - pulseaudio/blob - src/pulsecore/fdsem.c
make O_CLOEXEC, O_NONBLOCK and socket low latency fd ops more uniform: always return...
[pulseaudio] / src / pulsecore / fdsem.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2006 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
37
38 #include "fdsem.h"
39
40 struct pa_fdsem {
41 int fds[2];
42 pa_atomic_t waiting;
43 pa_atomic_t signalled;
44 pa_atomic_t in_pipe;
45 };
46
47 pa_fdsem *pa_fdsem_new(void) {
48 pa_fdsem *f;
49
50 f = pa_xnew(pa_fdsem, 1);
51
52 if (pipe(f->fds) < 0) {
53 pa_xfree(f);
54 return NULL;
55 }
56
57 pa_make_fd_cloexec(f->fds[0]);
58 pa_make_fd_cloexec(f->fds[1]);
59
60 pa_atomic_store(&f->waiting, 0);
61 pa_atomic_store(&f->signalled, 0);
62 pa_atomic_store(&f->in_pipe, 0);
63
64 return f;
65 }
66
67 void pa_fdsem_free(pa_fdsem *f) {
68 pa_assert(f);
69
70 pa_close_pipe(f->fds);
71
72 pa_xfree(f);
73 }
74
75 static void flush(pa_fdsem *f) {
76 ssize_t r;
77 pa_assert(f);
78
79 if (pa_atomic_load(&f->in_pipe) <= 0)
80 return;
81
82 do {
83 char x[10];
84
85 if ((r = read(f->fds[0], &x, sizeof(x))) <= 0) {
86 pa_assert(r < 0 && errno == EINTR);
87 continue;
88 }
89
90 } while (pa_atomic_sub(&f->in_pipe, r) > r);
91 }
92
93 void pa_fdsem_post(pa_fdsem *f) {
94 pa_assert(f);
95
96 if (pa_atomic_cmpxchg(&f->signalled, 0, 1)) {
97
98 if (pa_atomic_load(&f->waiting)) {
99 ssize_t r;
100 char x = 'x';
101
102 pa_atomic_inc(&f->in_pipe);
103
104 for (;;) {
105
106 if ((r = write(f->fds[1], &x, 1)) != 1) {
107 pa_assert(r < 0 && errno == EINTR);
108 continue;
109 }
110
111 break;
112 }
113 }
114 }
115 }
116
117 void pa_fdsem_wait(pa_fdsem *f) {
118 pa_assert(f);
119
120 flush(f);
121
122 if (pa_atomic_cmpxchg(&f->signalled, 1, 0))
123 return;
124
125 pa_atomic_inc(&f->waiting);
126
127 while (!pa_atomic_cmpxchg(&f->signalled, 1, 0)) {
128 char x[10];
129 ssize_t r;
130
131 if ((r = read(f->fds[0], &x, sizeof(x))) <= 0) {
132 pa_assert(r < 0 && errno == EINTR);
133 continue;
134 }
135
136 pa_atomic_sub(&f->in_pipe, r);
137 }
138
139 pa_assert_se(pa_atomic_dec(&f->waiting) >= 1);
140 }
141
142 int pa_fdsem_try(pa_fdsem *f) {
143 pa_assert(f);
144
145 flush(f);
146
147 if (pa_atomic_cmpxchg(&f->signalled, 1, 0))
148 return 1;
149
150 return 0;
151 }
152
153
154 int pa_fdsem_get(pa_fdsem *f) {
155 pa_assert(f);
156
157 return f->fds[0];
158 }
159
160 int pa_fdsem_before_poll(pa_fdsem *f) {
161 pa_assert(f);
162
163 flush(f);
164
165 if (pa_atomic_cmpxchg(&f->signalled, 1, 0))
166 return -1;
167
168 pa_atomic_inc(&f->waiting);
169
170 if (pa_atomic_cmpxchg(&f->signalled, 1, 0)) {
171 pa_assert_se(pa_atomic_dec(&f->waiting) >= 1);
172 return -1;
173 }
174 return 0;
175 }
176
177 int pa_fdsem_after_poll(pa_fdsem *f) {
178 pa_assert(f);
179
180 pa_assert_se(pa_atomic_dec(&f->waiting) >= 1);
181
182 flush(f);
183
184 if (pa_atomic_cmpxchg(&f->signalled, 1, 0))
185 return 1;
186
187 return 0;
188 }