5 #include "protocol-native.h"
6 #include "protocol-native-spec.h"
9 #include "sourceoutput.h"
10 #include "sinkinput.h"
12 #include "tagstruct.h"
13 #include "pdispatch.h"
14 #include "pstream-util.h"
/* Forward declaration: lets the connection members further down hold a
 * pointer to the protocol instance before struct protocol_native is defined. */
17 struct protocol_native
;
/* State kept for one record stream attached to a connection.
 * NOTE(review): the closing "};" of this struct is not present in this view. */
19 struct record_stream
{
/* Owning connection; record_stream_free() removes the stream from
 * connection->record_streams. */
20 struct connection
*connection
;
/* Released with source_output_free() in record_stream_free(). */
22 struct source_output
*source_output
;
/* Released with memblockq_free() in record_stream_free(). */
23 struct memblockq
*memblockq
;
/* State kept for one playback stream attached to a connection.
 * NOTE(review): code in this file also accesses s->index and s->qlength;
 * their declarations and the closing "};" are not present in this view. */
26 struct playback_stream
{
/* Owning connection; its pstream is used to send data requests and
 * playback_stream_free() removes the stream from connection->playback_streams. */
27 struct connection
*connection
;
/* Created by sink_input_new() in playback_stream_new(); its userdata points
 * back at this stream. */
30 struct sink_input
*sink_input
;
/* Queue that memblock_callback() pushes client chunks into and the
 * sink-input callbacks peek/drop from. */
31 struct memblockq
*memblockq
;
/* Byte count that request_bytes() increases when it sends a request and
 * memblock_callback() adjusts when a chunk arrives. */
32 size_t requested_bytes
;
/* NOTE(review): the opening line of the enclosing struct is not present in
 * this view. These members match what the rest of the file accesses through
 * "struct connection *c" (c->protocol, c->client, c->pstream, c->pdispatch,
 * c->record_streams, c->playback_streams); c->authorized is also used but
 * its declaration is not visible here. */
/* Protocol instance this connection is registered with (protocol->connections). */
37 struct protocol_native
*protocol
;
/* Created with client_new() in on_connection(), released with client_free(). */
38 struct client
*client
;
/* Created with pstream_new() over the accepted iochannel; used for all
 * replies, errors and requests sent to the peer. */
39 struct pstream
*pstream
;
/* Created with pdispatch_new() from command_table; run on each incoming packet. */
40 struct pdispatch
*pdispatch
;
/* Sets of streams owned by this connection. playback_streams is looked up
 * by the channel number carried in incoming memblocks and delete commands. */
41 struct idxset
*record_streams
, *playback_streams
;
/* Instance state of the native protocol.
 * NOTE(review): code in this file also reads p->core and p->public; their
 * declarations and the closing "};" are not present in this view. */
44 struct protocol_native
{
/* Socket server whose callback is set to on_connection() in
 * protocol_native_new(); released with socket_server_free(). */
47 struct socket_server
*server
;
/* Set of connections; on_connection() adds to it and connection_free()
 * removes from it. */
48 struct idxset
*connections
;
/* Forward declarations. The sink-input callbacks are assigned to the sink
 * input in playback_stream_new() and the command handlers are referenced by
 * command_table, both of which appear before the function definitions. */
51 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
);
52 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
);
53 static void sink_input_kill_cb(struct sink_input
*i
);
54 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
);
/* Sends a data request to the client for a playback stream. */
56 static void request_bytes(struct playback_stream
*s
);
/* Command handlers; userdata is the struct connection the packet arrived on. */
58 static int command_exit(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
59 static int command_create_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
60 static int command_delete_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
/* Handler table indexed by PA_COMMAND_* value and passed to pdispatch_new()
 * in on_connection(). Entries initialised with NULL have no handler in this
 * file (error, reply and the two record-stream commands).
 * NOTE(review): the closing "};" of the initializer is not present in this view. */
62 static const struct pdispatch_command command_table
[PA_COMMAND_MAX
] = {
63 [PA_COMMAND_ERROR
] = { NULL
},
64 [PA_COMMAND_REPLY
] = { NULL
},
65 [PA_COMMAND_CREATE_PLAYBACK_STREAM
] = { command_create_playback_stream
},
66 [PA_COMMAND_DELETE_PLAYBACK_STREAM
] = { command_delete_playback_stream
},
67 [PA_COMMAND_CREATE_RECORD_STREAM
] = { NULL
},
68 [PA_COMMAND_DELETE_RECORD_STREAM
] = { NULL
},
69 [PA_COMMAND_EXIT
] = { command_exit
},
72 /* structure management */
/* Tear down a record stream: unregister it from its connection and release
 * the resources it holds.
 *
 * r must be non-NULL and must have a connection (asserted).
 * NOTE(review): neither a free(r) nor the closing brace is present in this
 * view - confirm against the full file. */
74 static void record_stream_free(struct record_stream
* r
) {
75 assert(r
&& r
->connection
);
/* Unregister first so the connection's set no longer refers to r. */
77 idxset_remove_by_data(r
->connection
->record_streams
, r
, NULL
);
78 source_output_free(r
->source_output
);
79 memblockq_free(r
->memblockq
);
/* Create a playback stream on connection c that plays into sink.
 *
 * c         connection that will own the stream
 * sink      sink the new sink input is created on
 * ss        sample spec, used for the sink input and for the queue's
 *           sample size (pa_sample_size(ss))
 * name      name passed to sink_input_new()
 * qlen      asserted non-zero; its use is not visible in this view
 * maxlength first argument to memblockq_new()
 * prebuf    third argument to memblockq_new()
 *
 * NOTE(review): every argument, including qlen/maxlength/prebuf, is checked
 * only with assert(). Those three values are read from the client's packet
 * in command_create_playback_stream(), so a client-supplied 0 aborts the
 * process when assertions are enabled. The caller already handles a NULL
 * return by sending PA_ERROR_INVALID - consider validating and returning
 * NULL instead.
 * NOTE(review): the malloc() result is not checked in the visible lines, and
 * the assignments of s->connection / s->qlength, the return statement and
 * the closing brace are not present in this view. */
83 static struct playback_stream
* playback_stream_new(struct connection
*c
, struct sink
*sink
, struct pa_sample_spec
*ss
, const char *name
, size_t qlen
, size_t maxlength
, size_t prebuf
) {
84 struct playback_stream
*s
;
85 assert(c
&& sink
&& ss
&& name
&& qlen
&& maxlength
&& prebuf
);
87 s
= malloc(sizeof(struct playback_stream
));
92 s
->sink_input
= sink_input_new(sink
, ss
, name
);
93 assert(s
->sink_input
);
/* Hook the sink input up to this stream: the callbacks below receive the
 * sink input, and userdata points back at s. */
94 s
->sink_input
->peek
= sink_input_peek_cb
;
95 s
->sink_input
->drop
= sink_input_drop_cb
;
96 s
->sink_input
->kill
= sink_input_kill_cb
;
97 s
->sink_input
->get_latency
= sink_input_get_latency_cb
;
98 s
->sink_input
->userdata
= s
;
100 s
->memblockq
= memblockq_new(maxlength
, pa_sample_size(ss
), prebuf
);
101 assert(s
->memblockq
);
/* Nothing has been requested from the client yet. */
103 s
->requested_bytes
= 0;
/* Register with the connection; &s->index is passed so the set can report
 * the slot (presumably the assigned index - confirm against idxset_put()). */
105 idxset_put(c
->playback_streams
, s
, &s
->index
);
/* Tear down a playback stream: unregister it from its connection and release
 * its sink input and block queue.
 *
 * p must be non-NULL and must have a connection (asserted).
 * NOTE(review): neither a free(p) nor the closing brace is present in this
 * view - confirm against the full file. */
109 static void playback_stream_free(struct playback_stream
* p
) {
110 assert(p
&& p
->connection
);
/* Unregister first so the connection's set no longer refers to p. */
112 idxset_remove_by_data(p
->connection
->playback_streams
, p
, NULL
);
113 sink_input_free(p
->sink_input
);
114 memblockq_free(p
->memblockq
);
/* Destroy a connection and everything it owns.
 *
 * c must be non-NULL and must have a protocol (asserted).
 * NOTE(review): neither a free(c) nor the closing brace is present in this
 * view - confirm against the full file. */
118 static void connection_free(struct connection
*c
) {
119 struct record_stream
*r
;
120 struct playback_stream
*p
;
121 assert(c
&& c
->protocol
);
/* Unregister from the protocol's connection set. */
123 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
/* Drain the record streams. Each record_stream_free() call removes the
 * stream from c->record_streams, which is what lets idxset_first() make
 * progress. */
124 while ((r
= idxset_first(c
->record_streams
, NULL
)))
125 record_stream_free(r
);
126 idxset_free(c
->record_streams
, NULL
, NULL
);
/* Same draining pattern for the playback streams. */
128 while ((p
= idxset_first(c
->playback_streams
, NULL
)))
129 playback_stream_free(p
);
130 idxset_free(c
->playback_streams
, NULL
, NULL
);
/* Release the dispatcher, the packet stream and the client object. */
132 pdispatch_free(c
->pdispatch
);
133 pstream_free(c
->pstream
);
134 client_free(c
->client
);
/* Ask the client for more data for playback stream s.
 *
 * l starts as the value memblockq_missing_to() reports for the stream's
 * queue and s->qlength; the amount already outstanding (s->requested_bytes)
 * is subtracted so that only the difference is requested, and the
 * outstanding counter is increased by that difference. The request is a
 * tagstruct carrying PA_COMMAND_REQUEST, a tag of (uint32_t) -1, the stream
 * index and the byte count, sent on the owning connection's pstream.
 *
 * NOTE(review): the declarations of l and t, the statements guarded by the
 * two conditions below, and the closing brace are not present in this view. */
138 static void request_bytes(struct playback_stream
*s
) {
/* Condition is true when memblockq_missing_to() returns 0. */
143 if (!(l
= memblockq_missing_to(s
->memblockq
, s
->qlength
)))
/* Condition is true when the outstanding requests already cover l. */
146 if (l
<= s
->requested_bytes
)
149 l
-= s
->requested_bytes
;
150 s
->requested_bytes
+= l
;
152 t
= tagstruct_new(NULL
, 0);
154 tagstruct_putu32(t
, PA_COMMAND_REQUEST
);
155 tagstruct_putu32(t
, (uint32_t) -1); /* tag */
156 tagstruct_putu32(t
, s
->index
);
157 tagstruct_putu32(t
, l
);
158 pstream_send_tagstruct(s
->connection
->pstream
, t
);
160 /* fprintf(stderr, "Requesting %u bytes\n", l);*/
163 /*** sinkinput callbacks ***/
/* Sink-input "peek" callback (installed in playback_stream_new()): peeks the
 * stream's memblockq into chunk via memblockq_peek().
 *
 * i, i->userdata and chunk must be non-NULL (asserted).
 * NOTE(review): the assignment of s (presumably from i->userdata, which
 * playback_stream_new() points at the stream), the statements following the
 * memblockq_peek() check, and the closing brace are not present in this view. */
165 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
) {
166 struct playback_stream
*s
;
167 assert(i
&& i
->userdata
&& chunk
);
/* Condition is true when memblockq_peek() reports a negative result. */
170 if (memblockq_peek(s
->memblockq
, chunk
) < 0)
/* Sink-input "drop" callback (installed in playback_stream_new()): passes
 * length to memblockq_drop() on the stream's queue.
 *
 * i and i->userdata must be non-NULL and length non-zero (asserted).
 * NOTE(review): the assignment of s (presumably from i->userdata), any
 * statements after the drop, and the closing brace are not present in this
 * view. */
176 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
) {
177 struct playback_stream
*s
;
178 assert(i
&& i
->userdata
&& length
);
181 memblockq_drop(s
->memblockq
, length
);
/* Sink-input "kill" callback (installed in playback_stream_new()): frees the
 * playback stream with playback_stream_free().
 *
 * i and i->userdata must be non-NULL (asserted).
 * NOTE(review): the assignment of s (presumably from i->userdata) and the
 * closing brace are not present in this view. */
185 static void sink_input_kill_cb(struct sink_input
*i
) {
186 struct playback_stream
*s
;
187 assert(i
&& i
->userdata
);
190 playback_stream_free(s
);
/* Sink-input "get_latency" callback (installed in playback_stream_new()):
 * returns pa_samples_usec() applied to the current length of the stream's
 * memblockq and the sink input's sample spec.
 *
 * i and i->userdata must be non-NULL (asserted).
 * NOTE(review): the assignment of s (presumably from i->userdata) and the
 * closing brace are not present in this view. */
193 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
) {
194 struct playback_stream
*s
;
195 assert(i
&& i
->userdata
);
198 return pa_samples_usec(memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
201 /*** pdispatch callbacks ***/
/* Handler for PA_COMMAND_CREATE_PLAYBACK_STREAM.
 *
 * Reads, in order, a name string, a sample spec, a sink index and three
 * 32-bit values (qlength, maxlength, prebuf) from t. Unauthorized
 * connections get PA_ERROR_ACCESS. A sink index of (uint32_t) -1 selects
 * the core's default sink, any other value is looked up in core->sinks;
 * PA_ERROR_EXIST is sent on the sink-lookup error path. A NULL result from
 * playback_stream_new() is answered with PA_ERROR_INVALID. On success the
 * reply carries PA_COMMAND_REPLY, the request tag, the stream index and the
 * sink input index.
 *
 * userdata is the struct connection the packet arrived on.
 *
 * NOTE(review): qlength, maxlength and prebuf are declared size_t but their
 * addresses are passed to tagstruct_getu32(), which by its name presumably
 * writes through a uint32_t pointer. Where size_t is wider than 32 bits that
 * would leave part of each variable unset - verify against tagstruct.h and
 * consider reading into uint32_t locals.
 * NOTE(review): none of the three values is range-checked here before being
 * handed to playback_stream_new(), which asserts that they are non-zero.
 * NOTE(review): the declarations of name, sink_index and sink, the tail of
 * the parsing condition, the return statements, the else/if around the sink
 * lookup, and the closing braces are not present in this view. */
203 static int command_create_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
204 struct connection
*c
= userdata
;
205 struct playback_stream
*s
;
206 size_t maxlength
, prebuf
, qlength
;
209 struct pa_sample_spec ss
;
210 struct tagstruct
*reply
;
212 assert(c
&& t
&& c
->protocol
&& c
->protocol
->core
);
/* Parse the request; any getter returning < 0 makes the condition true. */
214 if (tagstruct_gets(t
, &name
) < 0 ||
215 tagstruct_get_sample_spec(t
, &ss
) < 0 ||
216 tagstruct_getu32(t
, &sink_index
) < 0 ||
217 tagstruct_getu32(t
, &qlength
) < 0 ||
218 tagstruct_getu32(t
, &maxlength
) < 0 ||
219 tagstruct_getu32(t
, &prebuf
) < 0 ||
/* Refuse the command on connections that are not authorized. */
223 if (!c
->authorized
) {
224 pstream_send_error(c
->pstream
, tag
, PA_ERROR_ACCESS
);
/* (uint32_t) -1 is the "use the default sink" marker. */
228 if (sink_index
== (uint32_t) -1)
229 sink
= sink_get_default(c
->protocol
->core
);
231 sink
= idxset_get_by_index(c
->protocol
->core
->sinks
, sink_index
);
234 pstream_send_error(c
->pstream
, tag
, PA_ERROR_EXIST
);
238 if (!(s
= playback_stream_new(c
, sink
, &ss
, name
, qlength
, maxlength
, prebuf
))) {
239 pstream_send_error(c
->pstream
, tag
, PA_ERROR_INVALID
);
/* Build the success reply: command, tag, stream index, sink input index. */
243 reply
= tagstruct_new(NULL
, 0);
245 tagstruct_putu32(reply
, PA_COMMAND_REPLY
);
246 tagstruct_putu32(reply
, tag
);
247 tagstruct_putu32(reply
, s
->index
);
248 assert(s
->sink_input
);
249 tagstruct_putu32(reply
, s
->sink_input
->index
);
250 pstream_send_tagstruct(c
->pstream
, reply
);
/* Handler for PA_COMMAND_DELETE_PLAYBACK_STREAM.
 *
 * Reads a 32-bit channel number from t. Unauthorized connections get
 * PA_ERROR_ACCESS; a channel that is not in c->playback_streams gets
 * PA_ERROR_EXIST; otherwise a simple acknowledgement is sent.
 *
 * userdata is the struct connection the packet arrived on.
 *
 * NOTE(review): in the visible lines the stream that was looked up is never
 * passed to playback_stream_free() - the command is acknowledged without the
 * stream being removed. Confirm against the full file; if nothing is missing
 * this looks like a leak / incomplete delete.
 * NOTE(review): the declaration of channel, the tail of the parsing
 * condition, the return statements and the closing braces are not present
 * in this view. */
255 static int command_delete_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
256 struct connection
*c
= userdata
;
258 struct playback_stream
*s
;
261 if (tagstruct_getu32(t
, &channel
) < 0 ||
/* Refuse the command on connections that are not authorized. */
265 if (!c
->authorized
) {
266 pstream_send_error(c
->pstream
, tag
, PA_ERROR_ACCESS
);
/* The channel number is the index into the connection's playback streams. */
270 if (!(s
= idxset_get_by_index(c
->playback_streams
, channel
))) {
271 pstream_send_error(c
->pstream
, tag
, PA_ERROR_EXIST
);
275 pstream_send_simple_ack(c
->pstream
, tag
);
/* Handler for PA_COMMAND_EXIT.
 *
 * Checks tagstruct_eof(t), answers unauthorized connections with
 * PA_ERROR_ACCESS, and otherwise calls the core mainloop's quit() with 0 and
 * then sends a simple acknowledgement.
 *
 * userdata is the struct connection the packet arrived on.
 *
 * NOTE(review): the statement guarded by the eof check, the return
 * statements and the closing braces are not present in this view. */
279 static int command_exit(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
280 struct connection
*c
= userdata
;
/* Condition is true when tagstruct_eof() returns 0 for the request. */
283 if (!tagstruct_eof(t
))
286 if (!c
->authorized
) {
287 pstream_send_error(c
->pstream
, tag
, PA_ERROR_ACCESS
);
291 assert(c
->protocol
&& c
->protocol
->core
&& c
->protocol
->core
->mainloop
);
292 c
->protocol
->core
->mainloop
->quit(c
->protocol
->core
->mainloop
, 0);
/* The ack is sent after quit() has already been requested; the original
 * author marks it as pointless. */
293 pstream_send_simple_ack(c
->pstream
, tag
); /* nonsense */
297 /*** pstream callbacks ***/
/* pstream packet callback (registered in on_connection()): hands the packet
 * to the connection's dispatcher and logs to stderr when pdispatch_run()
 * returns a negative value.
 *
 * p, packet, packet->data and the connection in userdata must be non-NULL
 * (asserted).
 * NOTE(review): whatever follows the log message on failure, the return
 * statements and the closing braces are not present in this view. */
299 static int packet_callback(struct pstream
*p
, struct packet
*packet
, void *userdata
) {
300 struct connection
*c
= userdata
;
301 assert(p
&& packet
&& packet
->data
&& c
);
/* The connection is passed through as the handlers' userdata. */
303 if (pdispatch_run(c
->pdispatch
, packet
, c
) < 0) {
304 fprintf(stderr
, "protocol-native: invalid packet.\n");
/* pstream memblock callback (registered in on_connection()): delivers a chunk
 * of audio data sent by the client to the playback stream identified by
 * channel.
 *
 * Looks the stream up in c->playback_streams (logging to stderr if it does
 * not exist), adjusts the stream's requested_bytes by the chunk length,
 * pushes the chunk with delta into the stream's memblockq and notifies the
 * sink the stream's sink input is attached to.
 *
 * p, chunk and userdata must be non-NULL (asserted).
 * NOTE(review): no "else" is visible between the two requested_bytes
 * statements below. If it is really absent, the subtraction runs after the
 * reset to 0 and the unsigned counter wraps - confirm against the full file.
 * NOTE(review): the statements after the "invalid stream" log message, the
 * return statements and the closing braces are not present in this view. */
311 static int memblock_callback(struct pstream
*p
, uint32_t channel
, int32_t delta
, struct memchunk
*chunk
, void *userdata
) {
312 struct connection
*c
= userdata
;
313 struct playback_stream
*stream
;
314 assert(p
&& chunk
&& userdata
);
316 if (!(stream
= idxset_get_by_index(c
->playback_streams
, channel
))) {
317 fprintf(stderr
, "protocol-native: client sent block for invalid stream.\n");
/* A chunk at least as large as the outstanding request clears the counter. */
321 if (chunk
->length
>= stream
->requested_bytes
)
322 stream
->requested_bytes
= 0;
324 stream
->requested_bytes
-= chunk
->length
;
326 memblockq_push(stream
->memblockq
, chunk
, delta
);
327 assert(stream
->sink_input
);
328 sink_notify(stream
->sink_input
->sink
);
330 /*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/
/* pstream die callback (registered in on_connection()): logs that the
 * connection died.
 *
 * userdata is the struct connection.
 * NOTE(review): only the declaration of c and the log message are present in
 * this view; whether c is released here cannot be determined from the
 * visible lines. */
335 static void die_callback(struct pstream
*p
, void *userdata
) {
336 struct connection
*c
= userdata
;
340 fprintf(stderr
, "protocol-native: connection died.\n");
343 /*** socket server callbacks ***/
/* Socket-server callback (registered in protocol_native_new()): sets up a
 * new connection for the accepted iochannel io.
 *
 * userdata is the struct protocol_native instance. s, io and the protocol
 * must be non-NULL (asserted).
 * NOTE(review): the malloc() result is not checked in the visible lines, and
 * the assignment of c->protocol, any checks on c->client / c->pstream, and
 * the closing brace are not present in this view. */
345 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
346 struct protocol_native
*p
= userdata
;
347 struct connection
*c
;
348 assert(s
&& io
&& p
);
350 c
= malloc(sizeof(struct connection
));
/* A new connection starts out authorized exactly when the protocol's
 * "public" flag is set. */
352 c
->authorized
= p
->public;
355 c
->client
= client_new(p
->core
, "NATIVE", "Client");
357 c
->pstream
= pstream_new(p
->core
->mainloop
, io
);
/* All three pstream callbacks receive the connection as userdata. */
360 pstream_set_recieve_packet_callback(c
->pstream
, packet_callback
, c
);
361 pstream_set_recieve_memblock_callback(c
->pstream
, memblock_callback
, c
);
362 pstream_set_die_callback(c
->pstream
, die_callback
, c
);
364 c
->pdispatch
= pdispatch_new(p
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
365 assert(c
->pdispatch
);
367 c
->record_streams
= idxset_new(NULL
, NULL
);
368 c
->playback_streams
= idxset_new(NULL
, NULL
);
369 assert(c
->record_streams
&& c
->playback_streams
);
/* Register the connection with the protocol instance. */
371 idxset_put(p
->connections
, c
, NULL
);
374 /*** module entry points ***/
/* Create a native-protocol instance for core that accepts connections from
 * server.
 *
 * core and server must be non-NULL (asserted). Allocates the instance,
 * creates its connection set and registers on_connection() as the socket
 * server's callback with the instance as userdata.
 *
 * NOTE(review): the malloc() result is not checked in the visible lines, and
 * the assignments of the instance's members other than connections (p->server
 * is used below; p->core and p->public are read in on_connection()), the
 * return statement and the closing brace are not present in this view. */
376 struct protocol_native
* protocol_native_new(struct core
*core
, struct socket_server
*server
) {
377 struct protocol_native
*p
;
378 assert(core
&& server
);
380 p
= malloc(sizeof(struct protocol_native
));
386 p
->connections
= idxset_new(NULL
, NULL
);
387 assert(p
->connections
);
389 socket_server_set_callback(p
->server
, on_connection
, p
);
394 void protocol_native_free(struct protocol_native
*p
) {
395 struct connection
*c
;
398 while ((c
= idxset_first(p
->connections
, NULL
)))
400 idxset_free(p
->connections
, NULL
, NULL
);
401 socket_server_free(p
->server
);