9 #include "sourceoutput.h"
10 #include "protocol-simple.h"
/* NOTE(review): these declarations look like the members of struct connection
 * (the functions in this file reach them through a `struct connection *`).
 * The struct's opening line, its closing brace and the `io` member used
 * elsewhere in the file are not visible in this excerpt. */
/* Protocol instance the connection belongs to; destroy_connection() removes
 * the connection from protocol->connections. */
14 struct protocol_simple
*protocol
;
/* Playback stream; on_connection() creates it only when the protocol mode has
 * PROTOCOL_SIMPLE_PLAYBACK set, otherwise it stays NULL. */
16 struct sink_input
*sink_input
;
/* Record stream; on_connection() creates it only when the protocol mode has
 * PROTOCOL_SIMPLE_RECORD set, otherwise it stays NULL. */
17 struct source_output
*source_output
;
/* Client entry created with client_new() in on_connection(). */
18 struct client
*client
;
/* input_memblockq: filled by do_read(), consumed by the sink_input callbacks.
 * output_memblockq: filled by source_output_push_cb(), drained by do_write().
 * Either may be NULL when the matching direction is not enabled. */
19 struct memblockq
*input_memblockq
, *output_memblockq
;
/* State of one "simple" protocol instance.
 *
 * NOTE(review): the closing brace and a `core` member (read as p->core in
 * on_connection()) are not visible in this excerpt. */
22 struct protocol_simple
{
/* Socket server whose connection callback is set to on_connection() in
 * protocol_simple_new(); released with socket_server_free() in
 * protocol_simple_free(). */
24 struct socket_server
*server
;
/* Set of live struct connection objects (idxset_put() in on_connection(),
 * idxset_remove_by_data() in destroy_connection()). */
25 struct idxset
*connections
;
/* Bit flags tested against PROTOCOL_SIMPLE_RECORD and
 * PROTOCOL_SIMPLE_PLAYBACK when a connection is set up. */
26 enum protocol_simple_mode mode
;
/* Size in bytes of each memblock allocated by do_read(), the maximum passed to
 * iochannel_read(), and the free space do_read() requires in the input queue. */
29 #define BUFSIZE PIPE_BUF
/* Release the resources held by one connection.
 *
 * data:     the struct connection to tear down; typed void * so the function
 *           can be passed to idxset_free() as its per-entry callback (see
 *           protocol_simple_free()).
 * userdata: not referenced in the visible code.
 *
 * NOTE(review): this excerpt omits several lines of the function (including
 * its closing brace), so any guards in front of the first four calls, and
 * whether `c` itself is released, cannot be confirmed from here. */
31 static void free_connection(void *data
, void *userdata
) {
32 struct connection
*c
= data
;
/* Free the playback stream, the record stream and the client entry. */
36 sink_input_free(c
->sink_input
);
38 source_output_free(c
->source_output
);
40 client_free(c
->client
);
/* Free the I/O channel. */
42 iochannel_free(c
->io
);
/* Each queue is created only for the direction enabled in on_connection(),
 * so either pointer may still be NULL here. */
43 if (c
->input_memblockq
)
44 memblockq_free(c
->input_memblockq
);
45 if (c
->output_memblockq
)
46 memblockq_free(c
->output_memblockq
);
/* Remove a connection from its protocol's connection set and then free it.
 *
 * c: connection to destroy; must be non-NULL and have a protocol set
 *    (checked with assert). */
50 static void destroy_connection(struct connection
*c
) {
51 assert(c
&& c
->protocol
);
/* Unregister first so the set no longer holds a pointer to the connection. */
52 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
53 free_connection(c
, NULL
);
/* Move one chunk of data from the I/O channel into the playback queue.
 *
 * Allocates a BUFSIZE memblock, fills it with iochannel_read(), pushes it onto
 * c->input_memblockq and notifies the sink the sink input refers to. A read
 * result <= 0 is logged to stderr (0 is reported as "EOF") and the block is
 * unreferenced.
 *
 * NOTE(review): the declaration of `r`, the bodies of the two guard `if`s and
 * all return statements are not visible in this excerpt. io_callback() treats
 * a negative return value as a reason to destroy the connection. */
56 static int do_read(struct connection
*c
) {
57 struct memchunk chunk
;
/* Guard: the channel reports nothing to read. */
60 if (!iochannel_is_readable(c
->io
))
/* Guard: no playback stream, or the input queue cannot accept BUFSIZE bytes. */
63 if (!c
->sink_input
|| !memblockq_is_writable(c
->input_memblockq
, BUFSIZE
))
66 chunk
.memblock
= memblock_new(BUFSIZE
);
67 assert(chunk
.memblock
);
69 if ((r
= iochannel_read(c
->io
, chunk
.memblock
->data
, BUFSIZE
)) <= 0) {
70 fprintf(stderr
, "read(): %s\n", r
== 0 ? "EOF" : strerror(errno
));
71 memblock_unref(chunk
.memblock
);
/* Record the number of bytes stored in the block (presumably only reached
 * when r > 0 -- the end of the error branch is not visible). */
75 chunk
.memblock
->length
= r
;
79 assert(c
->input_memblockq
);
80 memblockq_push(c
->input_memblockq
, &chunk
, 0);
/* Drop the local reference to the block after the push. */
81 memblock_unref(chunk
.memblock
);
/* Notify the sink that this connection's sink input is attached to. */
82 sink_notify(c
->sink_input
->sink
);
/* Write the chunk at the head of the record queue to the I/O channel.
 *
 * Peeks the next chunk of c->output_memblockq, writes its bytes (starting at
 * chunk.index, chunk.length bytes) with iochannel_write(), and drops from the
 * queue exactly the number of bytes the write reported. A negative write
 * result is logged to stderr and the chunk's block is unreferenced.
 *
 * NOTE(review): the declaration of `r`, the bodies of the guard `if`s and all
 * return statements are not visible in this excerpt. io_callback() treats a
 * negative return value as a reason to destroy the connection. */
87 static int do_write(struct connection
*c
) {
88 struct memchunk chunk
;
/* Guard: the channel cannot be written to right now. */
91 if (!iochannel_is_writable(c
->io
))
/* Guard: this connection has no record stream. */
94 if (!c
->source_output
)
97 assert(c
->output_memblockq
);
/* Guard: memblockq_peek() reported failure (negative result). */
98 if (memblockq_peek(c
->output_memblockq
, &chunk
) < 0)
101 assert(chunk
.memblock
&& chunk
.length
);
103 if ((r
= iochannel_write(c
->io
, chunk
.memblock
->data
+chunk
.index
, chunk
.length
)) < 0) {
104 fprintf(stderr
, "write(): %s\n", strerror(errno
));
105 memblock_unref(chunk
.memblock
);
/* Only the r bytes reported by the write are removed from the queue. */
109 memblockq_drop(c
->output_memblockq
, r
);
110 memblock_unref(chunk
.memblock
);
114 /*** sink_input callbacks ***/
/* `peek` callback installed on the sink input in on_connection(): looks up the
 * next chunk of the connection's input queue via memblockq_peek().
 *
 * i:     sink input whose userdata is the struct connection.
 * chunk: receives the peeked chunk.
 *
 * NOTE(review): what happens when memblockq_peek() fails, and the return
 * statements, are not visible in this excerpt.
 * NOTE(review): i->userdata is read before the assert checks `i`, so the
 * `i` part of the assert cannot catch a NULL argument. */
116 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
) {
117 struct connection
*c
= i
->userdata
;
118 assert(i
&& c
&& chunk
);
120 if (memblockq_peek(c
->input_memblockq
, chunk
) < 0)
/* `drop` callback installed on the sink input in on_connection(): removes
 * `length` bytes from the connection's input queue.
 *
 * i:      sink input whose userdata is the struct connection.
 * length: number of bytes to drop; asserted to be non-zero.
 *
 * NOTE(review): i->userdata is read before the assert checks `i`. */
126 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
) {
127 struct connection
*c
= i
->userdata
;
128 assert(i
&& c
&& length
);
130 memblockq_drop(c
->input_memblockq
, length
);
/* NOTE(review): the lines between the drop and this call are not visible in
 * this excerpt; presumably the destroy is conditional -- confirm. */
133 destroy_connection(c
);
/* `kill` callback installed on the sink input in on_connection(): destroys the
 * connection stored in the sink input's userdata.
 *
 * i: sink input being killed; it and its userdata must be non-NULL. */
136 static void sink_input_kill_cb(struct sink_input
*i
) {
137 assert(i
&& i
->userdata
);
138 destroy_connection((struct connection
*) i
->userdata
);
/* `get_latency` callback installed on the sink input in on_connection().
 *
 * Returns samples_usec() applied to the current length of the connection's
 * input queue, using DEFAULT_SAMPLE_SPEC (presumably the queued data expressed
 * as a duration in microseconds -- confirm against samples_usec()).
 *
 * i: sink input whose userdata is the struct connection. */
142 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
) {
143 struct connection
*c
= i
->userdata
;
145 return samples_usec(memblockq_get_length(c
->input_memblockq
), &DEFAULT_SAMPLE_SPEC
);
148 /*** source_output callbacks ***/
/* `push` callback installed on the source output in on_connection(): appends
 * the delivered chunk to the connection's output queue.
 *
 * o:     source output whose userdata is the struct connection.
 * chunk: chunk to queue.
 *
 * NOTE(review): o->userdata is read before the assert checks `o`. */
150 static void source_output_push_cb(struct source_output
*o
, struct memchunk
*chunk
) {
151 struct connection
*c
= o
->userdata
;
152 assert(o
&& c
&& chunk
);
154 memblockq_push(c
->output_memblockq
, chunk
, 0);
/* NOTE(review): the lines between the push and this call are not visible in
 * this excerpt; presumably the destroy is conditional -- confirm. */
157 destroy_connection(c
);
/* `kill` callback installed on the source output in on_connection(): destroys
 * the connection stored in the source output's userdata.
 *
 * o: source output being killed; it and its userdata must be non-NULL. */
160 static void source_output_kill_cb(struct source_output
*o
) {
161 assert(o
&& o
->userdata
);
162 destroy_connection((struct connection
*) o
->userdata
);
165 /*** client callbacks ***/
/* `kill` callback installed on the client in on_connection(): destroys the
 * connection stored in the client's userdata.
 *
 * c: client being killed (note: a struct client, not a struct connection);
 *    it and its userdata must be non-NULL. */
167 static void client_kill_cb(struct client
*c
) {
168 assert(c
&& c
->userdata
);
169 destroy_connection((struct connection
*) c
->userdata
);
172 /*** iochannel callbacks ***/
/* I/O channel callback registered with iochannel_set_callback() in
 * on_connection().
 *
 * io:       channel that triggered the callback; must be the connection's own.
 * userdata: the struct connection.
 *
 * Runs do_read() and, unless that returned a negative value, do_write(); a
 * negative result from either destroys the connection. */
174 static void io_callback(struct iochannel
*io
, void *userdata
) {
175 struct connection
*c
= userdata
;
176 assert(io
&& c
&& c
->io
== io
);
/* `||` short-circuits: do_write() is skipped once do_read() has failed. */
178 if (do_read(c
) < 0 || do_write(c
) < 0)
179 destroy_connection(c
);
182 /*** socket_server callbacks ***/
/* Socket server callback (registered in protocol_simple_new()): sets up a new
 * struct connection for an accepted I/O channel.
 *
 * s:        socket server that accepted the connection.
 * io:       I/O channel of the new connection.
 * userdata: the struct protocol_simple.
 *
 * Creates a client entry, then -- depending on the bits in p->mode -- a source
 * output with an output queue (record) and/or a sink input with an input
 * queue (playback), registers io_callback() on the channel and stores the
 * connection in p->connections.
 *
 * NOTE(review): several lines are not visible in this excerpt: the check of
 * the malloc() result, the assignments of c->io and c->protocol, the
 * declarations of `l` and `sink`, the jumps taken after the two "Failed to
 * get default ..." messages, the final return and the label in front of the
 * cleanup code at the end. */
184 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
185 struct protocol_simple
*p
= userdata
;
186 struct connection
*c
= NULL
;
187 assert(s
&& io
&& p
);
189 c
= malloc(sizeof(struct connection
));
/* Start with no streams and no queues; free_connection() tests the queue
 * pointers for NULL. */
192 c
->sink_input
= NULL
;
193 c
->source_output
= NULL
;
194 c
->input_memblockq
= c
->output_memblockq
= NULL
;
197 c
->client
= client_new(p
->core
, "SIMPLE", "Client");
199 c
->client
->kill
= client_kill_cb
;
200 c
->client
->userdata
= c
;
/* Record direction: attach a source output to the default source. */
202 if (p
->mode
& PROTOCOL_SIMPLE_RECORD
) {
203 struct source
*source
;
206 if (!(source
= source_get_default(p
->core
))) {
207 fprintf(stderr
, "Failed to get default source.\n");
211 c
->source_output
= source_output_new(source
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
212 assert(c
->source_output
);
213 c
->source_output
->push
= source_output_push_cb
;
214 c
->source_output
->kill
= source_output_kill_cb
;
215 c
->source_output
->userdata
= c
;
/* Output queue sized from five seconds' worth of bytes at DEFAULT_SAMPLE_SPEC;
 * the meaning of the second and third memblockq_new() arguments is not shown
 * in this file. */
217 l
= 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC
); /* 5s */
218 c
->output_memblockq
= memblockq_new(l
, sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
/* Playback direction: attach a sink input to the default sink. */
221 if (p
->mode
& PROTOCOL_SIMPLE_PLAYBACK
) {
225 if (!(sink
= sink_get_default(p
->core
))) {
226 fprintf(stderr
, "Failed to get default sink.\n");
230 c
->sink_input
= sink_input_new(sink
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
231 assert(c
->sink_input
);
232 c
->sink_input
->peek
= sink_input_peek_cb
;
233 c
->sink_input
->drop
= sink_input_drop_cb
;
234 c
->sink_input
->kill
= sink_input_kill_cb
;
235 c
->sink_input
->get_latency
= sink_input_get_latency_cb
;
236 c
->sink_input
->userdata
= c
;
/* Input queue sized from half a second's worth of bytes at DEFAULT_SAMPLE_SPEC. */
238 l
= bytes_per_second(&DEFAULT_SAMPLE_SPEC
)/2; /* half a second */
239 c
->input_memblockq
= memblockq_new(l
, sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
/* Setup succeeded: start receiving I/O events and register the connection. */
243 iochannel_set_callback(c
->io
, io_callback
, c
);
244 idxset_put(p
->connections
, c
, NULL
);
/* Cleanup code (presumably the failure path).
 * NOTE(review): free_connection() already calls iochannel_free(c->io), so the
 * second iochannel_free() below releases the same channel again, and it reads
 * `c` after free_connection() has torn it down. On the visible lines
 * free_connection() also passes the still-NULL sink_input/source_output
 * pointers to their free functions. Confirm and fix. */
249 free_connection(c
, NULL
);
250 iochannel_free(c
->io
);
/* Create a "simple" protocol instance on top of a socket server.
 *
 * core:   core object; must be non-NULL.
 * server: socket server whose connections this protocol will handle; must be
 *         non-NULL.
 * mode:   must be greater than 0 and at most PROTOCOL_SIMPLE_DUPLEX.
 *
 * Allocates the instance, creates its (initially empty) connection set and
 * registers on_connection() as the server's connection callback.
 *
 * NOTE(review): the check of the malloc() result, the assignments of the
 * core/server/mode members and the return statement are not visible in this
 * excerpt; presumably the new instance is returned. */
255 struct protocol_simple
* protocol_simple_new(struct core
*core
, struct socket_server
*server
, enum protocol_simple_mode mode
) {
256 struct protocol_simple
* p
;
257 assert(core
&& server
&& mode
<= PROTOCOL_SIMPLE_DUPLEX
&& mode
> 0);
259 p
= malloc(sizeof(struct protocol_simple
));
263 p
->connections
= idxset_new(NULL
, NULL
);
/* From here on every accepted connection is handed to on_connection() with
 * `p` as its userdata. */
266 socket_server_set_callback(p
->server
, on_connection
, p
);
/* Tear down a protocol instance.
 *
 * p: instance to free.
 *
 * Frees the connection set, invoking free_connection() on every connection
 * still stored in it, then frees the socket server.
 *
 * NOTE(review): the lines before and after these two calls (any argument
 * check and the release of `p` itself) are not visible in this excerpt. */
272 void protocol_simple_free(struct protocol_simple
*p
) {
275 idxset_free(p
->connections
, free_connection
, NULL
);
276 socket_server_free(p
->server
);