7 #include "protocol-native-spec.h"
11 #include "socket-client.h"
12 #include "pstream-util.h"
14 #define DEFAULT_QUEUE_LENGTH 10240
15 #define DEFAULT_MAX_LENGTH 20480
16 #define DEFAULT_PREBUF 4096
17 #define DEFAULT_TIMEOUT 5
/*
 * Member declarations of the connection context. The enclosing struct header
 * and closing brace are not part of this listing; the functions in this file
 * reach these members through a `struct pa_context *` (c->mainloop,
 * c->client, c->pstream, c->pdispatch, c->streams, c->first_stream, ...).
 */
21 struct pa_mainloop_api
* mainloop
;
22 struct socket_client
*client
;
23 struct pstream
*pstream
;
24 struct pdispatch
*pdispatch
;
/* Array of streams; the code in this file indexes it by channel number. */
25 struct dynarray
*streams
;
/* Head of the doubly linked list of streams attached to this context. */
26 struct pa_stream
*first_stream
;
/* Connection state; tested by pa_context_is_ready()/pa_context_is_dead(). */
29 enum { CONTEXT_UNCONNECTED
, CONTEXT_CONNECTING
, CONTEXT_READY
, CONTEXT_DEAD
} state
;
/* Called from on_connection() with success == 1 or 0, if non-NULL. */
31 void (*connect_complete_callback
)(struct pa_context
*c
, int success
, void *userdata
);
32 void *connect_complete_userdata
;
/* Installed by pa_context_set_die_callback(); invoked from context_dead(). */
34 void (*die_callback
)(struct pa_context
*c
, void *userdata
);
/*
 * Member declarations of a stream object. The enclosing struct header and
 * closing brace are not part of this listing; the functions in this file
 * reach these members through a `struct pa_stream *` (s->context, s->next,
 * s->state, s->requested_bytes, ...).
 */
/* Owning context. */
39 struct pa_context
*context
;
/* Links of the per-context doubly linked stream list. */
40 struct pa_stream
*next
, *previous
;
43 enum pa_stream_direction direction
;
44 enum { STREAM_CREATING
, STREAM_READY
, STREAM_DEAD
} state
;
/* Increased by command_request(), decreased by pa_stream_write(). */
45 uint32_t requested_bytes
;
/* Called from pstream_memblock_callback() with the received data. */
47 void (*read_callback
)(struct pa_stream
*p
, const void*data
, size_t length
, void *userdata
);
/* Called from command_request() with the current requested_bytes. */
50 void (*write_callback
)(struct pa_stream
*p
, size_t length
, void *userdata
);
/* Called from create_playback_callback(); `s` is NULL on failure. */
53 void (*create_complete_callback
)(struct pa_context
*c
, struct pa_stream
*s
, void *userdata
);
54 void *create_complete_userdata
;
/* Installed by pa_stream_set_die_callback(); invoked from stream_dead(). */
56 void (*die_callback
)(struct pa_stream
*c
, void *userdata
);
/*
 * Forward declaration of the PA_COMMAND_REQUEST handler, needed because
 * command_table refers to it before the function is defined.
 */
60 static int command_request(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
/*
 * Dispatch table with PA_COMMAND_MAX slots, indexed by command code through
 * designated initializers. Of the entries listed here only
 * PA_COMMAND_REQUEST has a handler (command_request); the others are NULL.
 * The table is handed to pdispatch_new() in on_connection().
 */
62 static const struct pdispatch_command command_table
[PA_COMMAND_MAX
] = {
63 [PA_COMMAND_ERROR
] = { NULL
},
64 [PA_COMMAND_REPLY
] = { NULL
},
65 [PA_COMMAND_CREATE_PLAYBACK_STREAM
] = { NULL
},
66 [PA_COMMAND_DELETE_PLAYBACK_STREAM
] = { NULL
},
67 [PA_COMMAND_CREATE_RECORD_STREAM
] = { NULL
},
68 [PA_COMMAND_DELETE_RECORD_STREAM
] = { NULL
},
69 [PA_COMMAND_EXIT
] = { NULL
},
70 [PA_COMMAND_REQUEST
] = { command_request
},
/*
 * pa_context_new: create a new, unconnected context.
 *
 * mainloop - main loop API pointer stored in the context (must be non-NULL).
 * name     - client name; copied with strdup() (must be non-NULL).
 *
 * The visible lines allocate the context with malloc(), create the stream
 * array with dynarray_new() and reset the stream list, error code, state
 * (CONTEXT_UNCONNECTED) and both callback/userdata pairs.
 *
 * NOTE(review): several original lines are absent from this listing (the
 * declaration of `c`, any checks of the malloc()/strdup() results, the
 * return statement) - confirm against the full file.
 */
73 struct pa_context
*pa_context_new(struct pa_mainloop_api
*mainloop
, const char *name
) {
74 assert(mainloop
&& name
);
76 c
= malloc(sizeof(struct pa_context
));
78 c
->name
= strdup(name
);
79 c
->mainloop
= mainloop
;
83 c
->streams
= dynarray_new();
85 c
->first_stream
= NULL
;
/*
 * NOTE(review): `errno` is defined as a macro by <errno.h>; a member with
 * this name will be macro-expanded if that header is in scope - confirm.
 */
86 c
->errno
= PA_ERROR_OK
;
87 c
->state
= CONTEXT_UNCONNECTED
;
90 c
->connect_complete_callback
= NULL
;
91 c
->connect_complete_userdata
= NULL
;
93 c
->die_callback
= NULL
;
94 c
->die_userdata
= NULL
;
/*
 * pa_context_free: tear down a context.
 *
 * Frees every stream still on the context's list (pa_stream_free() advances
 * c->first_stream when it unlinks the head), then releases the socket
 * client, the dispatcher, the packet stream and the stream array.
 *
 * NOTE(review): any NULL guards around the individual free calls, and the
 * release of the context itself, would be on lines absent from this listing.
 */
99 void pa_context_free(struct pa_context
*c
) {
102 while (c
->first_stream
)
103 pa_stream_free(c
->first_stream
);
106 socket_client_free(c
->client
);
108 pdispatch_free(c
->pdispatch
);
110 pstream_free(c
->pstream
);
112 dynarray_free(c
->streams
, NULL
, NULL
);
/*
 * stream_dead: move a stream into STREAM_DEAD and notify its owner.
 *
 * The state is tested first; the statement guarded by that test is not part
 * of this listing (presumably an early return - confirm). The stream's
 * die_callback is then invoked with its die_userdata.
 *
 * NOTE(review): no NULL check of die_callback is visible here.
 */
118 static void stream_dead(struct pa_stream
*s
) {
119 if (s
->state
== STREAM_DEAD
)
122 s
->state
= STREAM_DEAD
;
124 s
->die_callback(s
, s
->die_userdata
);
/*
 * context_dead: mark the context (and, apparently, its streams) dead.
 *
 * Walks the stream list from c->first_stream; the loop body is not part of
 * this listing (presumably a stream_dead() call - confirm). Afterwards the
 * context state is tested, set to CONTEXT_DEAD, and die_callback is invoked.
 *
 * NOTE(review): pstream_die_callback() already sets c->state to
 * CONTEXT_DEAD before anything else visible happens; if it then calls this
 * function, the state test below would already be true - confirm.
 */
127 static void context_dead(struct pa_context
*c
) {
131 for (s
= c
->first_stream
; s
; s
= s
->next
)
134 if (c
->state
== CONTEXT_DEAD
)
137 c
->state
= CONTEXT_DEAD
;
139 c
->die_callback(c
, c
->die_userdata
);
/*
 * pstream_die_callback: registered on the packet stream in on_connection()
 * with the context as userdata. Asserts the context is not already dead and
 * sets its state to CONTEXT_DEAD. Further statements, if any, are on lines
 * absent from this listing.
 */
142 static void pstream_die_callback(struct pstream
*p
, void *userdata
) {
143 struct pa_context
*c
= userdata
;
146 assert(c
->state
!= CONTEXT_DEAD
);
148 c
->state
= CONTEXT_DEAD
;
/*
 * pstream_packet_callback: registered on the packet stream in
 * on_connection() with the context as userdata. Hands each received packet
 * to pdispatch_run(); a negative result is reported on stderr.
 *
 * NOTE(review): the return statements are on lines absent from this listing.
 */
153 static int pstream_packet_callback(struct pstream
*p
, struct packet
*packet
, void *userdata
) {
154 struct pa_context
*c
= userdata
;
155 assert(p
&& packet
&& c
);
157 if (pdispatch_run(c
->pdispatch
, packet
, c
) < 0) {
158 fprintf(stderr
, "polyp.c: invalid packet.\n");
/*
 * pstream_memblock_callback: registered on the packet stream in
 * on_connection() with the context as userdata.
 *
 * Looks up the stream registered for `channel` in c->streams. When the
 * stream has a read_callback, it is called with a pointer `chunk->index`
 * past the start of the memblock data and with `chunk->length` as length.
 * `delta` is not used on any visible line.
 *
 * NOTE(review): the declaration of `s`, the statement taken when the lookup
 * fails, and the return statements are absent from this listing.
 */
165 static int pstream_memblock_callback(struct pstream
*p
, uint32_t channel
, int32_t delta
, struct memchunk
*chunk
, void *userdata
) {
166 struct pa_context
*c
= userdata
;
168 assert(p
&& chunk
&& c
&& chunk
->memblock
&& chunk
->memblock
->data
);
170 if (!(s
= dynarray_get(c
->streams
, channel
)))
173 if (s
->read_callback
)
174 s
->read_callback(s
, chunk
->memblock
->data
+ chunk
->index
, chunk
->length
, s
->read_userdata
);
/*
 * on_connection: socket-client callback installed by pa_context_connect()
 * with the context as userdata.
 *
 * Two paths are visible; the condition that selects between them is on a
 * line absent from this listing:
 *   - failure: record PA_ERROR_CONNECTIONREFUSED, go back to
 *     CONTEXT_UNCONNECTED and report success == 0 to the connect callback;
 *   - success: wrap `io` in a pstream, install the die/packet/memblock
 *     callbacks, create the dispatcher from command_table, switch to
 *     CONTEXT_READY and report success == 1.
 *
 * NOTE(review): the assertion requires `io` to be non-NULL, so if the
 * failure path is selected by `!io` it cannot be reached while assertions
 * are enabled - confirm against the full file.
 */
179 static void on_connection(struct socket_client
*client
, struct iochannel
*io
, void *userdata
) {
180 struct pa_context
*c
= userdata
;
181 assert(client
&& io
&& c
&& c
->state
== CONTEXT_CONNECTING
);
/*
 * NOTE(review): no visible line resets c->client after this free, while
 * pa_context_free() later passes c->client to socket_client_free() - confirm.
 */
183 socket_client_free(client
);
187 c
->errno
= PA_ERROR_CONNECTIONREFUSED
;
188 c
->state
= CONTEXT_UNCONNECTED
;
190 if (c
->connect_complete_callback
)
191 c
->connect_complete_callback(c
, 0, c
->connect_complete_userdata
);
196 c
->pstream
= pstream_new(c
->mainloop
, io
);
198 pstream_set_die_callback(c
->pstream
, pstream_die_callback
, c
);
199 pstream_set_recieve_packet_callback(c
->pstream
, pstream_packet_callback
, c
);
200 pstream_set_recieve_memblock_callback(c
->pstream
, pstream_memblock_callback
, c
);
202 c
->pdispatch
= pdispatch_new(c
->mainloop
, command_table
, PA_COMMAND_MAX
);
203 assert(c
->pdispatch
);
205 c
->state
= CONTEXT_READY
;
207 if (c
->connect_complete_callback
)
208 c
->connect_complete_callback(c
, 1, c
->connect_complete_userdata
);
/*
 * pa_context_connect: begin connecting an unconnected context.
 *
 * c        - context; must be in CONTEXT_UNCONNECTED.
 * server   - passed unchanged to socket_client_new_unix().
 * complete - stored as the connect-complete callback; on_connection() calls
 *            it with success 1 or 0 when it is non-NULL.
 * userdata - stored and passed back to `complete`.
 *
 * If the socket client cannot be created, PA_ERROR_CONNECTIONREFUSED is
 * recorded in the context. Otherwise on_connection() is registered on the
 * client and the state becomes CONTEXT_CONNECTING.
 *
 * NOTE(review): the return statements are absent from this listing, so the
 * exact return values are not documented here.
 */
211 int pa_context_connect(struct pa_context
*c
, const char *server
, void (*complete
) (struct pa_context
*c
, int success
, void *userdata
), void *userdata
) {
212 assert(c
&& c
->state
== CONTEXT_UNCONNECTED
);
215 if (!(c
->client
= socket_client_new_unix(c
->mainloop
, server
))) {
216 c
->errno
= PA_ERROR_CONNECTIONREFUSED
;
220 c
->connect_complete_callback
= complete
;
221 c
->connect_complete_userdata
= userdata
;
223 socket_client_set_callback(c
->client
, on_connection
, c
);
224 c
->state
= CONTEXT_CONNECTING
;
/* pa_context_is_dead: returns 1 when the context state is CONTEXT_DEAD, else 0. */
229 int pa_context_is_dead(struct pa_context
*c
) {
231 return c
->state
== CONTEXT_DEAD
;
/* pa_context_is_ready: returns 1 when the context state is CONTEXT_READY, else 0. */
234 int pa_context_is_ready(struct pa_context
*c
) {
236 return c
->state
== CONTEXT_READY
;
/*
 * pa_context_errno: only the signature is present in this listing.
 * NOTE(review): presumably returns the context's stored error code
 * (c->errno) - confirm against the full file.
 */
239 int pa_context_errno(struct pa_context
*c
) {
/*
 * pa_context_set_die_callback: store the callback and userdata that
 * context_dead() invokes when the context dies.
 */
244 void pa_context_set_die_callback(struct pa_context
*c
, void (*cb
)(struct pa_context
*c
, void *userdata
), void *userdata
) {
246 c
->die_callback
= cb
;
247 c
->die_userdata
= userdata
;
/*
 * command_request: dispatcher handler for PA_COMMAND_REQUEST (see
 * command_table); userdata is the context.
 *
 * Reads a channel number and a byte count from the tagstruct, looks up the
 * stream registered for that channel, adds the byte count to the stream's
 * requested_bytes and, when the total is non-zero and a write_callback is
 * set, calls it with the accumulated total.
 *
 * A failed read or an unknown channel records PA_ERROR_PROTOCOL in the
 * context. The rest of the read condition, the declaration of `s` and the
 * return statements are on lines absent from this listing.
 */
250 static int command_request(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
252 struct pa_context
*c
= userdata
;
253 uint32_t bytes
, channel
;
/*
 * NOTE(review): `s` is tested here before any visible assignment to it
 * (it is first assigned by the dynarray_get() lookup further down); the
 * intended operand is presumably `c` - confirm.
 */
254 assert(pd
&& command
== PA_COMMAND_REQUEST
&& t
&& s
);
256 if (tagstruct_getu32(t
, &channel
) < 0 ||
257 tagstruct_getu32(t
, &bytes
) < 0 ||
259 c
->errno
= PA_ERROR_PROTOCOL
;
263 if (!(s
= dynarray_get(c
->streams
, channel
))) {
264 c
->errno
= PA_ERROR_PROTOCOL
;
268 s
->requested_bytes
+= bytes
;
270 if (s
->requested_bytes
&& s
->write_callback
)
271 s
->write_callback(s
, s
->requested_bytes
, s
->write_userdata
);
/*
 * create_playback_callback: reply handler for a stream-creation request;
 * userdata is the stream, which must still be in STREAM_CREATING. Despite
 * its name it is registered for both directions: the registering code picks
 * the create command from `dir` but always passes this handler.
 *
 * Anything other than PA_COMMAND_REPLY is an error:
 *   - PA_COMMAND_ERROR: the error code is read straight into the context's
 *     errno member; if that read fails, PA_ERROR_PROTOCOL is stored;
 *   - PA_COMMAND_TIMEOUT: PA_ERROR_TIMEOUT is stored.
 * On a reply the channel number is read into s->channel (a failed read
 * stores PA_ERROR_PROTOCOL), the stream is entered into the context's
 * stream array under that channel, marked STREAM_READY, and
 * create_complete_callback is called with the stream. The second callback
 * invocation further down passes NULL instead of the stream, i.e. it reports
 * failure.
 *
 * NOTE(review): the control flow joining these parts (gotos/labels/returns)
 * and the remainder of the channel-read condition are on lines absent from
 * this listing.
 */
276 static int create_playback_callback(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
278 struct pa_stream
*s
= userdata
;
279 assert(pd
&& s
&& s
->state
== STREAM_CREATING
);
281 if (command
!= PA_COMMAND_REPLY
) {
282 struct pa_context
*c
= s
->context
;
285 if (command
== PA_COMMAND_ERROR
&& tagstruct_getu32(t
, &s
->context
->errno
) < 0) {
286 s
->context
->errno
= PA_ERROR_PROTOCOL
;
288 } else if (command
== PA_COMMAND_TIMEOUT
) {
289 s
->context
->errno
= PA_ERROR_TIMEOUT
;
296 if (tagstruct_getu32(t
, &s
->channel
) < 0 ||
298 s
->context
->errno
= PA_ERROR_PROTOCOL
;
/* From here on pa_stream_free() sends a delete command for this channel. */
303 s
->channel_valid
= 1;
304 dynarray_put(s
->context
->streams
, s
->channel
, s
);
306 s
->state
= STREAM_READY
;
307 assert(s
->create_complete_callback
);
308 s
->create_complete_callback(s
->context
, s
, s
->create_complete_userdata
);
312 assert(s
->create_complete_callback
);
313 s
->create_complete_callback(s
->context
, NULL
, s
->create_complete_userdata
);
/*
 * Stream-creation routine. The line carrying its return type and name, and
 * some of its parameters, are absent from this listing
 * (NOTE(review): presumably pa_stream_new - confirm).
 *
 * c        - context; must be in CONTEXT_READY.
 * dir      - PA_STREAM_PLAYBACK selects the create-playback command, any
 *            other value the create-record command.
 * ss       - sample spec written into the request (must be non-NULL).
 * attr     - optional buffer attributes; when NULL the DEFAULT_QUEUE_LENGTH,
 *            DEFAULT_MAX_LENGTH and DEFAULT_PREBUF constants are sent.
 * complete - required; called by create_playback_callback() once the server
 *            replies or the request fails.
 *
 * The visible lines allocate the stream, reset its callbacks and counters,
 * build and send the creation request, register create_playback_callback()
 * for the reply with a DEFAULT_TIMEOUT timeout, and push the stream onto the
 * front of the context's stream list.
 */
319 struct pa_context
*c
,
320 enum pa_stream_direction dir
,
323 const struct pa_sample_spec
*ss
,
324 const struct pa_buffer_attr
*attr
,
325 void (*complete
) (struct pa_context
*c
, struct pa_stream
*s
, void *userdata
),
332 assert(c
&& name
&& ss
&& c
->state
== CONTEXT_READY
&& complete
);
334 s
= malloc(sizeof(struct pa_stream
));
338 s
->read_callback
= NULL
;
339 s
->read_userdata
= NULL
;
340 s
->write_callback
= NULL
;
341 s
->write_userdata
= NULL
;
342 s
->die_callback
= NULL
;
343 s
->die_userdata
= NULL
;
344 s
->create_complete_callback
= complete
;
/*
 * NOTE(review): the completion userdata is set to NULL rather than to a
 * caller-supplied value, so `complete` always receives NULL userdata -
 * confirm whether a userdata parameter exists on a line not shown here.
 */
345 s
->create_complete_userdata
= NULL
;
347 s
->state
= STREAM_CREATING
;
348 s
->requested_bytes
= 0;
350 s
->channel_valid
= 0;
353 t
= tagstruct_new(NULL
, 0);
356 tagstruct_putu32(t
, dir
== PA_STREAM_PLAYBACK
? PA_COMMAND_CREATE_PLAYBACK_STREAM
: PA_COMMAND_CREATE_RECORD_STREAM
);
/* The tag is remembered so the reply can be matched to this request. */
357 tagstruct_putu32(t
, tag
= c
->ctag
++);
358 tagstruct_puts(t
, name
);
359 tagstruct_put_sample_spec(t
, ss
);
360 tagstruct_putu32(t
, (uint32_t) -1);
361 tagstruct_putu32(t
, attr
? attr
->queue_length
: DEFAULT_QUEUE_LENGTH
);
362 tagstruct_putu32(t
, attr
? attr
->max_length
: DEFAULT_MAX_LENGTH
);
363 tagstruct_putu32(t
, attr
? attr
->prebuf
: DEFAULT_PREBUF
);
365 pstream_send_tagstruct(c
->pstream
, t
);
367 pdispatch_register_reply(c
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_playback_callback
, s
);
/*
 * Link at the head of the context's list. NOTE(review): the guard for an
 * empty list (s->next == NULL) would be on a line absent from this listing.
 */
369 s
->next
= c
->first_stream
;
371 s
->next
->previous
= s
;
/*
 * pa_stream_free: detach a stream from its context.
 *
 * When the stream has a valid channel, a delete request carrying a fresh tag
 * and the channel number is sent to the server and the channel's slot in the
 * context's stream array is cleared. The stream is then unlinked from the
 * context's doubly linked list.
 *
 * NOTE(review): the guards around the unlink steps and the release of the
 * stream memory are on lines absent from this listing.
 */
378 void pa_stream_free(struct pa_stream
*s
) {
379 assert(s
&& s
->context
);
381 if (s
->channel_valid
) {
382 struct tagstruct
*t
= tagstruct_new(NULL
, 0);
/*
 * NOTE(review): the playback variant of the delete command is sent
 * regardless of s->direction; a PA_COMMAND_DELETE_RECORD_STREAM code exists
 * in command_table - confirm whether record streams need it.
 */
385 tagstruct_putu32(t
, PA_COMMAND_DELETE_PLAYBACK_STREAM
);
386 tagstruct_putu32(t
, s
->context
->ctag
++);
387 tagstruct_putu32(t
, s
->channel
);
388 pstream_send_tagstruct(s
->context
->pstream
, t
);
391 if (s
->channel_valid
)
392 dynarray_put(s
->context
->streams
, s
->channel
, NULL
);
395 s
->next
->previous
= s
->previous
;
397 s
->previous
->next
= s
->next
;
399 s
->context
->first_stream
= s
->next
;
/*
 * pa_stream_set_write_callback: store the callback and userdata that
 * command_request() invokes when the server asks for more data.
 */
404 void pa_stream_set_write_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*p
, size_t length
, void *userdata
), void *userdata
) {
406 s
->write_callback
= cb
;
407 s
->write_userdata
= userdata
;
/*
 * pa_stream_write: send `length` bytes of `data` on the stream's channel.
 *
 * The data is copied into a newly created memblock which is handed to
 * pstream_send_memblock() with a delta of 0. Afterwards requested_bytes is
 * reduced by `length`, or set to 0; the visible lines suggest the latter is
 * the else-branch for length >= requested_bytes, which would keep the
 * unsigned counter from wrapping (the `else` itself is not shown).
 *
 * NOTE(review): no visible line sets chunk.index before the chunk is sent,
 * and any release of the memblock reference is also on lines absent from
 * this listing - confirm.
 */
410 void pa_stream_write(struct pa_stream
*s
, const void *data
, size_t length
) {
411 struct memchunk chunk
;
412 assert(s
&& s
->context
&& data
&& length
);
414 chunk
.memblock
= memblock_new(length
);
415 assert(chunk
.memblock
&& chunk
.memblock
->data
);
416 memcpy(chunk
.memblock
->data
, data
, length
);
418 chunk
.length
= length
;
420 pstream_send_memblock(s
->context
->pstream
, s
->channel
, 0, &chunk
);
422 if (length
< s
->requested_bytes
)
423 s
->requested_bytes
-= length
;
425 s
->requested_bytes
= 0;
/*
 * pa_stream_writable_size: returns the stream's current requested_bytes
 * counter, i.e. the bytes requested via command_request() that
 * pa_stream_write() has not yet consumed.
 */
428 size_t pa_stream_writable_size(struct pa_stream
*s
) {
430 return s
->requested_bytes
;
/*
 * pa_stream_set_read_callback: store the callback and userdata that
 * pstream_memblock_callback() invokes when data arrives for this stream.
 */
433 void pa_stream_set_read_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*p
, const void*data
, size_t length
, void *userdata
), void *userdata
) {
435 s
->read_callback
= cb
;
436 s
->read_userdata
= userdata
;
/* pa_stream_is_dead: returns 1 when the stream state is STREAM_DEAD, else 0. */
439 int pa_stream_is_dead(struct pa_stream
*s
) {
440 return s
->state
== STREAM_DEAD
;
/* pa_stream_is_ready: returns 1 when the stream state is STREAM_READY, else 0. */
443 int pa_stream_is_ready(struct pa_stream
*s
) {
444 return s
->state
== STREAM_READY
;
/*
 * pa_stream_set_die_callback: store the callback and userdata that
 * stream_dead() invokes when the stream dies.
 */
447 void pa_stream_set_die_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*s
, void *userdata
), void *userdata
) {
449 s
->die_callback
= cb
;
450 s
->die_userdata
= userdata
;