#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#include "sourceoutput.h"
#include "protocol-simple.h"
#include "sample-util.h"
/*
 * State kept for one client connection of the "simple" protocol.
 *
 * NOTE(review): the struct header and the `io` member were missing from the
 * damaged source. `io` is restored because every function in this file
 * dereferences c->io; its position among the members is a guess.
 */
struct connection {
    struct protocol_simple *protocol;     /* protocol instance this connection is registered with */
    struct iochannel *io;                 /* channel used to talk to the client */
    struct sink_input *sink_input;        /* NULL unless PROTOCOL_SIMPLE_PLAYBACK is set */
    struct source_output *source_output;  /* NULL unless PROTOCOL_SIMPLE_RECORD is set */
    struct client *client;                /* client entry created in on_connection() */
    struct memblockq *input_memblockq;    /* data read from the client, consumed by the sink input */
    struct memblockq *output_memblockq;   /* data pushed by the source output, written to the client */
};
23 struct protocol_simple
{
25 struct socket_server
*server
;
26 struct idxset
*connections
;
27 enum protocol_simple_mode mode
;
/* Size in bytes of each memory block allocated for, and requested from, a single read in do_read(). */
#define BUFSIZE PIPE_BUF
32 static void free_connection(void *data
, void *userdata
) {
33 struct connection
*c
= data
;
37 sink_input_free(c
->sink_input
);
39 source_output_free(c
->source_output
);
41 client_free(c
->client
);
43 iochannel_free(c
->io
);
44 if (c
->input_memblockq
)
45 memblockq_free(c
->input_memblockq
);
46 if (c
->output_memblockq
)
47 memblockq_free(c
->output_memblockq
);
51 static void destroy_connection(struct connection
*c
) {
52 assert(c
&& c
->protocol
);
53 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
54 free_connection(c
, NULL
);
57 static int do_read(struct connection
*c
) {
58 struct memchunk chunk
;
61 if (!iochannel_is_readable(c
->io
))
64 if (!c
->sink_input
|| !memblockq_is_writable(c
->input_memblockq
, BUFSIZE
))
67 chunk
.memblock
= memblock_new(BUFSIZE
);
68 assert(chunk
.memblock
);
70 if ((r
= iochannel_read(c
->io
, chunk
.memblock
->data
, BUFSIZE
)) <= 0) {
71 fprintf(stderr
, "read(): %s\n", r
== 0 ? "EOF" : strerror(errno
));
72 memblock_unref(chunk
.memblock
);
76 chunk
.memblock
->length
= chunk
.length
= r
;
79 assert(c
->input_memblockq
);
80 memblockq_push(c
->input_memblockq
, &chunk
, 0);
81 memblock_unref(chunk
.memblock
);
82 assert(c
->sink_input
);
83 sink_notify(c
->sink_input
->sink
);
88 static int do_write(struct connection
*c
) {
89 struct memchunk chunk
;
92 if (!iochannel_is_writable(c
->io
))
95 if (!c
->source_output
)
98 assert(c
->output_memblockq
);
99 if (memblockq_peek(c
->output_memblockq
, &chunk
) < 0)
102 assert(chunk
.memblock
&& chunk
.length
);
104 if ((r
= iochannel_write(c
->io
, chunk
.memblock
->data
+chunk
.index
, chunk
.length
)) < 0) {
105 fprintf(stderr
, "write(): %s\n", strerror(errno
));
106 memblock_unref(chunk
.memblock
);
110 memblockq_drop(c
->output_memblockq
, r
);
111 memblock_unref(chunk
.memblock
);
/*** sink_input callbacks ***/
117 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
) {
119 assert(i
&& i
->userdata
&& chunk
);
122 if (memblockq_peek(c
->input_memblockq
, chunk
) < 0)
128 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
) {
129 struct connection
*c
= i
->userdata
;
130 assert(i
&& c
&& length
);
132 memblockq_drop(c
->input_memblockq
, length
);
135 destroy_connection(c
);
138 static void sink_input_kill_cb(struct sink_input
*i
) {
139 assert(i
&& i
->userdata
);
140 destroy_connection((struct connection
*) i
->userdata
);
144 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
) {
145 struct connection
*c
= i
->userdata
;
147 return pa_samples_usec(memblockq_get_length(c
->input_memblockq
), &c
->sink_input
->sample_spec
);
/*** source_output callbacks ***/
152 static void source_output_push_cb(struct source_output
*o
, struct memchunk
*chunk
) {
153 struct connection
*c
= o
->userdata
;
154 assert(o
&& c
&& chunk
);
156 memblockq_push(c
->output_memblockq
, chunk
, 0);
159 destroy_connection(c
);
162 static void source_output_kill_cb(struct source_output
*o
) {
163 assert(o
&& o
->userdata
);
164 destroy_connection((struct connection
*) o
->userdata
);
/*** client callbacks ***/
169 static void client_kill_cb(struct client
*c
) {
170 assert(c
&& c
->userdata
);
171 destroy_connection((struct connection
*) c
->userdata
);
/*** iochannel callbacks ***/
176 static void io_callback(struct iochannel
*io
, void *userdata
) {
177 struct connection
*c
= userdata
;
178 assert(io
&& c
&& c
->io
== io
);
180 if (do_read(c
) < 0 || do_write(c
) < 0)
181 destroy_connection(c
);
/*** socket_server callbacks ***/
186 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
187 struct protocol_simple
*p
= userdata
;
188 struct connection
*c
= NULL
;
190 assert(s
&& io
&& p
);
192 c
= malloc(sizeof(struct connection
));
195 c
->sink_input
= NULL
;
196 c
->source_output
= NULL
;
197 c
->input_memblockq
= c
->output_memblockq
= NULL
;
200 iochannel_peer_to_string(io
, cname
, sizeof(cname
));
201 c
->client
= client_new(p
->core
, "SIMPLE", cname
);
203 c
->client
->kill
= client_kill_cb
;
204 c
->client
->userdata
= c
;
206 if (p
->mode
& PROTOCOL_SIMPLE_RECORD
) {
207 struct source
*source
;
210 if (!(source
= source_get_default(p
->core
))) {
211 fprintf(stderr
, "Failed to get default source.\n");
215 c
->source_output
= source_output_new(source
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
216 assert(c
->source_output
);
217 c
->source_output
->push
= source_output_push_cb
;
218 c
->source_output
->kill
= source_output_kill_cb
;
219 c
->source_output
->userdata
= c
;
221 l
= 5*pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC
); /* 5s */
222 c
->output_memblockq
= memblockq_new(l
, pa_sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
225 if (p
->mode
& PROTOCOL_SIMPLE_PLAYBACK
) {
229 if (!(sink
= sink_get_default(p
->core
))) {
230 fprintf(stderr
, "Failed to get default sink.\n");
234 c
->sink_input
= sink_input_new(sink
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
235 assert(c
->sink_input
);
236 c
->sink_input
->peek
= sink_input_peek_cb
;
237 c
->sink_input
->drop
= sink_input_drop_cb
;
238 c
->sink_input
->kill
= sink_input_kill_cb
;
239 c
->sink_input
->get_latency
= sink_input_get_latency_cb
;
240 c
->sink_input
->userdata
= c
;
242 l
= pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC
)/2; /* half a second */
243 c
->input_memblockq
= memblockq_new(l
, pa_sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
247 iochannel_set_callback(c
->io
, io_callback
, c
);
248 idxset_put(p
->connections
, c
, NULL
);
253 free_connection(c
, NULL
);
254 iochannel_free(c
->io
);
259 struct protocol_simple
* protocol_simple_new(struct core
*core
, struct socket_server
*server
, enum protocol_simple_mode mode
) {
260 struct protocol_simple
* p
;
261 assert(core
&& server
&& mode
<= PROTOCOL_SIMPLE_DUPLEX
&& mode
> 0);
263 p
= malloc(sizeof(struct protocol_simple
));
267 p
->connections
= idxset_new(NULL
, NULL
);
270 socket_server_set_callback(p
->server
, on_connection
, p
);
276 void protocol_simple_free(struct protocol_simple
*p
) {
279 idxset_free(p
->connections
, free_connection
, NULL
);
280 socket_server_free(p
->server
);