9 #include "sourceoutput.h"
10 #include "protocol-simple.h"
14 struct protocol_simple
*protocol
;
16 struct sink_input
*sink_input
;
17 struct source_output
*source_output
;
18 struct client
*client
;
19 struct memblockq
*input_memblockq
, *output_memblockq
;
22 struct protocol_simple
{
24 struct socket_server
*server
;
25 struct idxset
*connections
;
26 enum protocol_simple_mode mode
;
29 #define BUFSIZE PIPE_BUF
31 static void free_connection(void *data
, void *userdata
) {
32 struct connection
*c
= data
;
36 sink_input_free(c
->sink_input
);
38 source_output_free(c
->source_output
);
40 client_free(c
->client
);
42 iochannel_free(c
->io
);
43 if (c
->input_memblockq
)
44 memblockq_free(c
->input_memblockq
);
45 if (c
->output_memblockq
)
46 memblockq_free(c
->output_memblockq
);
50 static void destroy_connection(struct connection
*c
) {
51 assert(c
&& c
->protocol
);
52 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
53 free_connection(c
, NULL
);
56 static int do_read(struct connection
*c
) {
57 struct memchunk chunk
;
61 if (!iochannel_is_readable(c
->io
))
64 if (!c
->sink_input
|| !memblockq_is_writable(c
->input_memblockq
, BUFSIZE
))
67 chunk
.memblock
= memblock_new(BUFSIZE
);
68 assert(chunk
.memblock
);
70 if ((r
= iochannel_read(c
->io
, chunk
.memblock
->data
, BUFSIZE
)) <= 0) {
71 fprintf(stderr
, "read(): %s\n", r
== 0 ? "EOF" : strerror(errno
));
72 memblock_unref(chunk
.memblock
);
76 chunk
.memblock
->length
= r
;
80 assert(c
->input_memblockq
);
81 memblockq_push(c
->input_memblockq
, &chunk
, 0);
82 memblock_unref(chunk
.memblock
);
83 sink_notify(c
->sink_input
->sink
);
86 u1
= memblockq_get_latency(c
->input_memblockq
);
87 u2
= sink_get_latency(c
->sink_input
->sink
);
89 fprintf(stderr
, "latency: %u+%u=%u\r", u1
, u2
, u1
+u2
);
94 static int do_write(struct connection
*c
) {
95 struct memchunk chunk
;
98 if (!iochannel_is_writable(c
->io
))
101 if (!c
->source_output
)
104 assert(c
->output_memblockq
);
105 if (memblockq_peek(c
->output_memblockq
, &chunk
) < 0)
108 assert(chunk
.memblock
&& chunk
.length
);
110 if ((r
= iochannel_write(c
->io
, chunk
.memblock
->data
+chunk
.index
, chunk
.length
)) < 0) {
111 fprintf(stderr
, "write(): %s\n", strerror(errno
));
112 memblock_unref(chunk
.memblock
);
116 memblockq_drop(c
->output_memblockq
, r
);
117 memblock_unref(chunk
.memblock
);
121 /*** sink_input callbacks ***/
123 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
, uint8_t *volume
) {
124 struct connection
*c
= i
->userdata
;
125 assert(i
&& c
&& chunk
&& volume
);
127 if (memblockq_peek(c
->input_memblockq
, chunk
) < 0)
134 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
) {
135 struct connection
*c
= i
->userdata
;
136 assert(i
&& c
&& length
);
138 memblockq_drop(c
->input_memblockq
, length
);
141 destroy_connection(c
);
144 static void sink_input_kill_cb(struct sink_input
*i
) {
145 assert(i
&& i
->userdata
);
146 destroy_connection((struct connection
*) i
->userdata
);
149 /*** source_output callbacks ***/
151 static void source_output_push_cb(struct source_output
*o
, struct memchunk
*chunk
) {
152 struct connection
*c
= o
->userdata
;
153 assert(o
&& c
&& chunk
);
155 memblockq_push(c
->output_memblockq
, chunk
, 0);
158 destroy_connection(c
);
161 static void source_output_kill_cb(struct source_output
*o
) {
162 assert(o
&& o
->userdata
);
163 destroy_connection((struct connection
*) o
->userdata
);
167 /*** client callbacks ***/
169 static void client_kill_cb(struct client
*c
) {
170 assert(c
&& c
->userdata
);
171 destroy_connection((struct connection
*) c
->userdata
);
174 /*** iochannel callbacks ***/
176 static void io_callback(struct iochannel
*io
, void *userdata
) {
177 struct connection
*c
= userdata
;
178 assert(io
&& c
&& c
->io
== io
);
180 if (do_read(c
) < 0 || do_write(c
) < 0)
181 destroy_connection(c
);
184 /*** socket_server callbacks */
186 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
187 struct protocol_simple
*p
= userdata
;
188 struct connection
*c
= NULL
;
189 assert(s
&& io
&& p
);
191 c
= malloc(sizeof(struct connection
));
194 c
->sink_input
= NULL
;
195 c
->source_output
= NULL
;
196 c
->input_memblockq
= c
->output_memblockq
= NULL
;
199 c
->client
= client_new(p
->core
, "SIMPLE", "Client");
201 c
->client
->kill
= client_kill_cb
;
202 c
->client
->userdata
= c
;
204 if (p
->mode
& PROTOCOL_SIMPLE_RECORD
) {
205 struct source
*source
;
208 if (!(source
= core_get_default_source(p
->core
))) {
209 fprintf(stderr
, "Failed to get default source.\n");
213 c
->source_output
= source_output_new(source
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
214 assert(c
->source_output
);
215 c
->source_output
->push
= source_output_push_cb
;
216 c
->source_output
->kill
= source_output_kill_cb
;
217 c
->source_output
->userdata
= c
;
219 l
= 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC
); /* 5s */
220 c
->output_memblockq
= memblockq_new(l
, sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
223 if (p
->mode
& PROTOCOL_SIMPLE_PLAYBACK
) {
227 if (!(sink
= core_get_default_sink(p
->core
))) {
228 fprintf(stderr
, "Failed to get default sink.\n");
232 c
->sink_input
= sink_input_new(sink
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
233 assert(c
->sink_input
);
234 c
->sink_input
->peek
= sink_input_peek_cb
;
235 c
->sink_input
->drop
= sink_input_drop_cb
;
236 c
->sink_input
->kill
= sink_input_kill_cb
;
237 c
->sink_input
->userdata
= c
;
239 l
= bytes_per_second(&DEFAULT_SAMPLE_SPEC
)/2; /* half a second */
240 c
->input_memblockq
= memblockq_new(l
, sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
244 iochannel_set_callback(c
->io
, io_callback
, c
);
245 idxset_put(p
->connections
, c
, NULL
);
250 free_connection(c
, NULL
);
251 iochannel_free(c
->io
);
256 struct protocol_simple
* protocol_simple_new(struct core
*core
, struct socket_server
*server
, enum protocol_simple_mode mode
) {
257 struct protocol_simple
* p
;
258 assert(core
&& server
&& mode
<= PROTOCOL_SIMPLE_DUPLEX
&& mode
> 0);
260 p
= malloc(sizeof(struct protocol_simple
));
264 p
->connections
= idxset_new(NULL
, NULL
);
267 socket_server_set_callback(p
->server
, on_connection
, p
);
273 void protocol_simple_free(struct protocol_simple
*p
) {
276 idxset_free(p
->connections
, free_connection
, NULL
);
277 socket_server_free(p
->server
);