9 #include "sourceoutput.h"
10 #include "protocol-simple.h"
12 #include "sample-util.h"
15 struct protocol_simple
*protocol
;
17 struct sink_input
*sink_input
;
18 struct source_output
*source_output
;
19 struct client
*client
;
20 struct memblockq
*input_memblockq
, *output_memblockq
;
23 struct protocol_simple
{
25 struct socket_server
*server
;
26 struct idxset
*connections
;
27 enum protocol_simple_mode mode
;
30 #define BUFSIZE PIPE_BUF
32 static void free_connection(void *data
, void *userdata
) {
33 struct connection
*c
= data
;
37 sink_input_free(c
->sink_input
);
39 source_output_free(c
->source_output
);
41 client_free(c
->client
);
43 iochannel_free(c
->io
);
44 if (c
->input_memblockq
)
45 memblockq_free(c
->input_memblockq
);
46 if (c
->output_memblockq
)
47 memblockq_free(c
->output_memblockq
);
51 static void destroy_connection(struct connection
*c
) {
52 assert(c
&& c
->protocol
);
53 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
54 free_connection(c
, NULL
);
57 static int do_read(struct connection
*c
) {
58 struct memchunk chunk
;
61 if (!iochannel_is_readable(c
->io
))
64 if (!c
->sink_input
|| !memblockq_is_writable(c
->input_memblockq
, BUFSIZE
))
67 chunk
.memblock
= memblock_new(BUFSIZE
);
68 assert(chunk
.memblock
);
70 if ((r
= iochannel_read(c
->io
, chunk
.memblock
->data
, BUFSIZE
)) <= 0) {
71 fprintf(stderr
, "read(): %s\n", r
== 0 ? "EOF" : strerror(errno
));
72 memblock_unref(chunk
.memblock
);
76 chunk
.memblock
->length
= r
;
80 assert(c
->input_memblockq
);
81 memblockq_push(c
->input_memblockq
, &chunk
, 0);
82 memblock_unref(chunk
.memblock
);
83 assert(c
->sink_input
);
84 sink_notify(c
->sink_input
->sink
);
89 static int do_write(struct connection
*c
) {
90 struct memchunk chunk
;
93 if (!iochannel_is_writable(c
->io
))
96 if (!c
->source_output
)
99 assert(c
->output_memblockq
);
100 if (memblockq_peek(c
->output_memblockq
, &chunk
) < 0)
103 assert(chunk
.memblock
&& chunk
.length
);
105 if ((r
= iochannel_write(c
->io
, chunk
.memblock
->data
+chunk
.index
, chunk
.length
)) < 0) {
106 fprintf(stderr
, "write(): %s\n", strerror(errno
));
107 memblock_unref(chunk
.memblock
);
111 memblockq_drop(c
->output_memblockq
, r
);
112 memblock_unref(chunk
.memblock
);
116 /*** sink_input callbacks ***/
118 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
) {
120 assert(i
&& i
->userdata
&& chunk
);
123 if (memblockq_peek(c
->input_memblockq
, chunk
) < 0)
129 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
) {
130 struct connection
*c
= i
->userdata
;
131 assert(i
&& c
&& length
);
133 memblockq_drop(c
->input_memblockq
, length
);
136 destroy_connection(c
);
139 static void sink_input_kill_cb(struct sink_input
*i
) {
140 assert(i
&& i
->userdata
);
141 destroy_connection((struct connection
*) i
->userdata
);
145 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
) {
146 struct connection
*c
= i
->userdata
;
148 return pa_samples_usec(memblockq_get_length(c
->input_memblockq
), &c
->sink_input
->sample_spec
);
151 /*** source_output callbacks ***/
153 static void source_output_push_cb(struct source_output
*o
, struct memchunk
*chunk
) {
154 struct connection
*c
= o
->userdata
;
155 assert(o
&& c
&& chunk
);
157 memblockq_push(c
->output_memblockq
, chunk
, 0);
160 destroy_connection(c
);
163 static void source_output_kill_cb(struct source_output
*o
) {
164 assert(o
&& o
->userdata
);
165 destroy_connection((struct connection
*) o
->userdata
);
168 /*** client callbacks ***/
170 static void client_kill_cb(struct client
*c
) {
171 assert(c
&& c
->userdata
);
172 destroy_connection((struct connection
*) c
->userdata
);
175 /*** iochannel callbacks ***/
177 static void io_callback(struct iochannel
*io
, void *userdata
) {
178 struct connection
*c
= userdata
;
179 assert(io
&& c
&& c
->io
== io
);
181 if (do_read(c
) < 0 || do_write(c
) < 0)
182 destroy_connection(c
);
185 /*** socket_server callbacks */
187 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
188 struct protocol_simple
*p
= userdata
;
189 struct connection
*c
= NULL
;
191 assert(s
&& io
&& p
);
193 c
= malloc(sizeof(struct connection
));
196 c
->sink_input
= NULL
;
197 c
->source_output
= NULL
;
198 c
->input_memblockq
= c
->output_memblockq
= NULL
;
201 iochannel_peer_to_string(io
, cname
, sizeof(cname
));
202 c
->client
= client_new(p
->core
, "SIMPLE", cname
);
204 c
->client
->kill
= client_kill_cb
;
205 c
->client
->userdata
= c
;
207 if (p
->mode
& PROTOCOL_SIMPLE_RECORD
) {
208 struct source
*source
;
211 if (!(source
= source_get_default(p
->core
))) {
212 fprintf(stderr
, "Failed to get default source.\n");
216 c
->source_output
= source_output_new(source
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
217 assert(c
->source_output
);
218 c
->source_output
->push
= source_output_push_cb
;
219 c
->source_output
->kill
= source_output_kill_cb
;
220 c
->source_output
->userdata
= c
;
222 l
= 5*pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC
); /* 5s */
223 c
->output_memblockq
= memblockq_new(l
, pa_sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
226 if (p
->mode
& PROTOCOL_SIMPLE_PLAYBACK
) {
230 if (!(sink
= sink_get_default(p
->core
))) {
231 fprintf(stderr
, "Failed to get default sink.\n");
235 c
->sink_input
= sink_input_new(sink
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
236 assert(c
->sink_input
);
237 c
->sink_input
->peek
= sink_input_peek_cb
;
238 c
->sink_input
->drop
= sink_input_drop_cb
;
239 c
->sink_input
->kill
= sink_input_kill_cb
;
240 c
->sink_input
->get_latency
= sink_input_get_latency_cb
;
241 c
->sink_input
->userdata
= c
;
243 l
= pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC
)/2; /* half a second */
244 c
->input_memblockq
= memblockq_new(l
, pa_sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
248 iochannel_set_callback(c
->io
, io_callback
, c
);
249 idxset_put(p
->connections
, c
, NULL
);
254 free_connection(c
, NULL
);
255 iochannel_free(c
->io
);
260 struct protocol_simple
* protocol_simple_new(struct core
*core
, struct socket_server
*server
, enum protocol_simple_mode mode
) {
261 struct protocol_simple
* p
;
262 assert(core
&& server
&& mode
<= PROTOCOL_SIMPLE_DUPLEX
&& mode
> 0);
264 p
= malloc(sizeof(struct protocol_simple
));
268 p
->connections
= idxset_new(NULL
, NULL
);
271 socket_server_set_callback(p
->server
, on_connection
, p
);
277 void protocol_simple_free(struct protocol_simple
*p
) {
280 idxset_free(p
->connections
, free_connection
, NULL
);
281 socket_server_free(p
->server
);