]> code.delx.au - pulseaudio/blobdiff - src/pulsecore/protocol-simple.c
core: memory leak, fix ref counting when moving streams
[pulseaudio] / src / pulsecore / protocol-simple.c
index 3ee2a0587450c03023fa1ab804d506efb0431322..44fe5973d07cd7b9536a0f3112ce0a989a5d6c9f 100644 (file)
@@ -1,5 +1,3 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
 /***
   This file is part of PulseAudio.
 
@@ -7,7 +5,7 @@
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
-  by the Free Software Foundation; either version 2 of the License,
+  by the Free Software Foundation; either version 2.1 of the License,
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
@@ -32,6 +30,7 @@
 #include <string.h>
 
 #include <pulse/xmalloc.h>
 #include <string.h>
 
 #include <pulse/xmalloc.h>
+#include <pulse/timeval.h>
 
 #include <pulsecore/sink-input.h>
 #include <pulsecore/source-output.h>
 
 #include <pulsecore/sink-input.h>
 #include <pulsecore/source-output.h>
@@ -42,6 +41,8 @@
 #include <pulsecore/core-error.h>
 #include <pulsecore/atomic.h>
 #include <pulsecore/thread-mq.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/atomic.h>
 #include <pulsecore/thread-mq.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/shared.h>
 
 #include "protocol-simple.h"
 
 
 #include "protocol-simple.h"
 
 
 typedef struct connection {
     pa_msgobject parent;
 
 typedef struct connection {
     pa_msgobject parent;
-    pa_protocol_simple *protocol;
+    pa_simple_protocol *protocol;
+    pa_simple_options *options;
     pa_iochannel *io;
     pa_sink_input *sink_input;
     pa_source_output *source_output;
     pa_client *client;
     pa_memblockq *input_memblockq, *output_memblockq;
 
     pa_iochannel *io;
     pa_sink_input *sink_input;
     pa_source_output *source_output;
     pa_client *client;
     pa_memblockq *input_memblockq, *output_memblockq;
 
-    int dead;
+    pa_bool_t dead;
 
     struct {
         pa_memblock *current_memblock;
 
     struct {
         pa_memblock *current_memblock;
-        size_t memblock_index, fragment_size;
+        size_t memblock_index;
         pa_atomic_t missing;
         pa_atomic_t missing;
+        pa_bool_t underrun;
     } playback;
 } connection;
 
     } playback;
 } connection;
 
@@ -70,20 +73,11 @@ PA_DECLARE_CLASS(connection);
 #define CONNECTION(o) (connection_cast(o))
 static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
 
 #define CONNECTION(o) (connection_cast(o))
 static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
 
-struct pa_protocol_simple {
-    pa_module *module;
+struct pa_simple_protocol {
+    PA_REFCNT_DECLARE;
+
     pa_core *core;
     pa_core *core;
-    pa_socket_server*server;
     pa_idxset *connections;
     pa_idxset *connections;
-
-    enum {
-        RECORD = 1,
-        PLAYBACK = 2,
-        DUPLEX = 3
-    } mode;
-
-    pa_sample_spec sample_spec;
-    char *source_name, *sink_name;
 };
 
 enum {
 };
 
 enum {
@@ -97,11 +91,11 @@ enum {
     CONNECTION_MESSAGE_UNLINK_CONNECTION    /* Please drop a aconnection now */
 };
 
     CONNECTION_MESSAGE_UNLINK_CONNECTION    /* Please drop a aconnection now */
 };
 
-
 #define PLAYBACK_BUFFER_SECONDS (.5)
 #define PLAYBACK_BUFFER_FRAGMENTS (10)
 #define RECORD_BUFFER_SECONDS (5)
 #define PLAYBACK_BUFFER_SECONDS (.5)
 #define PLAYBACK_BUFFER_FRAGMENTS (10)
 #define RECORD_BUFFER_SECONDS (5)
-#define RECORD_BUFFER_FRAGMENTS (100)
+#define DEFAULT_SINK_LATENCY (300*PA_USEC_PER_MSEC)
+#define DEFAULT_SOURCE_LATENCY (300*PA_USEC_PER_MSEC)
 
 static void connection_unlink(connection *c) {
     pa_assert(c);
 
 static void connection_unlink(connection *c) {
     pa_assert(c);
@@ -109,6 +103,11 @@ static void connection_unlink(connection *c) {
     if (!c->protocol)
         return;
 
     if (!c->protocol)
         return;
 
+    if (c->options) {
+        pa_simple_options_unref(c->options);
+        c->options = NULL;
+    }
+
     if (c->sink_input) {
         pa_sink_input_unlink(c->sink_input);
         pa_sink_input_unref(c->sink_input);
     if (c->sink_input) {
         pa_sink_input_unlink(c->sink_input);
         pa_sink_input_unref(c->sink_input);
@@ -140,8 +139,6 @@ static void connection_free(pa_object *o) {
     connection *c = CONNECTION(o);
     pa_assert(c);
 
     connection *c = CONNECTION(o);
     pa_assert(c);
 
-    connection_unref(c);
-
     if (c->playback.current_memblock)
         pa_memblock_unref(c->playback.current_memblock);
 
     if (c->playback.current_memblock)
         pa_memblock_unref(c->playback.current_memblock);
 
@@ -158,27 +155,33 @@ static int do_read(connection *c) {
     ssize_t r;
     size_t l;
     void *p;
     ssize_t r;
     size_t l;
     void *p;
+    size_t space;
 
     connection_assert_ref(c);
 
 
     connection_assert_ref(c);
 
-    if (!c->sink_input || (l = pa_atomic_load(&c->playback.missing)) <= 0)
+    if (!c->sink_input || (l = (size_t) pa_atomic_load(&c->playback.missing)) <= 0)
         return 0;
 
         return 0;
 
-    if (l > c->playback.fragment_size)
-        l = c->playback.fragment_size;
+    if (c->playback.current_memblock) {
 
 
-    if (c->playback.current_memblock)
-        if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) {
+        space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index;
+
+        if (space <= 0) {
             pa_memblock_unref(c->playback.current_memblock);
             c->playback.current_memblock = NULL;
             pa_memblock_unref(c->playback.current_memblock);
             c->playback.current_memblock = NULL;
-            c->playback.memblock_index = 0;
         }
         }
+    }
 
     if (!c->playback.current_memblock) {
 
     if (!c->playback.current_memblock) {
-        pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, l));
+        pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) -1));
         c->playback.memblock_index = 0;
         c->playback.memblock_index = 0;
+
+        space = pa_memblock_get_length(c->playback.current_memblock);
     }
 
     }
 
+    if (l > space)
+        l = space;
+
     p = pa_memblock_acquire(c->playback.current_memblock);
     r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l);
     pa_memblock_release(c->playback.current_memblock);
     p = pa_memblock_acquire(c->playback.current_memblock);
     r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l);
     pa_memblock_release(c->playback.current_memblock);
@@ -194,12 +197,12 @@ static int do_read(connection *c) {
 
     chunk.memblock = c->playback.current_memblock;
     chunk.index = c->playback.memblock_index;
 
     chunk.memblock = c->playback.current_memblock;
     chunk.index = c->playback.memblock_index;
-    chunk.length = r;
+    chunk.length = (size_t) r;
 
 
-    c->playback.memblock_index += r;
+    c->playback.memblock_index += (size_t) r;
 
     pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
 
     pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
-    pa_atomic_sub(&c->playback.missing, r);
+    pa_atomic_sub(&c->playback.missing, (int) r);
 
     return 0;
 }
 
     return 0;
 }
@@ -237,7 +240,7 @@ static int do_write(connection *c) {
         return -1;
     }
 
         return -1;
     }
 
-    pa_memblockq_drop(c->output_memblockq, r);
+    pa_memblockq_drop(c->output_memblockq, (size_t) r);
 
     return 0;
 }
 
     return 0;
 }
@@ -248,16 +251,16 @@ static void do_work(connection *c) {
     if (c->dead)
         return;
 
     if (c->dead)
         return;
 
-    if (pa_iochannel_is_readable(c->io)) {
+    if (pa_iochannel_is_readable(c->io))
         if (do_read(c) < 0)
             goto fail;
         if (do_read(c) < 0)
             goto fail;
-    } else if (pa_iochannel_is_hungup(c->io))
+
+    if (!c->sink_input && pa_iochannel_is_hungup(c->io))
         goto fail;
 
         goto fail;
 
-    if (pa_iochannel_is_writable(c->io)) {
+    if (pa_iochannel_is_writable(c->io))
         if (do_write(c) < 0)
             goto fail;
         if (do_write(c) < 0)
             goto fail;
-    }
 
     return;
 
 
     return;
 
@@ -266,7 +269,7 @@ fail:
     if (c->sink_input) {
 
         /* If there is a sink input, we first drain what we already have read before shutting down the connection */
     if (c->sink_input) {
 
         /* If there is a sink input, we first drain what we already have read before shutting down the connection */
-        c->dead = 1;
+        c->dead = TRUE;
 
         pa_iochannel_free(c->io);
         c->io = NULL;
 
         pa_iochannel_free(c->io);
         c->io = NULL;
@@ -280,6 +283,9 @@ static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int6
     connection *c = CONNECTION(o);
     connection_assert_ref(c);
 
     connection *c = CONNECTION(o);
     connection_assert_ref(c);
 
+    if (!c->protocol)
+        return -1;
+
     switch (code) {
         case CONNECTION_MESSAGE_REQUEST_DATA:
             do_work(c);
     switch (code) {
         case CONNECTION_MESSAGE_REQUEST_DATA:
             do_work(c);
@@ -318,15 +324,19 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
             /* New data from the main loop */
             pa_memblockq_push_align(c->input_memblockq, chunk);
 
             /* New data from the main loop */
             pa_memblockq_push_align(c->input_memblockq, chunk);
 
+            if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) {
+                pa_log_debug("Requesting rewind due to end of underrun.");
+                pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE, FALSE);
+            }
+
 /*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
 
             return 0;
         }
 
 /*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
 
             return 0;
         }
 
-        case SINK_INPUT_MESSAGE_DISABLE_PREBUF: {
+        case SINK_INPUT_MESSAGE_DISABLE_PREBUF:
             pa_memblockq_prebuf_disable(c->input_memblockq);
             return 0;
             pa_memblockq_prebuf_disable(c->input_memblockq);
             return 0;
-        }
 
         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
             pa_usec_t *r = userdata;
 
         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
             pa_usec_t *r = userdata;
@@ -345,32 +355,62 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
 /* Called from thread context */
 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
     connection *c;
 /* Called from thread context */
 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
     connection *c;
-    int r;
 
 
-    pa_assert(i);
+    pa_sink_input_assert_ref(i);
     c = CONNECTION(i->userdata);
     connection_assert_ref(c);
     pa_assert(chunk);
 
     c = CONNECTION(i->userdata);
     connection_assert_ref(c);
     pa_assert(chunk);
 
-    if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0) {
+    if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
+
+        c->playback.underrun = TRUE;
 
         if (c->dead && pa_sink_input_safe_to_remove(i))
             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL);
 
 
         if (c->dead && pa_sink_input_safe_to_remove(i))
             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL);
 
+        return -1;
     } else {
     } else {
-        size_t old, new;
+        size_t m;
+
+        chunk->length = PA_MIN(length, chunk->length);
+
+        c->playback.underrun = FALSE;
 
 
-        old = pa_memblockq_missing(c->input_memblockq);
         pa_memblockq_drop(c->input_memblockq, chunk->length);
         pa_memblockq_drop(c->input_memblockq, chunk->length);
-        new = pa_memblockq_missing(c->input_memblockq);
+        m = pa_memblockq_pop_missing(c->input_memblockq);
 
 
-        if (new > old) {
-            if (pa_atomic_add(&c->playback.missing, new - old) <= 0)
+        if (m > 0)
+            if (pa_atomic_add(&c->playback.missing, (int) m) <= 0)
                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
-        }
+
+        return 0;
     }
     }
+}
+
+/* Called from thread context */
+static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
+    connection *c;
+
+    pa_sink_input_assert_ref(i);
+    c = CONNECTION(i->userdata);
+    connection_assert_ref(c);
+
+    /* If we are in an underrun, then we don't rewind */
+    if (i->thread_info.underrun_for > 0)
+        return;
 
 
-    return r;
+    pa_memblockq_rewind(c->input_memblockq, nbytes);
+}
+
+/* Called from thread context */
+static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
+    connection *c;
+
+    pa_sink_input_assert_ref(i);
+    c = CONNECTION(i->userdata);
+    connection_assert_ref(c);
+
+    pa_memblockq_set_maxrewind(c->input_memblockq, nbytes);
 }
 
 /* Called from main context */
 }
 
 /* Called from main context */
@@ -386,7 +426,7 @@ static void sink_input_kill_cb(pa_sink_input *i) {
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
     connection *c;
 
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
     connection *c;
 
-    pa_assert(o);
+    pa_source_output_assert_ref(o);
     c = CONNECTION(o->userdata);
     pa_assert(c);
     pa_assert(chunk);
     c = CONNECTION(o->userdata);
     pa_assert(c);
     pa_assert(chunk);
@@ -405,7 +445,7 @@ static void source_output_kill_cb(pa_source_output *o) {
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
     connection*c;
 
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
     connection*c;
 
-    pa_assert(o);
+    pa_source_output_assert_ref(o);
     c = CONNECTION(o->userdata);
     pa_assert(c);
 
     c = CONNECTION(o->userdata);
     pa_assert(c);
 
@@ -437,14 +477,14 @@ static void io_callback(pa_iochannel*io, void *userdata) {
 
 /*** socket_server callbacks ***/
 
 
 /*** socket_server callbacks ***/
 
-static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
-    pa_protocol_simple *p = userdata;
+void pa_simple_protocol_connect(pa_simple_protocol *p, pa_iochannel *io, pa_simple_options *o) {
     connection *c = NULL;
     connection *c = NULL;
-    char cname[256];
+    char pname[128];
+    pa_client_new_data client_data;
 
 
-    pa_assert(s);
-    pa_assert(io);
     pa_assert(p);
     pa_assert(p);
+    pa_assert(io);
+    pa_assert(o);
 
     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
 
     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
@@ -456,41 +496,53 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
     c->parent.parent.free = connection_free;
     c->parent.process_msg = connection_process_msg;
     c->io = io;
     c->parent.parent.free = connection_free;
     c->parent.process_msg = connection_process_msg;
     c->io = io;
+    pa_iochannel_set_callback(c->io, io_callback, c);
+
     c->sink_input = NULL;
     c->source_output = NULL;
     c->input_memblockq = c->output_memblockq = NULL;
     c->protocol = p;
     c->sink_input = NULL;
     c->source_output = NULL;
     c->input_memblockq = c->output_memblockq = NULL;
     c->protocol = p;
+    c->options = pa_simple_options_ref(o);
     c->playback.current_memblock = NULL;
     c->playback.memblock_index = 0;
     c->playback.current_memblock = NULL;
     c->playback.memblock_index = 0;
-    c->playback.fragment_size = 0;
-    c->dead = 0;
+    c->dead = FALSE;
+    c->playback.underrun = TRUE;
     pa_atomic_store(&c->playback.missing, 0);
 
     pa_atomic_store(&c->playback.missing, 0);
 
-    pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
-    pa_assert_se(c->client = pa_client_new(p->core, __FILE__, cname));
-    c->client->module = p->module;
+    pa_client_new_data_init(&client_data);
+    client_data.module = o->module;
+    client_data.driver = __FILE__;
+    pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
+    pa_proplist_setf(client_data.proplist, PA_PROP_APPLICATION_NAME, "Simple client (%s)", pname);
+    pa_proplist_sets(client_data.proplist, "simple-protocol.peer", pname);
+    c->client = pa_client_new(p->core, &client_data);
+    pa_client_new_data_done(&client_data);
+
+    if (!c->client)
+        goto fail;
+
     c->client->kill = client_kill_cb;
     c->client->userdata = c;
 
     c->client->kill = client_kill_cb;
     c->client->userdata = c;
 
-    if (p->mode & PLAYBACK) {
+    if (o->playback) {
         pa_sink_input_new_data data;
         size_t l;
         pa_sink *sink;
 
         pa_sink_input_new_data data;
         size_t l;
         pa_sink *sink;
 
-        if (!(sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, TRUE))) {
+        if (!(sink = pa_namereg_get(c->protocol->core, o->default_sink, PA_NAMEREG_SINK))) {
             pa_log("Failed to get sink.");
             goto fail;
         }
 
         pa_sink_input_new_data_init(&data);
         data.driver = __FILE__;
             pa_log("Failed to get sink.");
             goto fail;
         }
 
         pa_sink_input_new_data_init(&data);
         data.driver = __FILE__;
-        data.module = p->module;
+        data.module = o->module;
         data.client = c->client;
         data.sink = sink;
         pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist);
         data.client = c->client;
         data.sink = sink;
         pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist);
-        pa_sink_input_new_data_set_sample_spec(&data, &p->sample_spec);
+        pa_sink_input_new_data_set_sample_spec(&data, &o->sample_spec);
 
 
-        c->sink_input = pa_sink_input_new(p->core, &data, 0);
+        pa_sink_input_new(&c->sink_input, p->core, &data, 0);
         pa_sink_input_new_data_done(&data);
 
         if (!c->sink_input) {
         pa_sink_input_new_data_done(&data);
 
         if (!c->sink_input) {
@@ -500,46 +552,49 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
 
         c->sink_input->parent.process_msg = sink_input_process_msg;
         c->sink_input->pop = sink_input_pop_cb;
 
         c->sink_input->parent.process_msg = sink_input_process_msg;
         c->sink_input->pop = sink_input_pop_cb;
+        c->sink_input->process_rewind = sink_input_process_rewind_cb;
+        c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
         c->sink_input->kill = sink_input_kill_cb;
         c->sink_input->userdata = c;
 
         c->sink_input->kill = sink_input_kill_cb;
         c->sink_input->userdata = c;
 
-        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
+        pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY);
+
+        l = (size_t) ((double) pa_bytes_per_second(&o->sample_spec)*PLAYBACK_BUFFER_SECONDS);
         c->input_memblockq = pa_memblockq_new(
                 0,
                 l,
         c->input_memblockq = pa_memblockq_new(
                 0,
                 l,
-                0,
-                pa_frame_size(&p->sample_spec),
+                l,
+                pa_frame_size(&o->sample_spec),
                 (size_t) -1,
                 l/PLAYBACK_BUFFER_FRAGMENTS,
                 0,
                 NULL);
                 (size_t) -1,
                 l/PLAYBACK_BUFFER_FRAGMENTS,
                 0,
                 NULL);
-        pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
-        c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS;
+        pa_iochannel_socket_set_rcvbuf(io, l);
 
 
-        pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq));
+        pa_atomic_store(&c->playback.missing, (int) pa_memblockq_missing(c->input_memblockq));
 
         pa_sink_input_put(c->sink_input);
     }
 
 
         pa_sink_input_put(c->sink_input);
     }
 
-    if (p->mode & RECORD) {
+    if (o->record) {
         pa_source_output_new_data data;
         size_t l;
         pa_source *source;
 
         pa_source_output_new_data data;
         size_t l;
         pa_source *source;
 
-        if (!(source = pa_namereg_get(c->protocol->core, c->protocol->source_name, PA_NAMEREG_SOURCE, TRUE))) {
+        if (!(source = pa_namereg_get(c->protocol->core, o->default_source, PA_NAMEREG_SOURCE))) {
             pa_log("Failed to get source.");
             goto fail;
         }
 
         pa_source_output_new_data_init(&data);
         data.driver = __FILE__;
             pa_log("Failed to get source.");
             goto fail;
         }
 
         pa_source_output_new_data_init(&data);
         data.driver = __FILE__;
-        data.module = p->module;
+        data.module = o->module;
         data.client = c->client;
         data.source = source;
         pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist);
         data.client = c->client;
         data.source = source;
         pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist);
-        pa_source_output_new_data_set_sample_spec(&data, &p->sample_spec);
+        pa_source_output_new_data_set_sample_spec(&data, &o->sample_spec);
 
 
-        c->source_output = pa_source_output_new(p->core, &data, 0);
+        pa_source_output_new(&c->source_output, p->core, &data, 0);
         pa_source_output_new_data_done(&data);
 
         if (!c->source_output) {
         pa_source_output_new_data_done(&data);
 
         if (!c->source_output) {
@@ -551,22 +606,23 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
         c->source_output->get_latency = source_output_get_latency_cb;
         c->source_output->userdata = c;
 
         c->source_output->get_latency = source_output_get_latency_cb;
         c->source_output->userdata = c;
 
-        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
+        pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY);
+
+        l = (size_t) (pa_bytes_per_second(&o->sample_spec)*RECORD_BUFFER_SECONDS);
         c->output_memblockq = pa_memblockq_new(
                 0,
                 l,
                 0,
         c->output_memblockq = pa_memblockq_new(
                 0,
                 l,
                 0,
-                pa_frame_size(&p->sample_spec),
+                pa_frame_size(&o->sample_spec),
                 1,
                 0,
                 0,
                 NULL);
                 1,
                 0,
                 0,
                 NULL);
-        pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
+        pa_iochannel_socket_set_sndbuf(io, l);
 
         pa_source_output_put(c->source_output);
     }
 
 
         pa_source_output_put(c->source_output);
     }
 
-    pa_iochannel_set_callback(c->io, io_callback, c);
     pa_idxset_put(p->connections, c, NULL);
 
     return;
     pa_idxset_put(p->connections, c, NULL);
 
     return;
@@ -576,73 +632,140 @@ fail:
         connection_unlink(c);
 }
 
         connection_unlink(c);
 }
 
-pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
-    pa_protocol_simple* p = NULL;
-    pa_bool_t enable;
+void pa_simple_protocol_disconnect(pa_simple_protocol *p, pa_module *m) {
+    connection *c;
+    void *state = NULL;
 
 
-    pa_assert(core);
-    pa_assert(server);
-    pa_assert(ma);
+    pa_assert(p);
+    pa_assert(m);
 
 
-    p = pa_xnew0(pa_protocol_simple, 1);
-    p->module = m;
-    p->core = core;
-    p->server = server;
+    while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
+        if (c->options->module == m)
+            connection_unlink(c);
+}
+
+static pa_simple_protocol* simple_protocol_new(pa_core *c) {
+    pa_simple_protocol *p;
+
+    pa_assert(c);
+
+    p = pa_xnew(pa_simple_protocol, 1);
+    PA_REFCNT_INIT(p);
+    p->core = c;
     p->connections = pa_idxset_new(NULL, NULL);
 
     p->connections = pa_idxset_new(NULL, NULL);
 
-    p->sample_spec = core->default_sample_spec;
-    if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
-        pa_log("Failed to parse sample type specification.");
-        goto fail;
-    }
+    pa_assert_se(pa_shared_set(c, "simple-protocol", p) >= 0);
 
 
-    p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
-    p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
+    return p;
+}
 
 
-    enable = FALSE;
-    if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) {
-        pa_log("record= expects a numeric argument.");
-        goto fail;
-    }
-    p->mode = enable ? RECORD : 0;
+pa_simple_protocol* pa_simple_protocol_get(pa_core *c) {
+    pa_simple_protocol *p;
 
 
-    enable = 1;
-    if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {
-        pa_log("playback= expects a numeric argument.");
-        goto fail;
-    }
-    p->mode |= enable ? PLAYBACK : 0;
+    if ((p = pa_shared_get(c, "simple-protocol")))
+        return pa_simple_protocol_ref(p);
 
 
-    if ((p->mode & (RECORD|PLAYBACK)) == 0) {
-        pa_log("neither playback nor recording enabled for protocol.");
-        goto fail;
-    }
+    return simple_protocol_new(c);
+}
 
 
-    pa_socket_server_set_callback(p->server, on_connection, p);
+pa_simple_protocol* pa_simple_protocol_ref(pa_simple_protocol *p) {
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) >= 1);
+
+    PA_REFCNT_INC(p);
 
     return p;
 
     return p;
+}
 
 
-fail:
-    if (p)
-        pa_protocol_simple_free(p);
+void pa_simple_protocol_unref(pa_simple_protocol *p) {
+    connection *c;
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) >= 1);
 
 
-    return NULL;
+    if (PA_REFCNT_DEC(p) > 0)
+        return;
+
+    while ((c = pa_idxset_first(p->connections, NULL)))
+        connection_unlink(c);
+
+    pa_idxset_free(p->connections, NULL, NULL);
+
+    pa_assert_se(pa_shared_remove(p->core, "simple-protocol") >= 0);
+
+    pa_xfree(p);
 }
 
 }
 
+pa_simple_options* pa_simple_options_new(void) {
+    pa_simple_options *o;
 
 
-void pa_protocol_simple_free(pa_protocol_simple *p) {
-    connection *c;
-    pa_assert(p);
+    o = pa_xnew0(pa_simple_options, 1);
+    PA_REFCNT_INIT(o);
 
 
-    if (p->connections) {
-        while((c = pa_idxset_first(p->connections, NULL)))
-            connection_unlink(c);
+    o->record = FALSE;
+    o->playback = TRUE;
+
+    return o;
+}
+
+pa_simple_options* pa_simple_options_ref(pa_simple_options *o) {
+    pa_assert(o);
+    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+
+    PA_REFCNT_INC(o);
+
+    return o;
+}
+
+void pa_simple_options_unref(pa_simple_options *o) {
+    pa_assert(o);
+    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+
+    if (PA_REFCNT_DEC(o) > 0)
+        return;
+
+    pa_xfree(o->default_sink);
+    pa_xfree(o->default_source);
+
+    pa_xfree(o);
+}
+
+int pa_simple_options_parse(pa_simple_options *o, pa_core *c, pa_modargs *ma) {
+    pa_bool_t enabled;
+
+    pa_assert(o);
+    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+    pa_assert(ma);
 
 
-        pa_idxset_free(p->connections, NULL, NULL);
+    o->sample_spec = c->default_sample_spec;
+    if (pa_modargs_get_sample_spec_and_channel_map(ma, &o->sample_spec, &o->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
+        pa_log("Failed to parse sample type specification.");
+        return -1;
     }
 
     }
 
-    if (p->server)
-        pa_socket_server_unref(p->server);
+    pa_xfree(o->default_source);
+    o->default_source = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
 
 
-    pa_xfree(p);
+    pa_xfree(o->default_sink);
+    o->default_sink = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
+
+    enabled = o->record;
+    if (pa_modargs_get_value_boolean(ma, "record", &enabled) < 0) {
+        pa_log("record= expects a boolean argument.");
+        return -1;
+    }
+    o->record = enabled;
+
+    enabled = o->playback;
+    if (pa_modargs_get_value_boolean(ma, "playback", &enabled) < 0) {
+        pa_log("playback= expects a boolean argument.");
+        return -1;
+    }
+    o->playback = enabled;
+
+    if (!o->playback && !o->record) {
+        pa_log("neither playback nor recording enabled for protocol.");
+        return -1;
+    }
+
+    return 0;
 }
 }