7 #include "protocol-native-spec.h"
11 #include "socket-client.h"
12 #include "pstream-util.h"
14 #define DEFAULT_QUEUE_LENGTH 10240
15 #define DEFAULT_MAX_LENGTH 20480
16 #define DEFAULT_PREBUF 4096
17 #define DEFAULT_TIMEOUT (5*60)
18 #define DEFAULT_SERVER "/tmp/polypaudio_native"
22 struct pa_mainloop_api
* mainloop
;
23 struct socket_client
*client
;
24 struct pstream
*pstream
;
25 struct pdispatch
*pdispatch
;
26 struct dynarray
*streams
;
27 struct pa_stream
*first_stream
;
30 enum { CONTEXT_UNCONNECTED
, CONTEXT_CONNECTING
, CONTEXT_READY
, CONTEXT_DEAD
} state
;
32 void (*connect_complete_callback
)(struct pa_context
*c
, int success
, void *userdata
);
33 void *connect_complete_userdata
;
35 void (*die_callback
)(struct pa_context
*c
, void *userdata
);
40 struct pa_context
*context
;
41 struct pa_stream
*next
, *previous
;
42 uint32_t device_index
;
45 enum pa_stream_direction direction
;
46 enum { STREAM_CREATING
, STREAM_READY
, STREAM_DEAD
} state
;
47 uint32_t requested_bytes
;
49 void (*read_callback
)(struct pa_stream
*p
, const void*data
, size_t length
, void *userdata
);
52 void (*write_callback
)(struct pa_stream
*p
, size_t length
, void *userdata
);
55 void (*create_complete_callback
)(struct pa_context
*c
, struct pa_stream
*s
, void *userdata
);
56 void *create_complete_userdata
;
58 void (*die_callback
)(struct pa_stream
*c
, void *userdata
);
62 static int command_request(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
64 static const struct pdispatch_command command_table
[PA_COMMAND_MAX
] = {
65 [PA_COMMAND_ERROR
] = { NULL
},
66 [PA_COMMAND_REPLY
] = { NULL
},
67 [PA_COMMAND_CREATE_PLAYBACK_STREAM
] = { NULL
},
68 [PA_COMMAND_DELETE_PLAYBACK_STREAM
] = { NULL
},
69 [PA_COMMAND_CREATE_RECORD_STREAM
] = { NULL
},
70 [PA_COMMAND_DELETE_RECORD_STREAM
] = { NULL
},
71 [PA_COMMAND_EXIT
] = { NULL
},
72 [PA_COMMAND_REQUEST
] = { command_request
},
75 struct pa_context
*pa_context_new(struct pa_mainloop_api
*mainloop
, const char *name
) {
76 assert(mainloop
&& name
);
78 c
= malloc(sizeof(struct pa_context
));
80 c
->name
= strdup(name
);
81 c
->mainloop
= mainloop
;
85 c
->streams
= dynarray_new();
87 c
->first_stream
= NULL
;
88 c
->errno
= PA_ERROR_OK
;
89 c
->state
= CONTEXT_UNCONNECTED
;
92 c
->connect_complete_callback
= NULL
;
93 c
->connect_complete_userdata
= NULL
;
95 c
->die_callback
= NULL
;
96 c
->die_userdata
= NULL
;
101 void pa_context_free(struct pa_context
*c
) {
104 while (c
->first_stream
)
105 pa_stream_free(c
->first_stream
);
108 socket_client_free(c
->client
);
110 pdispatch_free(c
->pdispatch
);
112 pstream_free(c
->pstream
);
114 dynarray_free(c
->streams
, NULL
, NULL
);
120 static void stream_dead(struct pa_stream
*s
) {
121 if (s
->state
== STREAM_DEAD
)
124 s
->state
= STREAM_DEAD
;
126 s
->die_callback(s
, s
->die_userdata
);
129 static void context_dead(struct pa_context
*c
) {
133 for (s
= c
->first_stream
; s
; s
= s
->next
)
136 if (c
->state
== CONTEXT_DEAD
)
139 c
->state
= CONTEXT_DEAD
;
141 c
->die_callback(c
, c
->die_userdata
);
144 static void pstream_die_callback(struct pstream
*p
, void *userdata
) {
145 struct pa_context
*c
= userdata
;
148 assert(c
->state
!= CONTEXT_DEAD
);
150 c
->state
= CONTEXT_DEAD
;
155 static int pstream_packet_callback(struct pstream
*p
, struct packet
*packet
, void *userdata
) {
156 struct pa_context
*c
= userdata
;
157 assert(p
&& packet
&& c
);
159 if (pdispatch_run(c
->pdispatch
, packet
, c
) < 0) {
160 fprintf(stderr
, "polyp.c: invalid packet.\n");
167 static int pstream_memblock_callback(struct pstream
*p
, uint32_t channel
, int32_t delta
, struct memchunk
*chunk
, void *userdata
) {
168 struct pa_context
*c
= userdata
;
170 assert(p
&& chunk
&& c
&& chunk
->memblock
&& chunk
->memblock
->data
);
172 if (!(s
= dynarray_get(c
->streams
, channel
)))
175 if (s
->read_callback
)
176 s
->read_callback(s
, chunk
->memblock
->data
+ chunk
->index
, chunk
->length
, s
->read_userdata
);
181 static void on_connection(struct socket_client
*client
, struct iochannel
*io
, void *userdata
) {
182 struct pa_context
*c
= userdata
;
183 assert(client
&& io
&& c
&& c
->state
== CONTEXT_CONNECTING
);
185 socket_client_free(client
);
189 c
->errno
= PA_ERROR_CONNECTIONREFUSED
;
190 c
->state
= CONTEXT_UNCONNECTED
;
192 if (c
->connect_complete_callback
)
193 c
->connect_complete_callback(c
, 0, c
->connect_complete_userdata
);
198 c
->pstream
= pstream_new(c
->mainloop
, io
);
200 pstream_set_die_callback(c
->pstream
, pstream_die_callback
, c
);
201 pstream_set_recieve_packet_callback(c
->pstream
, pstream_packet_callback
, c
);
202 pstream_set_recieve_memblock_callback(c
->pstream
, pstream_memblock_callback
, c
);
204 c
->pdispatch
= pdispatch_new(c
->mainloop
, command_table
, PA_COMMAND_MAX
);
205 assert(c
->pdispatch
);
207 c
->state
= CONTEXT_READY
;
209 if (c
->connect_complete_callback
)
210 c
->connect_complete_callback(c
, 1, c
->connect_complete_userdata
);
213 int pa_context_connect(struct pa_context
*c
, const char *server
, void (*complete
) (struct pa_context
*c
, int success
, void *userdata
), void *userdata
) {
214 assert(c
&& c
->state
== CONTEXT_UNCONNECTED
);
217 if (!(c
->client
= socket_client_new_unix(c
->mainloop
, server
? server
: DEFAULT_SERVER
))) {
218 c
->errno
= PA_ERROR_CONNECTIONREFUSED
;
222 c
->connect_complete_callback
= complete
;
223 c
->connect_complete_userdata
= userdata
;
225 socket_client_set_callback(c
->client
, on_connection
, c
);
226 c
->state
= CONTEXT_CONNECTING
;
231 int pa_context_is_dead(struct pa_context
*c
) {
233 return c
->state
== CONTEXT_DEAD
;
236 int pa_context_is_ready(struct pa_context
*c
) {
238 return c
->state
== CONTEXT_READY
;
241 int pa_context_errno(struct pa_context
*c
) {
246 void pa_context_set_die_callback(struct pa_context
*c
, void (*cb
)(struct pa_context
*c
, void *userdata
), void *userdata
) {
248 c
->die_callback
= cb
;
249 c
->die_userdata
= userdata
;
252 static int command_request(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
254 struct pa_context
*c
= userdata
;
255 uint32_t bytes
, channel
;
256 assert(pd
&& command
== PA_COMMAND_REQUEST
&& t
&& c
);
258 if (tagstruct_getu32(t
, &channel
) < 0 ||
259 tagstruct_getu32(t
, &bytes
) < 0 ||
261 c
->errno
= PA_ERROR_PROTOCOL
;
265 if (!(s
= dynarray_get(c
->streams
, channel
))) {
266 c
->errno
= PA_ERROR_PROTOCOL
;
270 /*fprintf(stderr, "Requested %u bytes\n", bytes);*/
272 s
->requested_bytes
+= bytes
;
274 if (s
->requested_bytes
&& s
->write_callback
)
275 s
->write_callback(s
, s
->requested_bytes
, s
->write_userdata
);
280 static int create_playback_callback(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
282 struct pa_stream
*s
= userdata
;
283 assert(pd
&& s
&& s
->state
== STREAM_CREATING
);
285 if (command
!= PA_COMMAND_REPLY
) {
286 struct pa_context
*c
= s
->context
;
289 if (command
== PA_COMMAND_ERROR
&& tagstruct_getu32(t
, &s
->context
->errno
) < 0) {
290 s
->context
->errno
= PA_ERROR_PROTOCOL
;
292 } else if (command
== PA_COMMAND_TIMEOUT
) {
293 s
->context
->errno
= PA_ERROR_TIMEOUT
;
300 if (tagstruct_getu32(t
, &s
->channel
) < 0 ||
301 tagstruct_getu32(t
, &s
->device_index
) < 0 ||
303 s
->context
->errno
= PA_ERROR_PROTOCOL
;
308 s
->channel_valid
= 1;
309 dynarray_put(s
->context
->streams
, s
->channel
, s
);
311 s
->state
= STREAM_READY
;
312 assert(s
->create_complete_callback
);
313 s
->create_complete_callback(s
->context
, s
, s
->create_complete_userdata
);
317 assert(s
->create_complete_callback
);
318 s
->create_complete_callback(s
->context
, NULL
, s
->create_complete_userdata
);
324 struct pa_context
*c
,
325 enum pa_stream_direction dir
,
328 const struct pa_sample_spec
*ss
,
329 const struct pa_buffer_attr
*attr
,
330 void (*complete
) (struct pa_context
*c
, struct pa_stream
*s
, void *userdata
),
337 assert(c
&& name
&& ss
&& c
->state
== CONTEXT_READY
&& complete
);
339 s
= malloc(sizeof(struct pa_stream
));
343 s
->read_callback
= NULL
;
344 s
->read_userdata
= NULL
;
345 s
->write_callback
= NULL
;
346 s
->write_userdata
= NULL
;
347 s
->die_callback
= NULL
;
348 s
->die_userdata
= NULL
;
349 s
->create_complete_callback
= complete
;
350 s
->create_complete_userdata
= NULL
;
352 s
->state
= STREAM_CREATING
;
353 s
->requested_bytes
= 0;
355 s
->channel_valid
= 0;
356 s
->device_index
= (uint32_t) -1;
359 t
= tagstruct_new(NULL
, 0);
362 tagstruct_putu32(t
, dir
== PA_STREAM_PLAYBACK
? PA_COMMAND_CREATE_PLAYBACK_STREAM
: PA_COMMAND_CREATE_RECORD_STREAM
);
363 tagstruct_putu32(t
, tag
= c
->ctag
++);
364 tagstruct_puts(t
, name
);
365 tagstruct_put_sample_spec(t
, ss
);
366 tagstruct_putu32(t
, (uint32_t) -1);
367 tagstruct_putu32(t
, attr
? attr
->queue_length
: DEFAULT_QUEUE_LENGTH
);
368 tagstruct_putu32(t
, attr
? attr
->max_length
: DEFAULT_MAX_LENGTH
);
369 tagstruct_putu32(t
, attr
? attr
->prebuf
: DEFAULT_PREBUF
);
371 pstream_send_tagstruct(c
->pstream
, t
);
373 pdispatch_register_reply(c
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_playback_callback
, s
);
375 s
->next
= c
->first_stream
;
377 s
->next
->previous
= s
;
384 void pa_stream_free(struct pa_stream
*s
) {
385 assert(s
&& s
->context
);
387 if (s
->channel_valid
) {
388 struct tagstruct
*t
= tagstruct_new(NULL
, 0);
391 tagstruct_putu32(t
, PA_COMMAND_DELETE_PLAYBACK_STREAM
);
392 tagstruct_putu32(t
, s
->context
->ctag
++);
393 tagstruct_putu32(t
, s
->channel
);
394 pstream_send_tagstruct(s
->context
->pstream
, t
);
397 if (s
->channel_valid
)
398 dynarray_put(s
->context
->streams
, s
->channel
, NULL
);
401 s
->next
->previous
= s
->previous
;
403 s
->previous
->next
= s
->next
;
405 s
->context
->first_stream
= s
->next
;
410 void pa_stream_set_write_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*p
, size_t length
, void *userdata
), void *userdata
) {
412 s
->write_callback
= cb
;
413 s
->write_userdata
= userdata
;
416 void pa_stream_write(struct pa_stream
*s
, const void *data
, size_t length
) {
417 struct memchunk chunk
;
418 assert(s
&& s
->context
&& data
&& length
);
420 chunk
.memblock
= memblock_new(length
);
421 assert(chunk
.memblock
&& chunk
.memblock
->data
);
422 memcpy(chunk
.memblock
->data
, data
, length
);
424 chunk
.length
= length
;
426 pstream_send_memblock(s
->context
->pstream
, s
->channel
, 0, &chunk
);
427 memblock_unref(chunk
.memblock
);
429 /*fprintf(stderr, "Sent %u bytes\n", length);*/
431 if (length
< s
->requested_bytes
)
432 s
->requested_bytes
-= length
;
434 s
->requested_bytes
= 0;
437 size_t pa_stream_writable_size(struct pa_stream
*s
) {
439 return s
->requested_bytes
;
442 void pa_stream_set_read_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*p
, const void*data
, size_t length
, void *userdata
), void *userdata
) {
444 s
->read_callback
= cb
;
445 s
->read_userdata
= userdata
;
448 int pa_stream_is_dead(struct pa_stream
*s
) {
449 return s
->state
== STREAM_DEAD
;
452 int pa_stream_is_ready(struct pa_stream
*s
) {
453 return s
->state
== STREAM_READY
;
456 void pa_stream_set_die_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*s
, void *userdata
), void *userdata
) {
458 s
->die_callback
= cb
;
459 s
->die_userdata
= userdata
;