X-Git-Url: https://code.delx.au/pulseaudio/blobdiff_plain/b681622b17e23878cdd0683fdf57dcc35c562c43..e02be6c15beddec976220bce2ee1a68520286c01:/polyp/protocol-esound.c diff --git a/polyp/protocol-esound.c b/polyp/protocol-esound.c index 755ec21d..d9ed66b9 100644 --- a/polyp/protocol-esound.c +++ b/polyp/protocol-esound.c @@ -4,7 +4,7 @@ This file is part of polypaudio. polypaudio is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published + it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. @@ -13,7 +13,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - You should have received a copy of the GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with polypaudio; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. @@ -42,11 +42,16 @@ #include "scache.h" #include "sample-util.h" #include "authkey.h" -#include "debug.h" #include "namereg.h" #include "xmalloc.h" #include "log.h" +/* Don't accept more connections than this */ +#define MAX_CONNECTIONS 10 + +/* Kick a client if it doesn't authenticate within this time (in seconds) */ +#define AUTH_TIMEOUT 5 + #define DEFAULT_COOKIE_FILE ".esd_auth" #define PLAYBACK_BUFFER_SECONDS (.5) @@ -58,10 +63,13 @@ #define SCACHE_PREFIX "esound." 
+#define PA_TYPEID_ESOUND PA_TYPEID_MAKE('E', 'S', 'D', 'P') + /* This is heavily based on esound's code */ struct connection { uint32_t index; + int dead; struct pa_protocol_esound *protocol; struct pa_iochannel *io; struct pa_client *client; @@ -76,14 +84,19 @@ struct connection { struct pa_source_output *source_output; struct pa_memblockq *input_memblockq, *output_memblockq; struct pa_defer_event *defer_event; + struct { struct pa_memblock *current_memblock; size_t memblock_index, fragment_size; } playback; - struct pa_memchunk scache_memchunk; - char *scache_name; - struct pa_sample_spec scache_sample_spec; + struct { + struct pa_memchunk memchunk; + char *name; + struct pa_sample_spec sample_spec; + } scache; + + struct pa_time_event *auth_timeout_event; }; struct pa_protocol_esound { @@ -107,6 +120,7 @@ static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk); static void sink_input_kill_cb(struct pa_sink_input *i); static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i); +static pa_usec_t source_output_get_latency_cb(struct pa_source_output *o); static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk); static void source_output_kill_cb(struct pa_source_output *o); @@ -121,6 +135,7 @@ static int esd_proto_stream_pan(struct connection *c, esd_proto_t request, const static int esd_proto_sample_cache(struct connection *c, esd_proto_t request, const void *data, size_t length); static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t request, const void *data, size_t length); static int esd_proto_sample_get_id(struct connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_standby_or_resume(struct connection *c, esd_proto_t request, const void *data, size_t length); /* the big map of protocol handler info */ static struct proto_handler 
proto_map[ESD_PROTO_MAX] = { @@ -132,17 +147,17 @@ static struct proto_handler proto_map[ESD_PROTO_MAX] = { { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream rec" }, { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream mon" }, - { ESD_NAME_MAX + 3 * sizeof(int), esd_proto_sample_cache, "sample cache" }, + { ESD_NAME_MAX + 3 * sizeof(int), esd_proto_sample_cache, "sample cache" }, /* 6 */ { sizeof(int), esd_proto_sample_free_or_play, "sample free" }, - { sizeof(int), esd_proto_sample_free_or_play, "sample play" }, + { sizeof(int), esd_proto_sample_free_or_play, "sample play" }, /* 8 */ { sizeof(int), NULL, "sample loop" }, { sizeof(int), NULL, "sample stop" }, { -1, NULL, "TODO: sample kill" }, - { ESD_KEY_LEN + sizeof(int), NULL, "standby" }, - { ESD_KEY_LEN + sizeof(int), NULL, "resume" }, + { ESD_KEY_LEN + sizeof(int), esd_proto_standby_or_resume, "standby" }, /* NOOP! */ + { ESD_KEY_LEN + sizeof(int), esd_proto_standby_or_resume, "resume" }, /* NOOP! */ /* 13 */ - { ESD_NAME_MAX, esd_proto_sample_get_id, "sample getid" }, + { ESD_NAME_MAX, esd_proto_sample_get_id, "sample getid" }, /* 14 */ { ESD_NAME_MAX + 2 * sizeof(int), NULL, "stream filter" }, { sizeof(int), esd_proto_server_info, "server info" }, @@ -167,10 +182,16 @@ static void connection_free(struct connection *c) { pa_client_free(c->client); - if (c->sink_input) - pa_sink_input_free(c->sink_input); - if (c->source_output) - pa_source_output_free(c->source_output); + if (c->sink_input) { + pa_sink_input_disconnect(c->sink_input); + pa_sink_input_unref(c->sink_input); + } + + if (c->source_output) { + pa_source_output_disconnect(c->source_output); + pa_source_output_unref(c->source_output); + } + if (c->input_memblockq) pa_memblockq_free(c->input_memblockq); if (c->output_memblockq) @@ -181,15 +202,19 @@ static void connection_free(struct connection *c) { pa_xfree(c->read_data); pa_xfree(c->write_data); - - pa_iochannel_free(c->io); + + if (c->io) + 
pa_iochannel_free(c->io); if (c->defer_event) c->protocol->core->mainloop->defer_free(c->defer_event); - if (c->scache_memchunk.memblock) - pa_memblock_unref(c->scache_memchunk.memblock); - pa_xfree(c->scache_name); + if (c->scache.memchunk.memblock) + pa_memblock_unref(c->scache.memchunk.memblock); + pa_xfree(c->scache.name); + + if (c->auth_timeout_event) + c->protocol->core->mainloop->time_free(c->auth_timeout_event); pa_xfree(c); } @@ -244,6 +269,10 @@ static int esd_proto_connect(struct connection *c, esd_proto_t request, const vo } c->authorized = 1; + if (c->auth_timeout_event) { + c->protocol->core->mainloop->time_free(c->auth_timeout_event); + c->auth_timeout_event = NULL; + } } ekey = *(uint32_t*)((uint8_t*) data+ESD_KEY_LEN); @@ -276,30 +305,32 @@ static int esd_proto_stream_play(struct connection *c, esd_proto_t request, cons ss.rate = rate; format_esd2native(format, &ss); - if (!pa_sample_spec_valid(&ss)) + if (!pa_sample_spec_valid(&ss)) { + pa_log(__FILE__": invalid sample specification\n"); return -1; + } if (!(sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) { - pa_log(__FILE__": No output sink\n"); + pa_log(__FILE__": no such sink\n"); return -1; } strncpy(name, (char*) data + sizeof(int)*2, sizeof(name)); name[sizeof(name)-1] = 0; - pa_client_rename(c->client, name); + pa_client_set_name(c->client, name); - assert(!c->input_memblockq); + assert(!c->sink_input && !c->input_memblockq); + + if (!(c->sink_input = pa_sink_input_new(sink, PA_TYPEID_ESOUND, name, &ss, 0, -1))) { + pa_log(__FILE__": failed to create sink input.\n"); + return -1; + } l = (size_t) (pa_bytes_per_second(&ss)*PLAYBACK_BUFFER_SECONDS); c->input_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&ss), l/2, l/PLAYBACK_BUFFER_FRAGMENTS, c->protocol->core->memblock_stat); - assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2); c->playback.fragment_size = l/10; - - assert(!c->sink_input); - 
c->sink_input = pa_sink_input_new(sink, name, &ss); - assert(c->sink_input); c->sink_input->owner = c->protocol->module; c->sink_input->client = c->client; @@ -330,44 +361,53 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co ss.rate = rate; format_esd2native(format, &ss); - if (!pa_sample_spec_valid(&ss)) + if (!pa_sample_spec_valid(&ss)) { + pa_log(__FILE__": invalid sample specification.\n"); return -1; + } if (request == ESD_PROTO_STREAM_MON) { struct pa_sink* sink; - if (!(sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) + if (!(sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) { + pa_log(__FILE__": no such sink.\n"); return -1; + } - if (!(source = sink->monitor_source)) + if (!(source = sink->monitor_source)) { + pa_log(__FILE__": no such monitor source.\n"); return -1; + } } else { assert(request == ESD_PROTO_STREAM_REC); - if (!(source = pa_namereg_get(c->protocol->core, c->protocol->source_name, PA_NAMEREG_SOURCE, 1))) + if (!(source = pa_namereg_get(c->protocol->core, c->protocol->source_name, PA_NAMEREG_SOURCE, 1))) { + pa_log(__FILE__": no such source.\n"); return -1; + } } strncpy(name, (char*) data + sizeof(int)*2, sizeof(name)); name[sizeof(name)-1] = 0; - pa_client_rename(c->client, name); + pa_client_set_name(c->client, name); - assert(!c->output_memblockq); + assert(!c->output_memblockq && !c->source_output); + + if (!(c->source_output = pa_source_output_new(source, PA_TYPEID_ESOUND, name, &ss, -1))) { + pa_log(__FILE__": failed to create source output\n"); + return -1; + } l = (size_t) (pa_bytes_per_second(&ss)*RECORD_BUFFER_SECONDS); c->output_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&ss), 0, 0, c->protocol->core->memblock_stat); - assert(c->output_memblockq); pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2); - assert(!c->source_output); - c->source_output = pa_source_output_new(source, name, &ss); - 
assert(c->source_output); - c->source_output->owner = c->protocol->module; c->source_output->client = c->client; c->source_output->push = source_output_push_cb; c->source_output->kill = source_output_kill_cb; + c->source_output->get_latency = source_output_get_latency_cb; c->source_output->userdata = c; c->state = ESD_STREAMING_DATA; @@ -574,17 +614,17 @@ static int esd_proto_sample_cache(struct connection *c, esd_proto_t request, con strncpy(name+sizeof(SCACHE_PREFIX)-1, (char*) data+3*sizeof(int), ESD_NAME_MAX); name[sizeof(name)-1] = 0; - assert(!c->scache_memchunk.memblock); - c->scache_memchunk.memblock = pa_memblock_new(sc_length, c->protocol->core->memblock_stat); - c->scache_memchunk.index = 0; - c->scache_memchunk.length = sc_length; - c->scache_sample_spec = ss; - assert(!c->scache_name); - c->scache_name = pa_xstrdup(name); + assert(!c->scache.memchunk.memblock); + c->scache.memchunk.memblock = pa_memblock_new(sc_length, c->protocol->core->memblock_stat); + c->scache.memchunk.index = 0; + c->scache.memchunk.length = sc_length; + c->scache.sample_spec = ss; + assert(!c->scache.name); + c->scache.name = pa_xstrdup(name); c->state = ESD_CACHING_SAMPLE; - pa_scache_add_item(c->protocol->core, c->scache_name, NULL, NULL, &index); + pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, &index); ok = connection_write(c, sizeof(int)); assert(ok); @@ -646,6 +686,15 @@ static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t reque return 0; } +static int esd_proto_standby_or_resume(struct connection *c, esd_proto_t request, const void *data, size_t length) { + int *ok; + ok = connection_write(c, sizeof(int)*2); + assert(ok); + ok[0] = 1; + ok[1] = 1; + return 0; +} + /*** client callbacks ***/ static void client_kill_cb(struct pa_client *c) { @@ -658,6 +707,8 @@ static void client_kill_cb(struct pa_client *c) { static int do_read(struct connection *c) { assert(c && c->io); +/* pa_log("READ\n"); */ + if (c->state == ESD_NEXT_REQUEST) { 
ssize_t r; assert(c->read_data_length < sizeof(c->request)); @@ -680,8 +731,10 @@ static int do_read(struct connection *c) { handler = proto_map+c->request; +/* pa_log(__FILE__": executing request #%u\n", c->request); */ + if (!handler->proc) { - pa_log(__FILE__": recieved unimplemented request.\n"); + pa_log(__FILE__": recieved unimplemented request #%u.\n", c->request); return -1; } @@ -727,29 +780,29 @@ static int do_read(struct connection *c) { } else if (c->state == ESD_CACHING_SAMPLE) { ssize_t r; - assert(c->scache_memchunk.memblock && c->scache_name && c->scache_memchunk.index < c->scache_memchunk.length); + assert(c->scache.memchunk.memblock && c->scache.name && c->scache.memchunk.index < c->scache.memchunk.length); - if ((r = pa_iochannel_read(c->io, (uint8_t*) c->scache_memchunk.memblock->data+c->scache_memchunk.index, c->scache_memchunk.length-c->scache_memchunk.index)) <= 0) { + if ((r = pa_iochannel_read(c->io, (uint8_t*) c->scache.memchunk.memblock->data+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) { pa_log(__FILE__": read() failed: %s\n", r == 0 ? 
"EOF" : strerror(errno)); return -1; } - c->scache_memchunk.index += r; - assert(c->scache_memchunk.index <= c->scache_memchunk.length); + c->scache.memchunk.index += r; + assert(c->scache.memchunk.index <= c->scache.memchunk.length); - if (c->scache_memchunk.index == c->scache_memchunk.length) { + if (c->scache.memchunk.index == c->scache.memchunk.length) { uint32_t index; int *ok; - c->scache_memchunk.index = 0; - pa_scache_add_item(c->protocol->core, c->scache_name, &c->scache_sample_spec, &c->scache_memchunk, &index); + c->scache.memchunk.index = 0; + pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, &c->scache.memchunk, &index); - pa_memblock_unref(c->scache_memchunk.memblock); - c->scache_memchunk.memblock = NULL; - c->scache_memchunk.index = c->scache_memchunk.length = 0; + pa_memblock_unref(c->scache.memchunk.memblock); + c->scache.memchunk.memblock = NULL; + c->scache.memchunk.index = c->scache.memchunk.length = 0; - pa_xfree(c->scache_name); - c->scache_name = NULL; + pa_xfree(c->scache.name); + c->scache.name = NULL; c->state = ESD_NEXT_REQUEST; @@ -765,6 +818,8 @@ static int do_read(struct connection *c) { assert(c->input_memblockq); +/* pa_log("STREAMING_DATA\n"); */ + if (!(l = pa_memblockq_missing(c->input_memblockq))) return 0; @@ -789,6 +844,8 @@ static int do_read(struct connection *c) { return -1; } +/* pa_log(__FILE__": read %u\n", r); */ + chunk.memblock = c->playback.current_memblock; chunk.index = c->playback.memblock_index; chunk.length = r; @@ -808,6 +865,8 @@ static int do_read(struct connection *c) { static int do_write(struct connection *c) { assert(c && c->io); +/* pa_log("WRITE\n"); */ + if (c->write_data_length) { ssize_t r; @@ -835,7 +894,7 @@ static int do_write(struct connection *c) { pa_log(__FILE__": write(): %s\n", strerror(errno)); return -1; } - + pa_memblockq_drop(c->output_memblockq, &chunk, r); pa_memblock_unref(chunk.memblock); } @@ -849,27 +908,43 @@ static void do_work(struct connection *c) { 
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); c->protocol->core->mainloop->defer_enable(c->defer_event, 0); - if (pa_iochannel_is_hungup(c->io)) - goto fail; +/* pa_log("DOWORK %i\n", pa_iochannel_is_hungup(c->io)); */ - if (pa_iochannel_is_writable(c->io)) - if (do_write(c) < 0) + if (!c->dead && pa_iochannel_is_readable(c->io)) + if (do_read(c) < 0) goto fail; - if (pa_iochannel_is_readable(c->io)) - if (do_read(c) < 0) + if (!c->dead && pa_iochannel_is_writable(c->io)) + if (do_write(c) < 0) goto fail; + /* In case the line was hungup, make sure to rerun this function + as soon as possible, until all data has been read. */ + + if (!c->dead && pa_iochannel_is_hungup(c->io)) + c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + return; fail: - connection_free(c); + + if (c->state == ESD_STREAMING_DATA && c->sink_input) { + c->dead = 1; + pa_memblockq_prebuf_disable(c->input_memblockq); + + pa_iochannel_free(c->io); + c->io = NULL; + + } else + connection_free(c); } static void io_callback(struct pa_iochannel*io, void *userdata) { struct connection *c = userdata; assert(io && c && c->io == io); +/* pa_log("IO\n"); */ + do_work(c); } @@ -879,6 +954,8 @@ static void defer_callback(struct pa_mainloop_api*a, struct pa_defer_event *e, v struct connection *c = userdata; assert(a && c && c->defer_event == e); +/* pa_log("DEFER\n"); */ + do_work(c); } @@ -889,8 +966,13 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk assert(i && i->userdata && chunk); c = i->userdata; - if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) + if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + + if (c->dead) + connection_free(c); + return -1; + } return 0; } @@ -899,11 +981,15 @@ static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk struct connection*c = i->userdata; assert(i && c && length); +/* pa_log("DROP\n"); */ + 
pa_memblockq_drop(c->input_memblockq, chunk, length); /* do something */ assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + + if (!c->dead) + c->protocol->core->mainloop->defer_enable(c->defer_event, 1); /* assert(pa_memblockq_get_length(c->input_memblockq) > 2048); */ } @@ -913,7 +999,6 @@ static void sink_input_kill_cb(struct pa_sink_input *i) { connection_free((struct connection *) i->userdata); } - static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i) { struct connection*c = i->userdata; assert(i && c); @@ -930,7 +1015,9 @@ static void source_output_push_cb(struct pa_source_output *o, const struct pa_me /* do something */ assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + + if (!c->dead) + c->protocol->core->mainloop->defer_enable(c->defer_event, 1); } static void source_output_kill_cb(struct pa_source_output *o) { @@ -938,28 +1025,50 @@ static void source_output_kill_cb(struct pa_source_output *o) { connection_free((struct connection *) o->userdata); } +static pa_usec_t source_output_get_latency_cb(struct pa_source_output *o) { + struct connection*c = o->userdata; + assert(o && c); + return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec); +} + /*** socket server callback ***/ +static void auth_timeout(struct pa_mainloop_api*m, struct pa_time_event *e, const struct timeval *tv, void *userdata) { + struct connection *c = userdata; + assert(m && tv && c && c->auth_timeout_event == e); + + if (!c->authorized) + connection_free(c); +} + static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) { struct connection *c; + struct pa_protocol_esound *p = userdata; char cname[256]; - assert(s && io && userdata); + 
assert(s && io && p); + if (pa_idxset_ncontents(p->connections)+1 > MAX_CONNECTIONS) { + pa_log(__FILE__": Warning! Too many connections (%u), dropping incoming connection.\n", MAX_CONNECTIONS); + pa_iochannel_free(io); + return; + } + c = pa_xmalloc(sizeof(struct connection)); - c->protocol = userdata; + c->protocol = p; c->io = io; pa_iochannel_set_callback(c->io, io_callback, c); pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); - assert(c->protocol->core); - c->client = pa_client_new(c->protocol->core, "ESOUND", cname); + assert(p->core); + c->client = pa_client_new(p->core, PA_TYPEID_ESOUND, cname); assert(c->client); - c->client->owner = c->protocol->module; + c->client->owner = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; - c->authorized = c->protocol->public; + c->authorized = p->public; c->swap_byte_order = 0; + c->dead = 0; c->read_data_length = 0; c->read_data = pa_xmalloc(c->read_data_alloc = proto_map[ESD_PROTO_CONNECT].data_length); @@ -980,32 +1089,46 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo c->playback.memblock_index = 0; c->playback.fragment_size = 0; - c->scache_memchunk.length = c->scache_memchunk.index = 0; - c->scache_memchunk.memblock = NULL; - c->scache_name = NULL; + c->scache.memchunk.length = c->scache.memchunk.index = 0; + c->scache.memchunk.memblock = NULL; + c->scache.name = NULL; + + if (!c->authorized) { + struct timeval tv; + gettimeofday(&tv, NULL); + tv.tv_sec += AUTH_TIMEOUT; + c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c); + } else + c->auth_timeout_event = NULL; - c->defer_event = c->protocol->core->mainloop->defer_new(c->protocol->core->mainloop, defer_callback, c); + c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c); assert(c->defer_event); - c->protocol->core->mainloop->defer_enable(c->defer_event, 0); + p->core->mainloop->defer_enable(c->defer_event, 0); - 
pa_idxset_put(c->protocol->connections, c, &c->index); + pa_idxset_put(p->connections, c, &c->index); } /*** entry points ***/ struct pa_protocol_esound* pa_protocol_esound_new(struct pa_core*core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) { struct pa_protocol_esound *p; + int public = 0; assert(core && server && ma); p = pa_xmalloc(sizeof(struct pa_protocol_esound)); + if (pa_modargs_get_value_boolean(ma, "public", &public) < 0) { + pa_log(__FILE__": public= expects a boolean argument.\n"); + return NULL; + } + if (pa_authkey_load_auto(pa_modargs_get_value(ma, "cookie", DEFAULT_COOKIE_FILE), p->esd_key, sizeof(p->esd_key)) < 0) { pa_xfree(p); return NULL; } p->module = m; - p->public = 0; + p->public = public; p->server = server; pa_socket_server_set_callback(p->server, on_connection, p); p->core = core;