code.delx.au - pulseaudio/blobdiff - src/pulsecore/protocol-simple.c
core-util: ensure that we chmod only the dir we ourselves created
[pulseaudio] / src / pulsecore / protocol-simple.c
index 0a7a7acbfa127c5c9dc6f20ed729ee02050b8223..77277e1349325158629708c1b1e9bd5d1517b5d1 100644 (file)
@@ -1,11 +1,11 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
 /***
   This file is part of PulseAudio.
 
+  Copyright 2004-2006 Lennart Poettering
+
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
-  by the Free Software Foundation; either version 2 of the License,
+  by the Free Software Foundation; either version 2.1 of the License,
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
@@ -23,7 +23,6 @@
 #include <config.h>
 #endif
 
 #include <config.h>
 #endif
 
-#include <assert.h>
 #include <stdlib.h>
 #include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <limits.h>
 #include <stdio.h>
@@ -31,6 +30,7 @@
 #include <string.h>
 
 #include <pulse/xmalloc.h>
 #include <string.h>
 
 #include <pulse/xmalloc.h>
+#include <pulse/timeval.h>
 
 #include <pulsecore/sink-input.h>
 #include <pulsecore/source-output.h>
 
 #include <pulsecore/sink-input.h>
 #include <pulsecore/source-output.h>
 #include <pulsecore/namereg.h>
 #include <pulsecore/log.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/namereg.h>
 #include <pulsecore/log.h>
 #include <pulsecore/core-error.h>
+#include <pulsecore/atomic.h>
+#include <pulsecore/thread-mq.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/shared.h>
 
 #include "protocol-simple.h"
 
 /* Don't allow more than this many concurrent connections */
 #define MAX_CONNECTIONS 10
 
 
 #include "protocol-simple.h"
 
 /* Don't allow more than this many concurrent connections */
 #define MAX_CONNECTIONS 10
 
-struct connection {
-    pa_protocol_simple *protocol;
+typedef struct connection {
+    pa_msgobject parent;
+    pa_simple_protocol *protocol;
+    pa_simple_options *options;
     pa_iochannel *io;
     pa_sink_input *sink_input;
     pa_source_output *source_output;
     pa_client *client;
     pa_memblockq *input_memblockq, *output_memblockq;
     pa_iochannel *io;
     pa_sink_input *sink_input;
     pa_source_output *source_output;
     pa_client *client;
     pa_memblockq *input_memblockq, *output_memblockq;
-    pa_defer_event *defer_event;
 
 
-    int dead;
+    pa_bool_t dead;
 
     struct {
         pa_memblock *current_memblock;
 
     struct {
         pa_memblock *current_memblock;
-        size_t memblock_index, fragment_size;
+        size_t memblock_index;
+        pa_atomic_t missing;
+        pa_bool_t underrun;
     } playback;
     } playback;
-};
+} connection;
+
+PA_DEFINE_PRIVATE_CLASS(connection, pa_msgobject);
+#define CONNECTION(o) (connection_cast(o))
+
+struct pa_simple_protocol {
+    PA_REFCNT_DECLARE;
 
 
-struct pa_protocol_simple {
-    pa_module *module;
     pa_core *core;
     pa_core *core;
-    pa_socket_server*server;
     pa_idxset *connections;
     pa_idxset *connections;
-    enum {
-        RECORD = 1,
-        PLAYBACK = 2,
-        DUPLEX = 3
-    } mode;
-    pa_sample_spec sample_spec;
-    char *source_name, *sink_name;
+};
+
+enum {
+    SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
+    SINK_INPUT_MESSAGE_DISABLE_PREBUF /* disable prebuf, get playback started. */
+};
+
+enum {
+    CONNECTION_MESSAGE_REQUEST_DATA,      /* data requested by the sink input from the main loop */
+    CONNECTION_MESSAGE_POST_DATA,         /* data from source output to main loop */
+    CONNECTION_MESSAGE_UNLINK_CONNECTION    /* Please drop a connection now */
 };
 
 #define PLAYBACK_BUFFER_SECONDS (.5)
 #define PLAYBACK_BUFFER_FRAGMENTS (10)
 #define RECORD_BUFFER_SECONDS (5)
 };
 
 #define PLAYBACK_BUFFER_SECONDS (.5)
 #define PLAYBACK_BUFFER_FRAGMENTS (10)
 #define RECORD_BUFFER_SECONDS (5)
-#define RECORD_BUFFER_FRAGMENTS (100)
+#define DEFAULT_SINK_LATENCY (300*PA_USEC_PER_MSEC)
+#define DEFAULT_SOURCE_LATENCY (300*PA_USEC_PER_MSEC)
 
 
-static void connection_free(struct connection *c) {
-    assert(c);
+static void connection_unlink(connection *c) {
+    pa_assert(c);
 
 
-    pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
+    if (!c->protocol)
+        return;
+
+    if (c->options) {
+        pa_simple_options_unref(c->options);
+        c->options = NULL;
+    }
 
 
-    if (c->playback.current_memblock)
-        pa_memblock_unref(c->playback.current_memblock);
     if (c->sink_input) {
     if (c->sink_input) {
-        pa_sink_input_disconnect(c->sink_input);
+        pa_sink_input_unlink(c->sink_input);
         pa_sink_input_unref(c->sink_input);
         pa_sink_input_unref(c->sink_input);
+        c->sink_input = NULL;
     }
     }
+
     if (c->source_output) {
     if (c->source_output) {
-        pa_source_output_disconnect(c->source_output);
+        pa_source_output_unlink(c->source_output);
         pa_source_output_unref(c->source_output);
         pa_source_output_unref(c->source_output);
+        c->source_output = NULL;
     }
     }
-    if (c->client)
+
+    if (c->client) {
         pa_client_free(c->client);
         pa_client_free(c->client);
-    if (c->io)
+        c->client = NULL;
+    }
+
+    if (c->io) {
         pa_iochannel_free(c->io);
         pa_iochannel_free(c->io);
+        c->io = NULL;
+    }
+
+    pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
+    c->protocol = NULL;
+    connection_unref(c);
+}
+
+static void connection_free(pa_object *o) {
+    connection *c = CONNECTION(o);
+    pa_assert(c);
+
+    if (c->playback.current_memblock)
+        pa_memblock_unref(c->playback.current_memblock);
+
     if (c->input_memblockq)
         pa_memblockq_free(c->input_memblockq);
     if (c->output_memblockq)
         pa_memblockq_free(c->output_memblockq);
     if (c->input_memblockq)
         pa_memblockq_free(c->input_memblockq);
     if (c->output_memblockq)
         pa_memblockq_free(c->output_memblockq);
-    if (c->defer_event)
-        c->protocol->core->mainloop->defer_free(c->defer_event);
+
     pa_xfree(c);
 }
 
     pa_xfree(c);
 }
 
-static int do_read(struct connection *c) {
+static int do_read(connection *c) {
     pa_memchunk chunk;
     ssize_t r;
     size_t l;
     pa_memchunk chunk;
     ssize_t r;
     size_t l;
+    void *p;
+    size_t space = 0;
 
 
-    if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
+    connection_assert_ref(c);
+
+    if (!c->sink_input || (l = (size_t) pa_atomic_load(&c->playback.missing)) <= 0)
         return 0;
 
         return 0;
 
-    if (l > c->playback.fragment_size)
-        l = c->playback.fragment_size;
+    if (c->playback.current_memblock) {
 
 
-    if (c->playback.current_memblock)
-        if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
+        space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index;
+
+        if (space <= 0) {
             pa_memblock_unref(c->playback.current_memblock);
             c->playback.current_memblock = NULL;
             pa_memblock_unref(c->playback.current_memblock);
             c->playback.current_memblock = NULL;
-            c->playback.memblock_index = 0;
         }
         }
+    }
 
     if (!c->playback.current_memblock) {
 
     if (!c->playback.current_memblock) {
-        c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
-        assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
+        pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) -1));
         c->playback.memblock_index = 0;
         c->playback.memblock_index = 0;
+
+        space = pa_memblock_get_length(c->playback.current_memblock);
     }
 
     }
 
-    if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
+    if (l > space)
+        l = space;
+
+    p = pa_memblock_acquire(c->playback.current_memblock);
+    r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l);
+    pa_memblock_release(c->playback.current_memblock);
+
+    if (r <= 0) {
+
+        if (r < 0 && (errno == EINTR || errno == EAGAIN))
+            return 0;
+
         pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno));
         return -1;
     }
 
     chunk.memblock = c->playback.current_memblock;
     chunk.index = c->playback.memblock_index;
         pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno));
         return -1;
     }
 
     chunk.memblock = c->playback.current_memblock;
     chunk.index = c->playback.memblock_index;
-    chunk.length = r;
-    assert(chunk.memblock);
+    chunk.length = (size_t) r;
 
 
-    c->playback.memblock_index += r;
+    c->playback.memblock_index += (size_t) r;
 
 
-    assert(c->input_memblockq);
-    pa_memblockq_push_align(c->input_memblockq, &chunk);
-    assert(c->sink_input);
-    pa_sink_notify(c->sink_input->sink);
+    pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
+    pa_atomic_sub(&c->playback.missing, (int) r);
 
     return 0;
 }
 
 
     return 0;
 }
 
-static int do_write(struct connection *c) {
+static int do_write(connection *c) {
     pa_memchunk chunk;
     ssize_t r;
     pa_memchunk chunk;
     ssize_t r;
+    void *p;
+
+    connection_assert_ref(c);
 
     if (!c->source_output)
         return 0;
 
 
     if (!c->source_output)
         return 0;
 
-    assert(c->output_memblockq);
-    if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
+    if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) {
+/*         pa_log("peek failed"); */
         return 0;
         return 0;
+    }
+
+    pa_assert(chunk.memblock);
+    pa_assert(chunk.length);
 
 
-    assert(chunk.memblock && chunk.length);
+    p = pa_memblock_acquire(chunk.memblock);
+    r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length);
+    pa_memblock_release(chunk.memblock);
+
+    pa_memblock_unref(chunk.memblock);
+
+    if (r < 0) {
+
+        if (errno == EINTR || errno == EAGAIN)
+            return 0;
 
 
-    if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) {
-        pa_memblock_unref(chunk.memblock);
         pa_log("write(): %s", pa_cstrerror(errno));
         return -1;
     }
 
         pa_log("write(): %s", pa_cstrerror(errno));
         return -1;
     }
 
-    pa_memblockq_drop(c->output_memblockq, &chunk, r);
-    pa_memblock_unref(chunk.memblock);
-
-    pa_source_notify(c->source_output->source);
+    pa_memblockq_drop(c->output_memblockq, (size_t) r);
 
     return 0;
 }
 
 
     return 0;
 }
 
-static void do_work(struct connection *c) {
-    assert(c);
-
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
-    c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
+static void do_work(connection *c) {
+    connection_assert_ref(c);
 
     if (c->dead)
         return;
 
 
     if (c->dead)
         return;
 
-    if (pa_iochannel_is_readable(c->io)) {
+    if (pa_iochannel_is_readable(c->io))
         if (do_read(c) < 0)
             goto fail;
         if (do_read(c) < 0)
             goto fail;
-    } else if (pa_iochannel_is_hungup(c->io))
+
+    if (!c->sink_input && pa_iochannel_is_hungup(c->io))
         goto fail;
 
         goto fail;
 
-    if (pa_iochannel_is_writable(c->io)) {
+    if (pa_iochannel_is_writable(c->io))
         if (do_write(c) < 0)
             goto fail;
         if (do_write(c) < 0)
             goto fail;
-    }
 
     return;
 
 fail:
 
     if (c->sink_input) {
 
     return;
 
 fail:
 
     if (c->sink_input) {
-        c->dead = 1;
+
+        /* If there is a sink input, we first drain what we already have read before shutting down the connection */
+        c->dead = TRUE;
 
         pa_iochannel_free(c->io);
         c->io = NULL;
 
 
         pa_iochannel_free(c->io);
         c->io = NULL;
 
-        pa_memblockq_prebuf_disable(c->input_memblockq);
-        pa_sink_notify(c->sink_input->sink);
+        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
     } else
     } else
-        connection_free(c);
+        connection_unlink(c);
+}
+
+static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
+    connection *c = CONNECTION(o);
+    connection_assert_ref(c);
+
+    if (!c->protocol)
+        return -1;
+
+    switch (code) {
+        case CONNECTION_MESSAGE_REQUEST_DATA:
+            do_work(c);
+            break;
+
+        case CONNECTION_MESSAGE_POST_DATA:
+/*             pa_log("got data %u", chunk->length); */
+            pa_memblockq_push_align(c->output_memblockq, chunk);
+            do_work(c);
+            break;
+
+        case CONNECTION_MESSAGE_UNLINK_CONNECTION:
+            connection_unlink(c);
+            break;
+    }
+
+    return 0;
 }
 
 /*** sink_input callbacks ***/
 
 }
 
 /*** sink_input callbacks ***/
 
-static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
-    struct connection*c;
-    assert(i && i->userdata && chunk);
-    c = i->userdata;
+/* Called from thread context */
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
+    pa_sink_input *i = PA_SINK_INPUT(o);
+    connection*c;
+
+    pa_sink_input_assert_ref(i);
+    c = CONNECTION(i->userdata);
+    connection_assert_ref(c);
+
+    switch (code) {
+
+        case SINK_INPUT_MESSAGE_POST_DATA: {
+            pa_assert(chunk);
+
+            /* New data from the main loop */
+            pa_memblockq_push_align(c->input_memblockq, chunk);
+
+            if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) {
+                pa_log_debug("Requesting rewind due to end of underrun.");
+                pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE, FALSE);
+            }
+
+/*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
+
+            return 0;
+        }
+
+        case SINK_INPUT_MESSAGE_DISABLE_PREBUF:
+            pa_memblockq_prebuf_disable(c->input_memblockq);
+            return 0;
+
+        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
+            pa_usec_t *r = userdata;
+
+            *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
+
+            /* Fall through, the default handler will add in the extra
+             * latency added by the resampler */
+        }
+
+        default:
+            return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
+    }
+}
+
+/* Called from thread context */
+static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
+    connection *c;
+
+    pa_sink_input_assert_ref(i);
+    c = CONNECTION(i->userdata);
+    connection_assert_ref(c);
+    pa_assert(chunk);
 
     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
 
 
     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
 
-        if (c->dead)
-            connection_free(c);
+        c->playback.underrun = TRUE;
+
+        if (c->dead && pa_sink_input_safe_to_remove(i))
+            pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL);
 
         return -1;
 
         return -1;
-    }
+    } else {
+        size_t m;
 
 
-    return 0;
+        chunk->length = PA_MIN(length, chunk->length);
+
+        c->playback.underrun = FALSE;
+
+        pa_memblockq_drop(c->input_memblockq, chunk->length);
+        m = pa_memblockq_pop_missing(c->input_memblockq);
+
+        if (m > 0)
+            if (pa_atomic_add(&c->playback.missing, (int) m) <= 0)
+                pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
+
+        return 0;
+    }
 }
 
 }
 
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
-    struct connection*c = i->userdata;
-    assert(i && c && length);
+/* Called from thread context */
+static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
+    connection *c;
 
 
-    pa_memblockq_drop(c->input_memblockq, chunk, length);
+    pa_sink_input_assert_ref(i);
+    c = CONNECTION(i->userdata);
+    connection_assert_ref(c);
 
 
-    /* do something */
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
-    c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
+    /* If we are in an underrun, then we don't rewind */
+    if (i->thread_info.underrun_for > 0)
+        return;
+
+    pa_memblockq_rewind(c->input_memblockq, nbytes);
 }
 
 }
 
-static void sink_input_kill_cb(pa_sink_input *i) {
-    assert(i && i->userdata);
-    connection_free((struct connection *) i->userdata);
+/* Called from thread context */
+static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
+    connection *c;
+
+    pa_sink_input_assert_ref(i);
+    c = CONNECTION(i->userdata);
+    connection_assert_ref(c);
+
+    pa_memblockq_set_maxrewind(c->input_memblockq, nbytes);
 }
 
 }
 
+/* Called from main context */
+static void sink_input_kill_cb(pa_sink_input *i) {
+    pa_sink_input_assert_ref(i);
 
 
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
-    struct connection*c = i->userdata;
-    assert(i && c);
-    return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
+    connection_unlink(CONNECTION(i->userdata));
 }
 
 /*** source_output callbacks ***/
 
 }
 
 /*** source_output callbacks ***/
 
+/* Called from thread context */
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
-    struct connection *c = o->userdata;
-    assert(o && c && chunk);
+    connection *c;
 
 
-    pa_memblockq_push(c->output_memblockq, chunk);
+    pa_source_output_assert_ref(o);
+    c = CONNECTION(o->userdata);
+    pa_assert(c);
+    pa_assert(chunk);
 
 
-    /* do something */
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
-    c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
+    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
 }
 
 }
 
+/* Called from main context */
 static void source_output_kill_cb(pa_source_output *o) {
 static void source_output_kill_cb(pa_source_output *o) {
-    assert(o && o->userdata);
-    connection_free((struct connection *) o->userdata);
+    pa_source_output_assert_ref(o);
+
+    connection_unlink(CONNECTION(o->userdata));
 }
 
 }
 
+/* Called from main context */
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
-    struct connection*c = o->userdata;
-    assert(o && c);
+    connection*c;
+
+    pa_source_output_assert_ref(o);
+    c = CONNECTION(o->userdata);
+    pa_assert(c);
+
     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
 }
 
 /*** client callbacks ***/
 
     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
 }
 
 /*** client callbacks ***/
 
-static void client_kill_cb(pa_client *c) {
-    assert(c && c->userdata);
-    connection_free((struct connection *) c->userdata);
+static void client_kill_cb(pa_client *client) {
+    connection*c;
+
+    pa_assert(client);
+    c = CONNECTION(client->userdata);
+    pa_assert(c);
+
+    connection_unlink(c);
 }
 
 /*** pa_iochannel callbacks ***/
 
 static void io_callback(pa_iochannel*io, void *userdata) {
 }
 
 /*** pa_iochannel callbacks ***/
 
 static void io_callback(pa_iochannel*io, void *userdata) {
-    struct connection *c = userdata;
-    assert(io && c && c->io == io);
+    connection *c = CONNECTION(userdata);
 
 
-    do_work(c);
-}
-
-/*** fixed callback ***/
-
-static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
-    struct connection *c = userdata;
-    assert(a && c && c->defer_event == e);
+    connection_assert_ref(c);
+    pa_assert(io);
 
     do_work(c);
 }
 
 /*** socket_server callbacks ***/
 
 
     do_work(c);
 }
 
 /*** socket_server callbacks ***/
 
-static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
-    pa_protocol_simple *p = userdata;
-    struct connection *c = NULL;
-    char cname[256];
-    assert(s && io && p);
+void pa_simple_protocol_connect(pa_simple_protocol *p, pa_iochannel *io, pa_simple_options *o) {
+    connection *c = NULL;
+    char pname[128];
+    pa_client_new_data client_data;
+
+    pa_assert(p);
+    pa_assert(io);
+    pa_assert(o);
 
     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
 
     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
@@ -320,75 +491,116 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
         return;
     }
 
         return;
     }
 
-    c = pa_xmalloc(sizeof(struct connection));
+    c = pa_msgobject_new(connection);
+    c->parent.parent.free = connection_free;
+    c->parent.process_msg = connection_process_msg;
     c->io = io;
     c->io = io;
+    pa_iochannel_set_callback(c->io, io_callback, c);
+
     c->sink_input = NULL;
     c->source_output = NULL;
     c->sink_input = NULL;
     c->source_output = NULL;
-    c->defer_event = NULL;
     c->input_memblockq = c->output_memblockq = NULL;
     c->protocol = p;
     c->input_memblockq = c->output_memblockq = NULL;
     c->protocol = p;
+    c->options = pa_simple_options_ref(o);
     c->playback.current_memblock = NULL;
     c->playback.memblock_index = 0;
     c->playback.current_memblock = NULL;
     c->playback.memblock_index = 0;
-    c->playback.fragment_size = 0;
-    c->dead = 0;
+    c->dead = FALSE;
+    c->playback.underrun = TRUE;
+    pa_atomic_store(&c->playback.missing, 0);
+
+    pa_client_new_data_init(&client_data);
+    client_data.module = o->module;
+    client_data.driver = __FILE__;
+    pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
+    pa_proplist_setf(client_data.proplist, PA_PROP_APPLICATION_NAME, "Simple client (%s)", pname);
+    pa_proplist_sets(client_data.proplist, "simple-protocol.peer", pname);
+    c->client = pa_client_new(p->core, &client_data);
+    pa_client_new_data_done(&client_data);
+
+    if (!c->client)
+        goto fail;
 
 
-    pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
-    c->client = pa_client_new(p->core, __FILE__, cname);
-    assert(c->client);
-    c->client->owner = p->module;
     c->client->kill = client_kill_cb;
     c->client->userdata = c;
 
     c->client->kill = client_kill_cb;
     c->client->userdata = c;
 
-    if (p->mode & PLAYBACK) {
+    if (o->playback) {
         pa_sink_input_new_data data;
         pa_sink_input_new_data data;
+        pa_memchunk silence;
         size_t l;
         size_t l;
+        pa_sink *sink;
+
+        if (!(sink = pa_namereg_get(c->protocol->core, o->default_sink, PA_NAMEREG_SINK))) {
+            pa_log("Failed to get sink.");
+            goto fail;
+        }
 
         pa_sink_input_new_data_init(&data);
         data.driver = __FILE__;
 
         pa_sink_input_new_data_init(&data);
         data.driver = __FILE__;
-        data.name = c->client->name;
-        pa_sink_input_new_data_set_sample_spec(&data, &p->sample_spec);
-        data.module = p->module;
+        data.module = o->module;
         data.client = c->client;
         data.client = c->client;
+        data.sink = sink;
+        pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist);
+        pa_sink_input_new_data_set_sample_spec(&data, &o->sample_spec);
+
+        pa_sink_input_new(&c->sink_input, p->core, &data);
+        pa_sink_input_new_data_done(&data);
 
 
-        if (!(c->sink_input = pa_sink_input_new(p->core, &data, 0))) {
+        if (!c->sink_input) {
             pa_log("Failed to create sink input.");
             goto fail;
         }
 
             pa_log("Failed to create sink input.");
             goto fail;
         }
 
-        c->sink_input->peek = sink_input_peek_cb;
-        c->sink_input->drop = sink_input_drop_cb;
+        c->sink_input->parent.process_msg = sink_input_process_msg;
+        c->sink_input->pop = sink_input_pop_cb;
+        c->sink_input->process_rewind = sink_input_process_rewind_cb;
+        c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
         c->sink_input->kill = sink_input_kill_cb;
         c->sink_input->kill = sink_input_kill_cb;
-        c->sink_input->get_latency = sink_input_get_latency_cb;
         c->sink_input->userdata = c;
 
         c->sink_input->userdata = c;
 
-        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
+        pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY);
+
+        l = (size_t) ((double) pa_bytes_per_second(&o->sample_spec)*PLAYBACK_BUFFER_SECONDS);
+        pa_sink_input_get_silence(c->sink_input, &silence);
         c->input_memblockq = pa_memblockq_new(
                 0,
                 l,
         c->input_memblockq = pa_memblockq_new(
                 0,
                 l,
-                0,
-                pa_frame_size(&p->sample_spec),
+                l,
+                pa_frame_size(&o->sample_spec),
                 (size_t) -1,
                 l/PLAYBACK_BUFFER_FRAGMENTS,
                 (size_t) -1,
                 l/PLAYBACK_BUFFER_FRAGMENTS,
-                NULL);
-        assert(c->input_memblockq);
-        pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
-        c->playback.fragment_size = l/10;
+                0,
+                &silence);
+        pa_memblock_unref(silence.memblock);
+
+        pa_iochannel_socket_set_rcvbuf(io, l);
 
 
-        pa_sink_notify(c->sink_input->sink);
+        pa_atomic_store(&c->playback.missing, (int) pa_memblockq_pop_missing(c->input_memblockq));
+
+        pa_sink_input_put(c->sink_input);
     }
 
     }
 
-    if (p->mode & RECORD) {
+    if (o->record) {
         pa_source_output_new_data data;
         size_t l;
         pa_source_output_new_data data;
         size_t l;
+        pa_source *source;
+
+        if (!(source = pa_namereg_get(c->protocol->core, o->default_source, PA_NAMEREG_SOURCE))) {
+            pa_log("Failed to get source.");
+            goto fail;
+        }
 
         pa_source_output_new_data_init(&data);
         data.driver = __FILE__;
 
         pa_source_output_new_data_init(&data);
         data.driver = __FILE__;
-        data.name = c->client->name;
-        pa_source_output_new_data_set_sample_spec(&data, &p->sample_spec);
-        data.module = p->module;
+        data.module = o->module;
         data.client = c->client;
         data.client = c->client;
+        data.source = source;
+        pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist);
+        pa_source_output_new_data_set_sample_spec(&data, &o->sample_spec);
+
+        pa_source_output_new(&c->source_output, p->core, &data);
+        pa_source_output_new_data_done(&data);
 
 
-        if (!(c->source_output = pa_source_output_new(p->core, &data, 0))) {
+        if (!c->source_output) {
             pa_log("Failed to create source output.");
             goto fail;
         }
             pa_log("Failed to create source output.");
             goto fail;
         }
@@ -397,96 +609,165 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
         c->source_output->get_latency = source_output_get_latency_cb;
         c->source_output->userdata = c;
 
         c->source_output->get_latency = source_output_get_latency_cb;
         c->source_output->userdata = c;
 
-        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
+        pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY);
+
+        l = (size_t) (pa_bytes_per_second(&o->sample_spec)*RECORD_BUFFER_SECONDS);
         c->output_memblockq = pa_memblockq_new(
                 0,
                 l,
                 0,
         c->output_memblockq = pa_memblockq_new(
                 0,
                 l,
                 0,
-                pa_frame_size(&p->sample_spec),
+                pa_frame_size(&o->sample_spec),
                 1,
                 0,
                 1,
                 0,
+                0,
                 NULL);
                 NULL);
-        pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
-        pa_source_notify(c->source_output->source);
+        pa_iochannel_socket_set_sndbuf(io, l);
+
+        pa_source_output_put(c->source_output);
     }
 
     }
 
-    pa_iochannel_set_callback(c->io, io_callback, c);
     pa_idxset_put(p->connections, c, NULL);
 
     pa_idxset_put(p->connections, c, NULL);
 
-    c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
-    assert(c->defer_event);
-    p->core->mainloop->defer_enable(c->defer_event, 0);
-
     return;
 
 fail:
     return;
 
 fail:
-    if (c)
-        connection_free(c);
+    connection_unlink(c);
 }
 
 }
 
-pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
-    pa_protocol_simple* p = NULL;
-    int enable;
-    assert(core && server && ma);
+void pa_simple_protocol_disconnect(pa_simple_protocol *p, pa_module *m) {
+    connection *c;
+    void *state = NULL;
+
+    pa_assert(p);
+    pa_assert(m);
 
 
-    p = pa_xmalloc0(sizeof(pa_protocol_simple));
-    p->module = m;
-    p->core = core;
-    p->server = server;
+    while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
+        if (c->options->module == m)
+            connection_unlink(c);
+}
+
+static pa_simple_protocol* simple_protocol_new(pa_core *c) {
+    pa_simple_protocol *p;
+
+    pa_assert(c);
+
+    p = pa_xnew(pa_simple_protocol, 1);
+    PA_REFCNT_INIT(p);
+    p->core = c;
     p->connections = pa_idxset_new(NULL, NULL);
 
     p->connections = pa_idxset_new(NULL, NULL);
 
-    p->sample_spec = core->default_sample_spec;
-    if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
-        pa_log("Failed to parse sample type specification.");
-        goto fail;
-    }
+    pa_assert_se(pa_shared_set(c, "simple-protocol", p) >= 0);
 
 
-    p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
-    p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
+    return p;
+}
 
 
-    enable = 0;
-    if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) {
-        pa_log("record= expects a numeric argument.");
-        goto fail;
-    }
-    p->mode = enable ? RECORD : 0;
+pa_simple_protocol* pa_simple_protocol_get(pa_core *c) {
+    pa_simple_protocol *p;
 
 
-    enable = 1;
-    if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {
-        pa_log("playback= expects a numeric argument.");
-        goto fail;
-    }
-    p->mode |= enable ? PLAYBACK : 0;
+    if ((p = pa_shared_get(c, "simple-protocol")))
+        return pa_simple_protocol_ref(p);
 
 
-    if ((p->mode & (RECORD|PLAYBACK)) == 0) {
-        pa_log("neither playback nor recording enabled for protocol.");
-        goto fail;
-    }
+    return simple_protocol_new(c);
+}
 
 
-    pa_socket_server_set_callback(p->server, on_connection, p);
+pa_simple_protocol* pa_simple_protocol_ref(pa_simple_protocol *p) {
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) >= 1);
 
 
-    return p;
+    PA_REFCNT_INC(p);
 
 
-fail:
-    if (p)
-        pa_protocol_simple_free(p);
-    return NULL;
+    return p;
 }
 
 }
 
+void pa_simple_protocol_unref(pa_simple_protocol *p) {
+    connection *c;
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) >= 1);
+
+    if (PA_REFCNT_DEC(p) > 0)
+        return;
 
 
-void pa_protocol_simple_free(pa_protocol_simple *p) {
-    struct connection *c;
-    assert(p);
+    while ((c = pa_idxset_first(p->connections, NULL)))
+        connection_unlink(c);
 
 
-    if (p->connections) {
-        while((c = pa_idxset_first(p->connections, NULL)))
-            connection_free(c);
+    pa_idxset_free(p->connections, NULL, NULL);
 
 
-        pa_idxset_free(p->connections, NULL, NULL);
-    }
+    pa_assert_se(pa_shared_remove(p->core, "simple-protocol") >= 0);
 
 
-    if (p->server)
-        pa_socket_server_unref(p->server);
     pa_xfree(p);
 }
 
     pa_xfree(p);
 }
 
+pa_simple_options* pa_simple_options_new(void) {
+    pa_simple_options *o;
+
+    o = pa_xnew0(pa_simple_options, 1);
+    PA_REFCNT_INIT(o);
+
+    o->record = FALSE;
+    o->playback = TRUE;
+
+    return o;
+}
+
+pa_simple_options* pa_simple_options_ref(pa_simple_options *o) {
+    pa_assert(o);
+    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+
+    PA_REFCNT_INC(o);
+
+    return o;
+}
+
+void pa_simple_options_unref(pa_simple_options *o) {
+    pa_assert(o);
+    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+
+    if (PA_REFCNT_DEC(o) > 0)
+        return;
+
+    pa_xfree(o->default_sink);
+    pa_xfree(o->default_source);
+
+    pa_xfree(o);
+}
+
+int pa_simple_options_parse(pa_simple_options *o, pa_core *c, pa_modargs *ma) {
+    pa_bool_t enabled;
+
+    pa_assert(o);
+    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+    pa_assert(ma);
+
+    o->sample_spec = c->default_sample_spec;
+    if (pa_modargs_get_sample_spec_and_channel_map(ma, &o->sample_spec, &o->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
+        pa_log("Failed to parse sample type specification.");
+        return -1;
+    }
+
+    pa_xfree(o->default_source);
+    o->default_source = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
+
+    pa_xfree(o->default_sink);
+    o->default_sink = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
+
+    enabled = o->record;
+    if (pa_modargs_get_value_boolean(ma, "record", &enabled) < 0) {
+        pa_log("record= expects a boolean argument.");
+        return -1;
+    }
+    o->record = enabled;
+
+    enabled = o->playback;
+    if (pa_modargs_get_value_boolean(ma, "playback", &enabled) < 0) {
+        pa_log("playback= expects a boolean argument.");
+        return -1;
+    }
+    o->playback = enabled;
+
+    if (!o->playback && !o->record) {
+        pa_log("neither playback nor recording enabled for protocol.");
+        return -1;
+    }
+
+    return 0;
+}