]> code.delx.au - pulseaudio/blob - src/polyp.c
main part of the native protocol
[pulseaudio] / src / polyp.c
1 #include <stdio.h>
2 #include <assert.h>
3 #include <stdlib.h>
4 #include <string.h>
5
6 #include "polyp.h"
7 #include "protocol-native-spec.h"
8 #include "pdispatch.h"
9 #include "pstream.h"
10 #include "dynarray.h"
11 #include "socket-client.h"
12 #include "pstream-util.h"
13
/*
 * Buffer metrics that pa_stream_new() sends to the server when the caller
 * passes attr == NULL (presumably byte counts -- TODO confirm against the
 * server side).
 */
#define DEFAULT_QUEUE_LENGTH    (10240)
#define DEFAULT_MAX_LENGTH      (20480)
#define DEFAULT_PREBUF          (4096)

/*
 * Timeout handed to pdispatch_register_reply() while waiting for the
 * server's answer to a stream creation request (presumably seconds --
 * TODO confirm against pdispatch).
 */
#define DEFAULT_TIMEOUT         (5)
18
/*
 * Client-side state of one connection to the sound server.
 *
 * Created by pa_context_new(), connected by pa_context_connect() and
 * destroyed by pa_context_free().
 */
struct pa_context {
    /* Private copy of the name given to pa_context_new(); freed in pa_context_free(). */
    char *name;

    /* Main loop abstraction supplied by the caller of pa_context_new(). */
    struct pa_mainloop_api *mainloop;

    /* Pending connection attempt; set in pa_context_connect(), released in on_connection(). */
    struct socket_client *client;

    /* Packet stream and reply dispatcher, both created in on_connection(). */
    struct pstream *pstream;
    struct pdispatch *pdispatch;

    /* Lookup table: channel number -> struct pa_stream *. */
    struct dynarray *streams;

    /* Head of the doubly linked list of all streams belonging to this context. */
    struct pa_stream *first_stream;

    /* Tag for the next command; post-incremented for every command sent. */
    uint32_t ctag;

    /*
     * Last error code (PA_ERROR_*), reported by pa_context_errno().
     * NOTE(review): `errno` is also the name of the <errno.h> macro; this
     * field stops compiling if that header gets pulled in -- consider
     * renaming it file-wide.
     */
    uint32_t errno;

    /* Connection life cycle. */
    enum {
        CONTEXT_UNCONNECTED,
        CONTEXT_CONNECTING,
        CONTEXT_READY,
        CONTEXT_DEAD
    } state;

    /* Invoked once from on_connection() with success = 1 or 0. */
    void (*connect_complete_callback)(struct pa_context *c, int success, void *userdata);
    void *connect_complete_userdata;

    /* Invoked from context_dead() when the context enters CONTEXT_DEAD. */
    void (*die_callback)(struct pa_context *c, void *userdata);
    void *die_userdata;
};
37
38 struct pa_stream {
39 struct pa_context *context;
40 struct pa_stream *next, *previous;
41 uint32_t channel;
42 int channel_valid;
43 enum pa_stream_direction direction;
44 enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
45 uint32_t requested_bytes;
46
47 void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
48 void *read_userdata;
49
50 void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
51 void *write_userdata;
52
53 void (*create_complete_callback)(struct pa_context*c, struct pa_stream *s, void *userdata);
54 void *create_complete_userdata;
55
56 void (*die_callback)(struct pa_stream*c, void *userdata);
57 void *die_userdata;
58 };
59
60 static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
61
62 static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
63 [PA_COMMAND_ERROR] = { NULL },
64 [PA_COMMAND_REPLY] = { NULL },
65 [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
66 [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
67 [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
68 [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
69 [PA_COMMAND_EXIT] = { NULL },
70 [PA_COMMAND_REQUEST] = { command_request },
71 };
72
73 struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
74 assert(mainloop && name);
75 struct pa_context *c;
76 c = malloc(sizeof(struct pa_context));
77 assert(c);
78 c->name = strdup(name);
79 c->mainloop = mainloop;
80 c->client = NULL;
81 c->pstream = NULL;
82 c->pdispatch = NULL;
83 c->streams = dynarray_new();
84 assert(c->streams);
85 c->first_stream = NULL;
86 c->errno = PA_ERROR_OK;
87 c->state = CONTEXT_UNCONNECTED;
88 c->ctag = 0;
89
90 c->connect_complete_callback = NULL;
91 c->connect_complete_userdata = NULL;
92
93 c->die_callback = NULL;
94 c->die_userdata = NULL;
95
96 return c;
97 }
98
99 void pa_context_free(struct pa_context *c) {
100 assert(c);
101
102 while (c->first_stream)
103 pa_stream_free(c->first_stream);
104
105 if (c->client)
106 socket_client_free(c->client);
107 if (c->pdispatch)
108 pdispatch_free(c->pdispatch);
109 if (c->pstream)
110 pstream_free(c->pstream);
111 if (c->streams)
112 dynarray_free(c->streams, NULL, NULL);
113
114 free(c->name);
115 free(c);
116 }
117
118 static void stream_dead(struct pa_stream *s) {
119 if (s->state == STREAM_DEAD)
120 return;
121
122 s->state = STREAM_DEAD;
123 if (s->die_callback)
124 s->die_callback(s, s->die_userdata);
125 }
126
127 static void context_dead(struct pa_context *c) {
128 struct pa_stream *s;
129 assert(c);
130
131 for (s = c->first_stream; s; s = s->next)
132 stream_dead(s);
133
134 if (c->state == CONTEXT_DEAD)
135 return;
136
137 c->state = CONTEXT_DEAD;
138 if (c->die_callback)
139 c->die_callback(c, c->die_userdata);
140 }
141
142 static void pstream_die_callback(struct pstream *p, void *userdata) {
143 struct pa_context *c = userdata;
144 assert(p && c);
145
146 assert(c->state != CONTEXT_DEAD);
147
148 c->state = CONTEXT_DEAD;
149
150 context_dead(c);
151 }
152
153 static int pstream_packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
154 struct pa_context *c = userdata;
155 assert(p && packet && c);
156
157 if (pdispatch_run(c->pdispatch, packet, c) < 0) {
158 fprintf(stderr, "polyp.c: invalid packet.\n");
159 return -1;
160 }
161
162 return 0;
163 }
164
165 static int pstream_memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
166 struct pa_context *c = userdata;
167 struct pa_stream *s;
168 assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
169
170 if (!(s = dynarray_get(c->streams, channel)))
171 return -1;
172
173 if (s->read_callback)
174 s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
175
176 return 0;
177 }
178
179 static void on_connection(struct socket_client *client, struct iochannel*io, void *userdata) {
180 struct pa_context *c = userdata;
181 assert(client && io && c && c->state == CONTEXT_CONNECTING);
182
183 socket_client_free(client);
184 c->client = NULL;
185
186 if (!io) {
187 c->errno = PA_ERROR_CONNECTIONREFUSED;
188 c->state = CONTEXT_UNCONNECTED;
189
190 if (c->connect_complete_callback)
191 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
192
193 return;
194 }
195
196 c->pstream = pstream_new(c->mainloop, io);
197 assert(c->pstream);
198 pstream_set_die_callback(c->pstream, pstream_die_callback, c);
199 pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
200 pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
201
202 c->pdispatch = pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
203 assert(c->pdispatch);
204
205 c->state = CONTEXT_READY;
206
207 if (c->connect_complete_callback)
208 c->connect_complete_callback(c, 1, c->connect_complete_userdata);
209 }
210
211 int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
212 assert(c && c->state == CONTEXT_UNCONNECTED);
213
214 assert(!c->client);
215 if (!(c->client = socket_client_new_unix(c->mainloop, server))) {
216 c->errno = PA_ERROR_CONNECTIONREFUSED;
217 return -1;
218 }
219
220 c->connect_complete_callback = complete;
221 c->connect_complete_userdata = userdata;
222
223 socket_client_set_callback(c->client, on_connection, c);
224 c->state = CONTEXT_CONNECTING;
225
226 return 0;
227 }
228
229 int pa_context_is_dead(struct pa_context *c) {
230 assert(c);
231 return c->state == CONTEXT_DEAD;
232 }
233
234 int pa_context_is_ready(struct pa_context *c) {
235 assert(c);
236 return c->state == CONTEXT_READY;
237 }
238
239 int pa_context_errno(struct pa_context *c) {
240 assert(c);
241 return c->errno;
242 }
243
244 void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
245 assert(c);
246 c->die_callback = cb;
247 c->die_userdata = userdata;
248 }
249
250 static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
251 struct pa_stream *s;
252 struct pa_context *c = userdata;
253 uint32_t bytes, channel;
254 assert(pd && command == PA_COMMAND_REQUEST && t && s);
255
256 if (tagstruct_getu32(t, &channel) < 0 ||
257 tagstruct_getu32(t, &bytes) < 0 ||
258 tagstruct_eof(t)) {
259 c->errno = PA_ERROR_PROTOCOL;
260 return -1;
261 }
262
263 if (!(s = dynarray_get(c->streams, channel))) {
264 c->errno = PA_ERROR_PROTOCOL;
265 return -1;
266 }
267
268 s->requested_bytes += bytes;
269
270 if (s->requested_bytes && s->write_callback)
271 s->write_callback(s, s->requested_bytes, s->write_userdata);
272
273 return 0;
274 }
275
276 static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
277 int ret = 0;
278 struct pa_stream *s = userdata;
279 assert(pd && s && s->state == STREAM_CREATING);
280
281 if (command != PA_COMMAND_REPLY) {
282 struct pa_context *c = s->context;
283 assert(c);
284
285 if (command == PA_COMMAND_ERROR && tagstruct_getu32(t, &s->context->errno) < 0) {
286 s->context->errno = PA_ERROR_PROTOCOL;
287 ret = -1;
288 } else if (command == PA_COMMAND_TIMEOUT) {
289 s->context->errno = PA_ERROR_TIMEOUT;
290 ret = -1;
291 }
292
293 goto fail;
294 }
295
296 if (tagstruct_getu32(t, &s->channel) < 0 ||
297 tagstruct_eof(t)) {
298 s->context->errno = PA_ERROR_PROTOCOL;
299 ret = -1;
300 goto fail;
301 }
302
303 s->channel_valid = 1;
304 dynarray_put(s->context->streams, s->channel, s);
305
306 s->state = STREAM_READY;
307 assert(s->create_complete_callback);
308 s->create_complete_callback(s->context, s, s->create_complete_userdata);
309 return 0;
310
311 fail:
312 assert(s->create_complete_callback);
313 s->create_complete_callback(s->context, NULL, s->create_complete_userdata);
314 pa_stream_free(s);
315 return ret;
316 }
317
318 int pa_stream_new(
319 struct pa_context *c,
320 enum pa_stream_direction dir,
321 const char *dev,
322 const char *name,
323 const struct pa_sample_spec *ss,
324 const struct pa_buffer_attr *attr,
325 void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
326 void *userdata) {
327
328 struct pa_stream *s;
329 struct tagstruct *t;
330 uint32_t tag;
331
332 assert(c && name && ss && c->state == CONTEXT_READY && complete);
333
334 s = malloc(sizeof(struct pa_stream));
335 assert(s);
336 s->context = c;
337
338 s->read_callback = NULL;
339 s->read_userdata = NULL;
340 s->write_callback = NULL;
341 s->write_userdata = NULL;
342 s->die_callback = NULL;
343 s->die_userdata = NULL;
344 s->create_complete_callback = complete;
345 s->create_complete_userdata = NULL;
346
347 s->state = STREAM_CREATING;
348 s->requested_bytes = 0;
349 s->channel = 0;
350 s->channel_valid = 0;
351 s->direction = dir;
352
353 t = tagstruct_new(NULL, 0);
354 assert(t);
355
356 tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
357 tagstruct_putu32(t, tag = c->ctag++);
358 tagstruct_puts(t, name);
359 tagstruct_put_sample_spec(t, ss);
360 tagstruct_putu32(t, (uint32_t) -1);
361 tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH);
362 tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH);
363 tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF);
364
365 pstream_send_tagstruct(c->pstream, t);
366
367 pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
368
369 s->next = c->first_stream;
370 if (s->next)
371 s->next->previous = s;
372 s->previous = NULL;
373 c->first_stream = s;
374
375 return 0;
376 }
377
378 void pa_stream_free(struct pa_stream *s) {
379 assert(s && s->context);
380
381 if (s->channel_valid) {
382 struct tagstruct *t = tagstruct_new(NULL, 0);
383 assert(t);
384
385 tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM);
386 tagstruct_putu32(t, s->context->ctag++);
387 tagstruct_putu32(t, s->channel);
388 pstream_send_tagstruct(s->context->pstream, t);
389 }
390
391 if (s->channel_valid)
392 dynarray_put(s->context->streams, s->channel, NULL);
393
394 if (s->next)
395 s->next->previous = s->previous;
396 if (s->previous)
397 s->previous->next = s->next;
398 else
399 s->context->first_stream = s->next;
400
401 free(s);
402 }
403
404 void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
405 assert(s && cb);
406 s->write_callback = cb;
407 s->write_userdata = userdata;
408 }
409
410 void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
411 struct memchunk chunk;
412 assert(s && s->context && data && length);
413
414 chunk.memblock = memblock_new(length);
415 assert(chunk.memblock && chunk.memblock->data);
416 memcpy(chunk.memblock->data, data, length);
417 chunk.index = 0;
418 chunk.length = length;
419
420 pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
421
422 if (length < s->requested_bytes)
423 s->requested_bytes -= length;
424 else
425 s->requested_bytes = 0;
426 }
427
428 size_t pa_stream_writable_size(struct pa_stream *s) {
429 assert(s);
430 return s->requested_bytes;
431 }
432
433 void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
434 assert(s && cb);
435 s->read_callback = cb;
436 s->read_userdata = userdata;
437 }
438
439 int pa_stream_is_dead(struct pa_stream *s) {
440 return s->state == STREAM_DEAD;
441 }
442
443 int pa_stream_is_ready(struct pa_stream*s) {
444 return s->state == STREAM_READY;
445 }
446
447 void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
448 assert(s);
449 s->die_callback = cb;
450 s->die_userdata = userdata;
451 }