8 #include "inputstream.h"
9 #include "outputstream.h"
10 #include "protocol-simple.h"
/*
 * Per-connection state members. Elsewhere in this file they are reached
 * through a `struct connection *c` as c->protocol, c->istream, c->ostream
 * and c->client.
 * NOTE(review): the enclosing `struct connection {` line and the `io`
 * member (used as c->io) are not visible in this listing -- confirm
 * against the full file.
 */
/* Protocol instance whose `connections` set holds this connection. */
14 struct protocol_simple
*protocol
;
/* Input stream; created only when the PLAYBACK mode bit is set. */
16 struct input_stream
*istream
;
/* Output stream; created only when the RECORD mode bit is set. */
17 struct output_stream
*ostream
;
/* Client object created with client_new() for this connection. */
18 struct client
*client
;
/*
 * State of one "simple" protocol instance.
 * NOTE(review): code below also reads p->core, but no `core` member is
 * visible in this listing -- confirm against the full file.
 */
21 struct protocol_simple
{
/* Socket server whose connection callback is set to on_connection(). */
23 struct socket_server
*server
;
/* Set of live `struct connection` objects (see idxset_put/idxset_free). */
24 struct idxset
*connections
;
/* Bit mask tested against PROTOCOL_SIMPLE_RECORD / PROTOCOL_SIMPLE_PLAYBACK. */
25 enum protocol_simple_mode mode
;
/*
 * Size in bytes of the memory block allocated for each read in do_read(),
 * and the maximum number of bytes requested from iochannel_read().
 */
28 #define BUFSIZE PIPE_BUF
/*
 * Release the resources held by one connection.
 *
 * data:     the `struct connection *` to tear down.
 * userdata: not used by the visible code; every caller in this file
 *           passes NULL.
 *
 * The (void *, void *) signature lets this function be passed to
 * idxset_free() as the per-element callback (see protocol_simple_free()).
 * It does not remove the connection from the protocol's idxset; that is
 * done by destroy_connection().
 *
 * NOTE(review): as listed, every *_free() call is unconditional, yet
 * istream/ostream are only created for the matching mode bit. Confirm
 * whether NULL guards exist in the full file or the *_free() functions
 * accept NULL. No release of `c` itself is visible here either.
 */
30 static void free_connection(void *data
, void *userdata
) {
31 struct connection
*c
= data
;
35 input_stream_free(c
->istream
);
37 output_stream_free(c
->ostream
);
39 client_free(c
->client
);
41 iochannel_free(c
->io
);
/*
 * Unregister a connection from its protocol and free it.
 *
 * c: connection to destroy; must be non-NULL with a non-NULL protocol
 *    back-pointer (asserted).
 *
 * The connection is first removed from c->protocol->connections so the
 * set no longer refers to it, then free_connection() is called on it.
 */
45 static void destroy_connection(struct connection
*c
) {
46 assert(c
&& c
->protocol
);
47 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
48 free_connection(c
, NULL
);
/*
 * Kill callback registered on the connection's input stream
 * (see input_stream_set_kill_callback() in on_connection()).
 *
 * i:        the input stream; not used by the visible code.
 * userdata: the owning `struct connection *`, which is destroyed.
 */
51 static void istream_kill_cb(struct input_stream
*i
, void *userdata
) {
52 struct connection
*c
= userdata
;
54 destroy_connection(c
);
/*
 * Kill callback registered on the connection's output stream
 * (see output_stream_set_kill_callback() in on_connection()).
 *
 * o:        the output stream; not used by the visible code.
 * userdata: the owning `struct connection *`, which is destroyed.
 */
57 static void ostream_kill_cb(struct output_stream
*o
, void *userdata
) {
58 struct connection
*c
= userdata
;
60 destroy_connection(c
);
/*
 * Kill callback registered on the connection's client object
 * (see client_set_kill_callback() in on_connection()).
 *
 * client:   the client; not used by the visible code.
 * userdata: the owning `struct connection *`, which is destroyed.
 */
63 static void client_kill_cb(struct client
*client
, void*userdata
) {
64 struct connection
*c
= userdata
;
66 destroy_connection(c
);
/*
 * Move up to BUFSIZE bytes from the connection's I/O channel into the
 * input stream's memblockq.
 *
 * c: the connection to service.
 *
 * Returns an int; io_callback() treats a negative value as a failure and
 * destroys the connection.
 * NOTE(review): the return statements, the declaration of `r`, and the
 * statements guarded by the two early checks are not visible in this
 * listing -- confirm the exact return values against the full file.
 */
69 static int do_read(struct connection
*c
) {
70 struct memchunk chunk
;
/* Early check: nothing to do unless the channel has data to read. */
73 if (!iochannel_is_readable(c
->io
))
/* Early check: need an input stream whose queue can accept BUFSIZE bytes. */
76 if (!c
->istream
|| !memblockq_is_writable(c
->istream
->memblockq
, BUFSIZE
))
/* Allocate a fresh block to read into. */
79 chunk
.memblock
= memblock_new(BUFSIZE
);
80 assert(chunk
.memblock
);
/* r == 0 is reported as EOF, r < 0 as the errno text; the block is dropped. */
82 if ((r
= iochannel_read(c
->io
, chunk
.memblock
->data
, BUFSIZE
)) <= 0) {
83 fprintf(stderr
, "read(): %s\n", r
== 0 ? "EOF" : strerror(errno
));
84 memblock_unref(chunk
.memblock
);
/* Record how many bytes the block actually holds. */
88 chunk
.memblock
->length
= r
;
/*
 * Queue the chunk, tell the sink new data is available, then drop this
 * function's reference to the block.
 * NOTE(review): chunk.index and chunk.length are not set in the visible
 * lines before the push -- confirm they are initialised in the full file.
 */
92 memblockq_push(c
->istream
->memblockq
, &chunk
, 0);
93 input_stream_notify_sink(c
->istream
);
94 memblock_unref(chunk
.memblock
);
/*
 * Write the chunk at the head of the output stream's memblockq to the
 * connection's I/O channel.
 *
 * c: the connection to service.
 *
 * Returns an int; io_callback() treats a negative value as a failure and
 * destroys the connection.
 * NOTE(review): the return statements, the declaration of `r`, and any
 * check that c->ostream is non-NULL or that the queue is non-empty are not
 * visible in this listing -- confirm against the full file (ostream is
 * only created when the RECORD mode bit is set).
 */
98 static int do_write(struct connection
*c
) {
99 struct memchunk chunk
;
/* Early check: nothing to do unless the channel can accept data. */
102 if (!iochannel_is_writable(c
->io
))
/* Look at the head chunk; it must reference a block and be non-empty. */
108 memblockq_peek(c
->ostream
->memblockq
, &chunk
);
109 assert(chunk
.memblock
&& chunk
.length
);
/* Write from the chunk's offset within its block; r < 0 is an error. */
111 if ((r
= iochannel_write(c
->io
, chunk
.memblock
->data
+chunk
.index
, chunk
.length
)) < 0) {
112 fprintf(stderr
, "write(): %s\n", strerror(errno
));
113 memblock_unref(chunk
.memblock
);
/* Discard the r bytes that were written, then drop the peeked reference. */
117 memblockq_drop(c
->ostream
->memblockq
, r
);
118 memblock_unref(chunk
.memblock
);
/*
 * I/O channel callback for a connection (registered in on_connection()
 * via iochannel_set_callback()).
 *
 * io:       the channel that triggered the callback; must equal c->io.
 * userdata: the owning `struct connection *`.
 *
 * Runs do_read() and then do_write(); if either returns a negative value
 * the connection is destroyed. Because of the short-circuit `||`,
 * do_write() is not attempted when do_read() fails.
 */
122 static void io_callback(struct iochannel
*io
, void *userdata
) {
123 struct connection
*c
= userdata
;
124 assert(io
&& c
&& c
->io
== io
);
126 if (do_read(c
) < 0 || do_write(c
) < 0)
127 destroy_connection(c
);
/*
 * Notify callback registered on the connection's input stream
 * (see input_stream_set_notify_callback() in on_connection()).
 *
 * i:        the notifying input stream; must equal c->istream.
 * userdata: the owning `struct connection *`.
 *
 * NOTE(review): as listed, destroy_connection() runs unconditionally on
 * every notification. The listing may be missing a guarding condition
 * here -- confirm against the full file before relying on this behaviour.
 */
130 static void istream_notify_cb(struct input_stream
*i
, void *userdata
) {
131 struct connection
*c
= userdata
;
132 assert(i
&& c
&& c
->istream
== i
);
135 destroy_connection(c
);
/*
 * Socket-server callback invoked for a new connection (registered in
 * protocol_simple_new() via socket_server_set_callback()).
 *
 * s:        the socket server reporting the connection.
 * io:       the I/O channel of the new connection.
 * userdata: the `struct protocol_simple *` that owns the server.
 *
 * Allocates a `struct connection`, creates its client object, creates an
 * output stream when the RECORD bit is set and an input stream when the
 * PLAYBACK bit is set, hooks up the kill/notify/io callbacks, and adds the
 * connection to p->connections.
 *
 * NOTE(review): not visible in this listing -- a check of the malloc()
 * result, the initialisation of c->io / c->protocol / c->istream /
 * c->ostream, the declaration of `sink`, and the control flow that leads
 * from the two "Failed to get default ..." messages to the cleanup calls
 * at the end. Confirm all of these against the full file.
 */
138 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
139 struct protocol_simple
*p
= userdata
;
140 struct connection
*c
= NULL
;
141 assert(s
&& io
&& p
);
143 c
= malloc(sizeof(struct connection
));
/* Client object named "Client" under protocol name "SIMPLE". */
150 c
->client
= client_new(p
->core
, "SIMPLE", "Client");
152 client_set_kill_callback(c
->client
, client_kill_cb
, c
);
/* Recording: attach an output stream to the default source. */
154 if (p
->mode
& PROTOCOL_SIMPLE_RECORD
) {
155 struct source
*source
;
157 if (!(source
= core_get_default_source(p
->core
))) {
158 fprintf(stderr
, "Failed to get default source.\n");
162 c
->ostream
= output_stream_new(source
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
164 output_stream_set_kill_callback(c
->ostream
, ostream_kill_cb
, c
);
/* Playback: attach an input stream to the default sink. */
167 if (p
->mode
& PROTOCOL_SIMPLE_PLAYBACK
) {
170 if (!(sink
= core_get_default_sink(p
->core
))) {
171 fprintf(stderr
, "Failed to get default sink.\n");
175 c
->istream
= input_stream_new(sink
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
177 input_stream_set_kill_callback(c
->istream
, istream_kill_cb
, c
);
178 input_stream_set_notify_callback(c
->istream
, istream_notify_cb
, c
);
/* Start servicing I/O and register the connection with the protocol. */
182 iochannel_set_callback(c
->io
, io_callback
, c
);
183 idxset_put(p
->connections
, c
, NULL
);
/*
 * Release of the partially built connection.
 * NOTE(review): presumably an error path reached only on failure; the
 * label/guards and any release of `c` itself are not visible here.
 */
189 client_free(c
->client
);
191 input_stream_free(c
->istream
);
193 output_stream_free(c
->ostream
);
195 iochannel_free(c
->io
);
/*
 * Create a "simple" protocol instance bound to a socket server.
 *
 * core:   core object; must be non-NULL.
 * server: socket server to accept connections from; must be non-NULL.
 * mode:   must satisfy 0 < mode <= PROTOCOL_SIMPLE_DUPLEX (asserted).
 *
 * Allocates the instance, creates an empty connection set, and registers
 * on_connection() as the server's callback with the instance as userdata.
 *
 * NOTE(review): a check of the malloc() result, the assignments of
 * p->core / p->server / p->mode, and the return statement are not visible
 * in this listing -- confirm against the full file. p->server is read by
 * the socket_server_set_callback() call, so it must be set before it.
 */
200 struct protocol_simple
* protocol_simple_new(struct core
*core
, struct socket_server
*server
, enum protocol_simple_mode mode
) {
201 struct protocol_simple
* p
;
202 assert(core
&& server
&& mode
<= PROTOCOL_SIMPLE_DUPLEX
&& mode
> 0);
204 p
= malloc(sizeof(struct protocol_simple
));
208 p
->connections
= idxset_new(NULL
, NULL
);
211 socket_server_set_callback(p
->server
, on_connection
, p
);
/*
 * Destroy a protocol instance.
 *
 * p: instance to destroy.
 *
 * Frees the connection set, passing free_connection() as the per-element
 * callback so each remaining connection is torn down, then frees the
 * socket server stored in the instance.
 *
 * NOTE(review): any NULL check on `p` and the release of `p` itself are
 * not visible in this listing -- confirm against the full file.
 */
217 void protocol_simple_free(struct protocol_simple
*p
) {
220 idxset_free(p
->connections
, free_connection
, NULL
);
221 socket_server_free(p
->server
);