]> code.delx.au - pulseaudio/blob - src/polyp.c
make native playback work
[pulseaudio] / src / polyp.c
1 #include <stdio.h>
2 #include <assert.h>
3 #include <stdlib.h>
4 #include <string.h>
5
6 #include "polyp.h"
7 #include "protocol-native-spec.h"
8 #include "pdispatch.h"
9 #include "pstream.h"
10 #include "dynarray.h"
11 #include "socket-client.h"
12 #include "pstream-util.h"
13
14 #define DEFAULT_QUEUE_LENGTH 10240
15 #define DEFAULT_MAX_LENGTH 20480
16 #define DEFAULT_PREBUF 4096
17 #define DEFAULT_TIMEOUT (5*60)
18 #define DEFAULT_SERVER "/tmp/polypaudio_native"
19
20 struct pa_context {
21 char *name;
22 struct pa_mainloop_api* mainloop;
23 struct socket_client *client;
24 struct pstream *pstream;
25 struct pdispatch *pdispatch;
26 struct dynarray *streams;
27 struct pa_stream *first_stream;
28 uint32_t ctag;
29 uint32_t errno;
30 enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_READY, CONTEXT_DEAD} state;
31
32 void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
33 void *connect_complete_userdata;
34
35 void (*die_callback)(struct pa_context*c, void *userdata);
36 void *die_userdata;
37 };
38
39 struct pa_stream {
40 struct pa_context *context;
41 struct pa_stream *next, *previous;
42 uint32_t device_index;
43 uint32_t channel;
44 int channel_valid;
45 enum pa_stream_direction direction;
46 enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
47 uint32_t requested_bytes;
48
49 void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
50 void *read_userdata;
51
52 void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
53 void *write_userdata;
54
55 void (*create_complete_callback)(struct pa_context*c, struct pa_stream *s, void *userdata);
56 void *create_complete_userdata;
57
58 void (*die_callback)(struct pa_stream*c, void *userdata);
59 void *die_userdata;
60 };
61
62 static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
63
64 static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
65 [PA_COMMAND_ERROR] = { NULL },
66 [PA_COMMAND_REPLY] = { NULL },
67 [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
68 [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
69 [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
70 [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
71 [PA_COMMAND_EXIT] = { NULL },
72 [PA_COMMAND_REQUEST] = { command_request },
73 };
74
75 struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
76 assert(mainloop && name);
77 struct pa_context *c;
78 c = malloc(sizeof(struct pa_context));
79 assert(c);
80 c->name = strdup(name);
81 c->mainloop = mainloop;
82 c->client = NULL;
83 c->pstream = NULL;
84 c->pdispatch = NULL;
85 c->streams = dynarray_new();
86 assert(c->streams);
87 c->first_stream = NULL;
88 c->errno = PA_ERROR_OK;
89 c->state = CONTEXT_UNCONNECTED;
90 c->ctag = 0;
91
92 c->connect_complete_callback = NULL;
93 c->connect_complete_userdata = NULL;
94
95 c->die_callback = NULL;
96 c->die_userdata = NULL;
97
98 return c;
99 }
100
101 void pa_context_free(struct pa_context *c) {
102 assert(c);
103
104 while (c->first_stream)
105 pa_stream_free(c->first_stream);
106
107 if (c->client)
108 socket_client_free(c->client);
109 if (c->pdispatch)
110 pdispatch_free(c->pdispatch);
111 if (c->pstream)
112 pstream_free(c->pstream);
113 if (c->streams)
114 dynarray_free(c->streams, NULL, NULL);
115
116 free(c->name);
117 free(c);
118 }
119
120 static void stream_dead(struct pa_stream *s) {
121 if (s->state == STREAM_DEAD)
122 return;
123
124 s->state = STREAM_DEAD;
125 if (s->die_callback)
126 s->die_callback(s, s->die_userdata);
127 }
128
129 static void context_dead(struct pa_context *c) {
130 struct pa_stream *s;
131 assert(c);
132
133 for (s = c->first_stream; s; s = s->next)
134 stream_dead(s);
135
136 if (c->state == CONTEXT_DEAD)
137 return;
138
139 c->state = CONTEXT_DEAD;
140 if (c->die_callback)
141 c->die_callback(c, c->die_userdata);
142 }
143
144 static void pstream_die_callback(struct pstream *p, void *userdata) {
145 struct pa_context *c = userdata;
146 assert(p && c);
147
148 assert(c->state != CONTEXT_DEAD);
149
150 c->state = CONTEXT_DEAD;
151
152 context_dead(c);
153 }
154
155 static int pstream_packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
156 struct pa_context *c = userdata;
157 assert(p && packet && c);
158
159 if (pdispatch_run(c->pdispatch, packet, c) < 0) {
160 fprintf(stderr, "polyp.c: invalid packet.\n");
161 return -1;
162 }
163
164 return 0;
165 }
166
167 static int pstream_memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
168 struct pa_context *c = userdata;
169 struct pa_stream *s;
170 assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
171
172 if (!(s = dynarray_get(c->streams, channel)))
173 return -1;
174
175 if (s->read_callback)
176 s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
177
178 return 0;
179 }
180
181 static void on_connection(struct socket_client *client, struct iochannel*io, void *userdata) {
182 struct pa_context *c = userdata;
183 assert(client && io && c && c->state == CONTEXT_CONNECTING);
184
185 socket_client_free(client);
186 c->client = NULL;
187
188 if (!io) {
189 c->errno = PA_ERROR_CONNECTIONREFUSED;
190 c->state = CONTEXT_UNCONNECTED;
191
192 if (c->connect_complete_callback)
193 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
194
195 return;
196 }
197
198 c->pstream = pstream_new(c->mainloop, io);
199 assert(c->pstream);
200 pstream_set_die_callback(c->pstream, pstream_die_callback, c);
201 pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
202 pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
203
204 c->pdispatch = pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
205 assert(c->pdispatch);
206
207 c->state = CONTEXT_READY;
208
209 if (c->connect_complete_callback)
210 c->connect_complete_callback(c, 1, c->connect_complete_userdata);
211 }
212
213 int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
214 assert(c && c->state == CONTEXT_UNCONNECTED);
215
216 assert(!c->client);
217 if (!(c->client = socket_client_new_unix(c->mainloop, server ? server : DEFAULT_SERVER))) {
218 c->errno = PA_ERROR_CONNECTIONREFUSED;
219 return -1;
220 }
221
222 c->connect_complete_callback = complete;
223 c->connect_complete_userdata = userdata;
224
225 socket_client_set_callback(c->client, on_connection, c);
226 c->state = CONTEXT_CONNECTING;
227
228 return 0;
229 }
230
231 int pa_context_is_dead(struct pa_context *c) {
232 assert(c);
233 return c->state == CONTEXT_DEAD;
234 }
235
236 int pa_context_is_ready(struct pa_context *c) {
237 assert(c);
238 return c->state == CONTEXT_READY;
239 }
240
241 int pa_context_errno(struct pa_context *c) {
242 assert(c);
243 return c->errno;
244 }
245
246 void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
247 assert(c);
248 c->die_callback = cb;
249 c->die_userdata = userdata;
250 }
251
252 static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
253 struct pa_stream *s;
254 struct pa_context *c = userdata;
255 uint32_t bytes, channel;
256 assert(pd && command == PA_COMMAND_REQUEST && t && c);
257
258 if (tagstruct_getu32(t, &channel) < 0 ||
259 tagstruct_getu32(t, &bytes) < 0 ||
260 !tagstruct_eof(t)) {
261 c->errno = PA_ERROR_PROTOCOL;
262 return -1;
263 }
264
265 if (!(s = dynarray_get(c->streams, channel))) {
266 c->errno = PA_ERROR_PROTOCOL;
267 return -1;
268 }
269
270 /*fprintf(stderr, "Requested %u bytes\n", bytes);*/
271
272 s->requested_bytes += bytes;
273
274 if (s->requested_bytes && s->write_callback)
275 s->write_callback(s, s->requested_bytes, s->write_userdata);
276
277 return 0;
278 }
279
280 static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
281 int ret = 0;
282 struct pa_stream *s = userdata;
283 assert(pd && s && s->state == STREAM_CREATING);
284
285 if (command != PA_COMMAND_REPLY) {
286 struct pa_context *c = s->context;
287 assert(c);
288
289 if (command == PA_COMMAND_ERROR && tagstruct_getu32(t, &s->context->errno) < 0) {
290 s->context->errno = PA_ERROR_PROTOCOL;
291 ret = -1;
292 } else if (command == PA_COMMAND_TIMEOUT) {
293 s->context->errno = PA_ERROR_TIMEOUT;
294 ret = -1;
295 }
296
297 goto fail;
298 }
299
300 if (tagstruct_getu32(t, &s->channel) < 0 ||
301 tagstruct_getu32(t, &s->device_index) < 0 ||
302 !tagstruct_eof(t)) {
303 s->context->errno = PA_ERROR_PROTOCOL;
304 ret = -1;
305 goto fail;
306 }
307
308 s->channel_valid = 1;
309 dynarray_put(s->context->streams, s->channel, s);
310
311 s->state = STREAM_READY;
312 assert(s->create_complete_callback);
313 s->create_complete_callback(s->context, s, s->create_complete_userdata);
314 return 0;
315
316 fail:
317 assert(s->create_complete_callback);
318 s->create_complete_callback(s->context, NULL, s->create_complete_userdata);
319 pa_stream_free(s);
320 return ret;
321 }
322
323 int pa_stream_new(
324 struct pa_context *c,
325 enum pa_stream_direction dir,
326 const char *dev,
327 const char *name,
328 const struct pa_sample_spec *ss,
329 const struct pa_buffer_attr *attr,
330 void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
331 void *userdata) {
332
333 struct pa_stream *s;
334 struct tagstruct *t;
335 uint32_t tag;
336
337 assert(c && name && ss && c->state == CONTEXT_READY && complete);
338
339 s = malloc(sizeof(struct pa_stream));
340 assert(s);
341 s->context = c;
342
343 s->read_callback = NULL;
344 s->read_userdata = NULL;
345 s->write_callback = NULL;
346 s->write_userdata = NULL;
347 s->die_callback = NULL;
348 s->die_userdata = NULL;
349 s->create_complete_callback = complete;
350 s->create_complete_userdata = NULL;
351
352 s->state = STREAM_CREATING;
353 s->requested_bytes = 0;
354 s->channel = 0;
355 s->channel_valid = 0;
356 s->device_index = (uint32_t) -1;
357 s->direction = dir;
358
359 t = tagstruct_new(NULL, 0);
360 assert(t);
361
362 tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
363 tagstruct_putu32(t, tag = c->ctag++);
364 tagstruct_puts(t, name);
365 tagstruct_put_sample_spec(t, ss);
366 tagstruct_putu32(t, (uint32_t) -1);
367 tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH);
368 tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH);
369 tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF);
370
371 pstream_send_tagstruct(c->pstream, t);
372
373 pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
374
375 s->next = c->first_stream;
376 if (s->next)
377 s->next->previous = s;
378 s->previous = NULL;
379 c->first_stream = s;
380
381 return 0;
382 }
383
384 void pa_stream_free(struct pa_stream *s) {
385 assert(s && s->context);
386
387 if (s->channel_valid) {
388 struct tagstruct *t = tagstruct_new(NULL, 0);
389 assert(t);
390
391 tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM);
392 tagstruct_putu32(t, s->context->ctag++);
393 tagstruct_putu32(t, s->channel);
394 pstream_send_tagstruct(s->context->pstream, t);
395 }
396
397 if (s->channel_valid)
398 dynarray_put(s->context->streams, s->channel, NULL);
399
400 if (s->next)
401 s->next->previous = s->previous;
402 if (s->previous)
403 s->previous->next = s->next;
404 else
405 s->context->first_stream = s->next;
406
407 free(s);
408 }
409
410 void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
411 assert(s && cb);
412 s->write_callback = cb;
413 s->write_userdata = userdata;
414 }
415
416 void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
417 struct memchunk chunk;
418 assert(s && s->context && data && length);
419
420 chunk.memblock = memblock_new(length);
421 assert(chunk.memblock && chunk.memblock->data);
422 memcpy(chunk.memblock->data, data, length);
423 chunk.index = 0;
424 chunk.length = length;
425
426 pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
427 memblock_unref(chunk.memblock);
428
429 /*fprintf(stderr, "Sent %u bytes\n", length);*/
430
431 if (length < s->requested_bytes)
432 s->requested_bytes -= length;
433 else
434 s->requested_bytes = 0;
435 }
436
437 size_t pa_stream_writable_size(struct pa_stream *s) {
438 assert(s);
439 return s->requested_bytes;
440 }
441
442 void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
443 assert(s && cb);
444 s->read_callback = cb;
445 s->read_userdata = userdata;
446 }
447
448 int pa_stream_is_dead(struct pa_stream *s) {
449 return s->state == STREAM_DEAD;
450 }
451
452 int pa_stream_is_ready(struct pa_stream*s) {
453 return s->state == STREAM_READY;
454 }
455
456 void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
457 assert(s);
458 s->die_callback = cb;
459 s->die_userdata = userdata;
460 }