8 #include "inputstream.h"
9 #include "outputstream.h"
10 #include "protocol-simple.h"
14 struct protocol_simple
*protocol
;
16 struct input_stream
*istream
;
17 struct output_stream
*ostream
;
18 struct client
*client
;
21 struct protocol_simple
{
23 struct socket_server
*server
;
24 struct idxset
*connections
;
25 enum protocol_simple_mode mode
;
28 #define BUFSIZE PIPE_BUF
30 static void free_connection(void *data
, void *userdata
) {
31 struct connection
*c
= data
;
35 input_stream_free(c
->istream
);
37 output_stream_free(c
->ostream
);
39 client_free(c
->client
);
41 iochannel_free(c
->io
);
45 static void destroy_connection(struct connection
*c
) {
46 assert(c
&& c
->protocol
);
47 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
48 free_connection(c
, NULL
);
51 static void istream_kill_cb(struct input_stream
*i
, void *userdata
) {
52 struct connection
*c
= userdata
;
54 destroy_connection(c
);
57 static void ostream_kill_cb(struct output_stream
*o
, void *userdata
) {
58 struct connection
*c
= userdata
;
60 destroy_connection(c
);
63 static void client_kill_cb(struct client
*client
, void*userdata
) {
64 struct connection
*c
= userdata
;
66 destroy_connection(c
);
69 static void io_callback(struct iochannel
*io
, void *userdata
) {
70 struct connection
*c
= userdata
;
73 if (c
->istream
&& iochannel_is_readable(io
)) {
74 struct memchunk chunk
;
77 chunk
.memblock
= memblock_new(BUFSIZE
);
78 assert(chunk
.memblock
);
80 if ((r
= iochannel_read(io
, chunk
.memblock
->data
, BUFSIZE
)) <= 0) {
81 fprintf(stderr
, "read(): %s\n", r
== 0 ? "EOF" : strerror(errno
));
82 memblock_unref(chunk
.memblock
);
86 chunk
.memblock
->length
= r
;
90 memblockq_push(c
->istream
->memblockq
, &chunk
, 0);
91 input_stream_notify_sink(c
->istream
);
92 memblock_unref(chunk
.memblock
);
95 if (c
->ostream
&& iochannel_is_writable(io
)) {
96 struct memchunk chunk
;
99 memblockq_peek(c
->ostream
->memblockq
, &chunk
);
100 assert(chunk
.memblock
&& chunk
.length
);
102 if ((r
= iochannel_write(io
, chunk
.memblock
->data
+chunk
.index
, chunk
.length
)) < 0) {
103 fprintf(stderr
, "write(): %s\n", strerror(errno
));
104 memblock_unref(chunk
.memblock
);
108 memblockq_drop(c
->ostream
->memblockq
, r
);
109 memblock_unref(chunk
.memblock
);
115 destroy_connection(c
);
118 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
119 struct protocol_simple
*p
= userdata
;
120 struct connection
*c
= NULL
;
121 assert(s
&& io
&& p
);
123 c
= malloc(sizeof(struct connection
));
130 c
->client
= client_new(p
->core
, "SIMPLE", "Client");
132 client_set_kill_callback(c
->client
, client_kill_cb
, c
);
134 if (p
->mode
& PROTOCOL_SIMPLE_RECORD
) {
135 struct source
*source
;
137 if (!(source
= core_get_default_source(p
->core
))) {
138 fprintf(stderr
, "Failed to get default source.\n");
142 c
->ostream
= output_stream_new(source
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
144 output_stream_set_kill_callback(c
->ostream
, ostream_kill_cb
, c
);
147 if (p
->mode
& PROTOCOL_SIMPLE_PLAYBACK
) {
150 if (!(sink
= core_get_default_sink(p
->core
))) {
151 fprintf(stderr
, "Failed to get default sink.\n");
155 c
->istream
= input_stream_new(sink
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
157 input_stream_set_kill_callback(c
->istream
, istream_kill_cb
, c
);
161 iochannel_set_callback(c
->io
, io_callback
, c
);
162 idxset_put(p
->connections
, c
, NULL
);
168 client_free(c
->client
);
170 input_stream_free(c
->istream
);
172 output_stream_free(c
->ostream
);
174 iochannel_free(c
->io
);
179 struct protocol_simple
* protocol_simple_new(struct core
*core
, struct socket_server
*server
, enum protocol_simple_mode mode
) {
180 struct protocol_simple
* p
;
181 assert(core
&& server
&& mode
<= PROTOCOL_SIMPLE_DUPLEX
&& mode
> 0);
183 p
= malloc(sizeof(struct protocol_simple
));
187 p
->connections
= idxset_new(NULL
, NULL
);
190 socket_server_set_callback(p
->server
, on_connection
, p
);
196 void protocol_simple_free(struct protocol_simple
*p
) {
199 idxset_free(p
->connections
, free_connection
, NULL
);
200 socket_server_free(p
->server
);