]> code.delx.au - pulseaudio/commitdiff
A lot of updates, all necessary to get the native protocol ported:
authorLennart Poettering <lennart@poettering.net>
Tue, 31 Jul 2007 22:44:53 +0000 (22:44 +0000)
committerLennart Poettering <lennart@poettering.net>
Tue, 31 Jul 2007 22:44:53 +0000 (22:44 +0000)
* add an int64_t argument to pa_asyncmsgq because it is very difficult to pass 64 values otherwise
* simplify subclassing in pa_object
* s/drop/unlink/ at some places
* port the native protocol to the lock-free core (not tested, compiles fine)
* move synchronisation of playback streams into pa_sink_input
* add "start_corked" field to pa_sink_input_new_data
* allow casting of NULL values in pa_object

git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1562 fefdeb5f-60dc-0310-8127-8f9354f1896f

27 files changed:
src/Makefile.am
src/modules/module-alsa-sink.c
src/modules/module-alsa-source.c
src/modules/module-null-sink.c
src/modules/module-oss.c
src/modules/module-pipe-sink.c
src/modules/module-pipe-source.c
src/pulsecore/asyncmsgq.c
src/pulsecore/asyncmsgq.h
src/pulsecore/core.c
src/pulsecore/msgobject.c
src/pulsecore/msgobject.h
src/pulsecore/native-common.h
src/pulsecore/object.c
src/pulsecore/object.h
src/pulsecore/protocol-native.c
src/pulsecore/protocol-simple.c
src/pulsecore/sink-input.c
src/pulsecore/sink-input.h
src/pulsecore/sink.c
src/pulsecore/sink.h
src/pulsecore/sound-file-stream.c
src/pulsecore/source-output.c
src/pulsecore/source-output.h
src/pulsecore/source.c
src/pulsecore/source.h
src/tests/asyncmsgq-test.c

index 1afe6d6e32f7864a4cb83c5d685d19a58dc396c1..4083ea55c518bc6124740fff7f76d5207fc77b48 100644 (file)
@@ -722,10 +722,10 @@ modlibexec_LTLIBRARIES = \
                libauthkey-prop.la \
                libstrlist.la \
                libprotocol-simple.la \
-               libprotocol-http.la 
+               libprotocol-http.la \
+               libprotocol-native.la
 
 #              libprotocol-esound.la
-#              libprotocol-native.la
 
 # We need to emulate sendmsg/recvmsg to support this on Win32
 if !OS_IS_WIN32
@@ -879,11 +879,10 @@ modlibexec_LTLIBRARIES += \
                module-volume-restore.la \
                module-rescue-streams.la \
                module-http-protocol-tcp.la \
-               module-sine.la
-
+               module-sine.la \
+               module-native-protocol-tcp.la \
+               module-native-protocol-fd.la
 #              module-esound-protocol-tcp.la \
-#              module-native-protocol-tcp.la \
-#              module-native-protocol-fd.la \
 #              module-combine.la \
 #              module-tunnel-sink.la \
 #              module-tunnel-source.la \
@@ -899,10 +898,10 @@ modlibexec_LTLIBRARIES += \
 if HAVE_AF_UNIX
 modlibexec_LTLIBRARIES += \
                module-cli-protocol-unix.la \
-               module-simple-protocol-unix.la
-               module-http-protocol-unix.la
-#              module-esound-protocol-unix.la \
-#              module-native-protocol-unix.la
+               module-simple-protocol-unix.la \
+               module-http-protocol-unix.la \
+               module-native-protocol-unix.la
+#              module-esound-protocol-unix.la 
 endif
 
 if HAVE_MKFIFO
@@ -1083,20 +1082,20 @@ module_http_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-h
 
 # Native protocol
 
-#module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
-#module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-#module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
-#module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
+module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
+module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
+module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
 
-#module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
-#module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-#module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
-#module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
+module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
+module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
+module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
 
-#module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
-#module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
-#module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
-#module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
+module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
+module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
+module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
+module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
 
 # EsounD protocol
 
index 9ca881d103c238cff28d25eedbd1580827cacdf8..551bad89a899f81282d4df5fe41d4d37ea01a174 100644 (file)
@@ -302,7 +302,7 @@ fail:
     return -1;
 }
 
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
@@ -347,7 +347,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
             break;
     }
 
-    return pa_sink_process_msg(o, code, data, chunk);
+    return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
 static int mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
@@ -510,12 +510,13 @@ static void thread_func(void *userdata) {
         int code;
         void *data;
         pa_memchunk chunk;
+        int64_t offset;
         int r;
 
 /*         pa_log("loop");     */
         
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
 
 /*             pa_log("processing msg"); */
@@ -525,7 +526,7 @@ static void thread_func(void *userdata) {
                 goto finish;
             }
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         } 
@@ -660,7 +661,7 @@ static void thread_func(void *userdata) {
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
@@ -893,7 +894,7 @@ void pa__done(pa_core *c, pa_module*m) {
         pa_sink_disconnect(u->sink);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
         pa_thread_free(u->thread);
     }
 
index 59414d3321ffd5d3bb67608a38fc314cd45d014c..c2dad6f9d55968bc51feef54d4a318f5d45b0dc6 100644 (file)
@@ -290,7 +290,7 @@ fail:
     return -1;
 }
 
-static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SOURCE(o)->userdata;
 
     switch (code) {
@@ -335,7 +335,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk
             break;
     }
 
-    return pa_source_process_msg(o, code, data, chunk);
+    return pa_source_process_msg(o, code, data, offset, chunk);
 }
 
 static int mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
@@ -498,12 +498,13 @@ static void thread_func(void *userdata) {
         int code;
         void *data;
         int r;
+        int64_t offset;
         pa_memchunk chunk;
 
 /*         pa_log("loop");     */
         
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
 
 /*             pa_log("processing msg"); */
@@ -513,7 +514,7 @@ static void thread_func(void *userdata) {
                 goto finish;
             }
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         } 
@@ -634,7 +635,7 @@ static void thread_func(void *userdata) {
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
@@ -864,7 +865,7 @@ void pa__done(pa_core *c, pa_module*m) {
         pa_source_disconnect(u->source);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
         pa_thread_free(u->thread);
     }
 
index bb0a50451e363420ded7f9334f0d6dcb612774fe..f0e3a061398bee2a280dccbbd2432399b612baeb 100644 (file)
@@ -83,7 +83,7 @@ static const char* const valid_modargs[] = {
     NULL
 };
 
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
@@ -107,7 +107,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
         }
     }
     
-    return pa_sink_process_msg(o, code, data, chunk);
+    return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
 static void thread_func(void *userdata) {
@@ -131,9 +131,10 @@ static void thread_func(void *userdata) {
         pa_memchunk chunk;
         int r, timeout;
         struct timeval now;
+        int64_t offset;
 
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
 
             if (!object && code == PA_MESSAGE_SHUTDOWN) {
@@ -141,7 +142,7 @@ static void thread_func(void *userdata) {
                 goto finish;
             }
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
@@ -190,7 +191,7 @@ static void thread_func(void *userdata) {
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
@@ -271,7 +272,7 @@ void pa__done(pa_core *c, pa_module*m) {
         pa_sink_disconnect(u->sink);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
         pa_thread_free(u->thread);
     }
 
index 63f4d40e89321db0360342cbec520fb712a9e360..a43bdb3cdc293f5b0ea5699a0c524b557e85e522 100644 (file)
@@ -581,7 +581,7 @@ fail:
     return -1;
 }
 
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
     int do_trigger = 0, ret, quick = 1;
 
@@ -673,7 +673,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
             break;
     }
 
-    ret = pa_sink_process_msg(o, code, data, chunk);
+    ret = pa_sink_process_msg(o, code, data, offset, chunk);
 
     if (do_trigger)
         trigger(u, quick);
@@ -681,7 +681,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
     return ret;
 }
 
-static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SOURCE(o)->userdata;
     int do_trigger = 0, ret, quick = 1;
 
@@ -770,7 +770,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk
             break;
     }
 
-    ret = pa_source_process_msg(o, code, data, chunk);
+    ret = pa_source_process_msg(o, code, data, offset, chunk);
 
     if (do_trigger)
         trigger(u, quick);
@@ -807,11 +807,12 @@ static void thread_func(void *userdata) {
         void *data;
         pa_memchunk chunk;
         int r;
+        int64_t offset;
 
 /*        pa_log("loop");    */
         
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
 
 /*             pa_log("processing msg"); */
@@ -821,7 +822,7 @@ static void thread_func(void *userdata) {
                 goto finish;
             }
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         } 
@@ -1051,7 +1052,7 @@ static void thread_func(void *userdata) {
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
@@ -1300,9 +1301,9 @@ go_on:
 
     /* Read mixer settings */
     if (u->source)
-        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_VOLUME, &u->source->volume, NULL, NULL);
+        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_VOLUME, &u->source->volume, 0, NULL, NULL);
     if (u->sink)
-        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_VOLUME, &u->sink->volume, NULL, NULL);
+        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_VOLUME, &u->sink->volume, 0, NULL, NULL);
 
     return 0;
 
@@ -1335,7 +1336,7 @@ void pa__done(pa_core *c, pa_module*m) {
         pa_source_disconnect(u->source);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
         pa_thread_free(u->thread);
     }
 
index db8b2e103ecd099c1b2b045d55e6cd73b9b4a2bc..83ee06b7108f8f6faf92d623703574ce17be87de 100644 (file)
@@ -84,7 +84,7 @@ static const char* const valid_modargs[] = {
     NULL
 };
 
-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
@@ -103,7 +103,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *
         }
     }
     
-    return pa_sink_process_msg(o, code, data, chunk);
+    return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
 static void thread_func(void *userdata) {
@@ -133,9 +133,10 @@ static void thread_func(void *userdata) {
         void *data;
         pa_memchunk chunk;
         int r;
+        int64_t offset;
 
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
 
             if (!object && code == PA_MESSAGE_SHUTDOWN) {
@@ -143,7 +144,7 @@ static void thread_func(void *userdata) {
                 goto finish;
             }
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
@@ -224,7 +225,7 @@ static void thread_func(void *userdata) {
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
@@ -326,7 +327,7 @@ void pa__done(pa_core *c, pa_module*m) {
         pa_sink_disconnect(u->sink);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
         pa_thread_free(u->thread);
     }
 
index 5dbb1e7bcfca32dc8cdba6a231d1cff85288a3a6..a5f95f9a3604c76d8fa0545bba3c5fdd6f0f3ab2 100644 (file)
@@ -111,9 +111,10 @@ static void thread_func(void *userdata) {
         void *data;
         pa_memchunk chunk;
         int r;
+        int64_t offset;
 
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
 
             if (!object && code == PA_MESSAGE_SHUTDOWN) {
@@ -121,7 +122,7 @@ static void thread_func(void *userdata) {
                 goto finish;
             }
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
@@ -202,7 +203,7 @@ static void thread_func(void *userdata) {
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
@@ -303,7 +304,7 @@ void pa__done(pa_core *c, pa_module*m) {
         pa_source_disconnect(u->source);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
         pa_thread_free(u->thread);
     }
 
index 1b6d8025d1e404fb937922a112f9d1c72aab022e..ed71d374afe5757cc70c1fd98815c19394b65f62 100644 (file)
@@ -46,6 +46,7 @@ struct asyncmsgq_item {
     pa_msgobject *object;
     void *userdata;
     pa_free_cb_t free_cb;
+    int64_t offset;
     pa_memchunk memchunk;
     pa_semaphore *semaphore;
     int ret;
@@ -96,7 +97,7 @@ void pa_asyncmsgq_free(pa_asyncmsgq *a) {
     pa_xfree(a);
 }
 
-void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
+void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
     struct asyncmsgq_item *i;
     pa_assert(a);
 
@@ -107,6 +108,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
     i->object = object ? pa_msgobject_ref(object) : NULL;
     i->userdata = (void*) userdata;
     i->free_cb = free_cb;
+    i->offset = offset;
     if (chunk) {
         pa_assert(chunk->memblock);
         i->memchunk = *chunk;
@@ -121,7 +123,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
     pa_mutex_unlock(a->mutex);
 }
 
-int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk) {
+int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
     struct asyncmsgq_item i;
     pa_assert(a);
 
@@ -130,6 +132,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
     i.userdata = (void*) userdata;
     i.free_cb = NULL;
     i.ret = -1;
+    i.offset = offset;
     if (chunk) {
         pa_assert(chunk->memblock);
         i.memchunk = *chunk;
@@ -148,7 +151,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
     return i.ret;
 }
 
-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
     pa_assert(a);
     pa_assert(code);
     pa_assert(!a->current);
@@ -163,6 +166,8 @@ int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **u
     *code = a->current->code;
     if (userdata)
         *userdata = a->current->userdata;
+    if (offset)
+        *offset = a->current->offset;
     if (object) {
         if ((*object = a->current->object))
             pa_msgobject_assert_ref(*object);
@@ -207,13 +212,14 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
     do {
         pa_msgobject *o;
         void *data;
+    int64_t offset;
         pa_memchunk chunk;
         int ret;
 
-        if (pa_asyncmsgq_get(a, &o, &c, &data, &chunk, 1) < 0)
+        if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0)
             return -1;
 
-        ret = pa_asyncmsgq_dispatch(o, c, data, &chunk);
+        ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
         pa_asyncmsgq_done(a, ret);
 
     } while (c != code);
@@ -239,10 +245,10 @@ void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
     pa_asyncq_after_poll(a->asyncq);
 }
 
-int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) {
+int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
 
     if (object)
-        return object->process_msg(object, code, userdata, memchunk);
+        return object->process_msg(object, code, userdata, offset, memchunk);
 
     return 0;
 }
index 17b37e4b8ff02ee84f1ff43166e1451a0fa050be..b0f1a6e43dfa936039a956febf0df6bcf251f429 100644 (file)
@@ -57,11 +57,11 @@ typedef struct pa_asyncmsgq pa_asyncmsgq;
 pa_asyncmsgq* pa_asyncmsgq_new(size_t size);
 void pa_asyncmsgq_free(pa_asyncmsgq* q);
 
-void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
-int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *memchunk);
+void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
+int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk);
 
-int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, pa_memchunk *memchunk, int wait);
-int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk);
+int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, int wait);
+int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk);
 void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
 
index a940bfc0ce93aed9dea7c50687edb811de527e94..1a0e50bb6eb24940d0ffb4fb04da9dc3c7dc9fe1 100644 (file)
@@ -49,9 +49,9 @@
 
 #include "core.h"
 
-static PA_DEFINE_CHECK_TYPE(pa_core, core_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_core, pa_msgobject);
 
-static int core_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+static int core_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
     pa_core *c = PA_CORE(o);
 
     pa_core_assert_ref(c);
@@ -79,13 +79,14 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even
         pa_msgobject *object;
         int code;
         void *data;
+        int64_t offset;
         pa_memchunk chunk;
 
         /* Check whether there is a message for us to process */
-        while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+        while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
             pa_asyncmsgq_done(c->asyncmsgq, ret);
         }
 
@@ -116,7 +117,7 @@ pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
         }
     }
 
-    c = pa_msgobject_new(pa_core, core_check_type);
+    c = pa_msgobject_new(pa_core);
     c->parent.parent.free = core_free;
     c->parent.process_msg = core_process_msg;
 
index 6db630c5fe43f8b980399c78ab701aa78420be6d..f54e69f2cd27a3d9413749d847dca80df8f10cc4 100644 (file)
 
 #include "msgobject.h"
 
-PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_msgobject_check_type, pa_object_check_type);
+PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_object);
 
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name)) {
     pa_msgobject *o;
 
     pa_assert(size > sizeof(pa_msgobject));
     pa_assert(type_name);
 
-    o = PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type ? check_type : pa_msgobject_check_type));
+    if (!check_type)
+        check_type = pa_msgobject_check_type;
+
+    pa_assert(check_type(type_name));
+    pa_assert(check_type("pa_object"));
+    pa_assert(check_type("pa_msgobject"));
+
+    o = PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type));
     o->process_msg = NULL;
     return o;
 }
index 65761aea56a528c65c9babf6be1dfa646cfeded7..8221cc330f2193c8964ff451e4704efdca757b0c 100644 (file)
@@ -37,14 +37,14 @@ typedef struct pa_msgobject pa_msgobject;
 
 struct pa_msgobject {
     pa_object parent;
-    int (*process_msg)(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+    int (*process_msg)(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
 };
 
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name));
 
-int pa_msgobject_check_type(pa_object *o, const char *type);
+int pa_msgobject_check_type(const char *type);
 
-#define pa_msgobject_new(type, check_type) ((type*) pa_msgobject_new_internal(sizeof(type), #type, check_type))
+#define pa_msgobject_new(type) ((type*) pa_msgobject_new_internal(sizeof(type), #type, type##_check_type))
 #define pa_msgobject_free ((void (*) (pa_msgobject* o)) pa_object_free)
 
 #define PA_MSGOBJECT(o) pa_msgobject_cast(o)
index f7a7da1de84bcf5b600d688dafe3a4ef5573251f..d22c8d122093d367aeccfb869d7e83fa8effd1ef 100644 (file)
@@ -115,6 +115,8 @@ enum {
     PA_COMMAND_MOVE_SINK_INPUT,
     PA_COMMAND_MOVE_SOURCE_OUTPUT,
 
+    PA_COMMAND_SET_SINK_INPUT_MUTE,
+
     PA_COMMAND_MAX
 };
 
index a983c5ae9d26551ed1fb0da78203b54b0857a1ac..23a45754eb5e520f50187148efc59c1fad38fad6 100644 (file)
 
 #include "object.h"
 
-pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name)) {
     pa_object *o;
 
     pa_assert(size > sizeof(pa_object));
     pa_assert(type_name);
 
+    if (!check_type)
+        check_type = pa_object_check_type;
+
+    pa_assert(check_type(type_name));
+    pa_assert(check_type("pa_object"));
+    
     o = pa_xmalloc(size);
     PA_REFCNT_INIT(o);
     o->type_name = type_name;
     o->free = pa_object_free;
-    o->check_type = check_type ? check_type : pa_object_check_type;
+    o->check_type = check_type;
 
     return o;
 }
@@ -59,8 +65,7 @@ void pa_object_unref(pa_object *o) {
     }
 }
 
-int pa_object_check_type(pa_object *o, const char *type_name) {
-    pa_assert(o);
+int pa_object_check_type(const char *type_name) {
     pa_assert(type_name);
     
     return type_name == "pa_object" || strcmp(type_name, "pa_object") == 0;
index 270f289d6521b8b207117d2128c3ad2e22811235..9c62f74a4ed0d06fc725a91b99d3b27e6f0fdab0 100644 (file)
@@ -38,20 +38,19 @@ struct pa_object {
     PA_REFCNT_DECLARE;
     const char *type_name;
     void (*free)(pa_object *o);
-    int (*check_type)(pa_object *o, const char *type_name);
+    int (*check_type)(const char *type_name);
 };
 
-pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
-#define pa_object_new(type, check_type) ((type*) pa_object_new_internal(sizeof(type), #type, check_type)
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(const char *type_name));
+#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), #type, type##_check_type)
 
 #define pa_object_free ((void (*) (pa_object* o)) pa_xfree)
 
-int pa_object_check_type(pa_object *o, const char *type);
+int pa_object_check_type(const char *type);
 
 static inline int pa_object_isinstance(void *o) {
     pa_object *obj = (pa_object*) o;
-    pa_assert(obj);
-    return obj->check_type(obj, "pa_object");
+    return obj ? obj->check_type("pa_object") : 0;
 }
 
 pa_object *pa_object_ref(pa_object *o);
@@ -63,19 +62,18 @@ static inline int pa_object_refcnt(pa_object *o) {
 
 static inline pa_object* pa_object_cast(void *o) {
     pa_object *obj = (pa_object*) o;
-    pa_assert(obj->check_type(obj, "pa_object"));
+    pa_assert(!obj || obj->check_type("pa_object"));
     return obj;
 }
 
-#define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o))
+#define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o) > 0)
 
 #define PA_OBJECT(o) pa_object_cast(o)
 
 #define PA_DECLARE_CLASS(c)                                             \
     static inline int c##_isinstance(void *o) {                         \
         pa_object *obj = (pa_object*) o;                                \
-        pa_assert(obj);                                                 \
-        return obj->check_type(obj, #c);                                \
+        return obj ? obj->check_type(#c) : 1;                           \
     }                                                                   \
     static inline c* c##_cast(void *o) {                                \
         pa_assert(c##_isinstance(o));                                   \
@@ -95,14 +93,13 @@ static inline pa_object* pa_object_cast(void *o) {
     }                                                                   \
     struct __stupid_useless_struct_to_allow_trailing_semicolon
 
-#define PA_DEFINE_CHECK_TYPE(c, func, parent)                           \
-    int func(pa_object *o, const char *type) {                          \
-        pa_assert(o);                                                   \
+#define PA_DEFINE_CHECK_TYPE(c, parent)                                 \
+    int c##_check_type(const char *type) {                              \
         pa_assert(type);                                                \
         if (type == #c ||                                               \
             strcmp(type, #c) == 0)                                      \
             return 1;                                                   \
-        return parent(o, type);                                         \
+        return parent##_check_type(type);                               \
     }                                                                   \
     struct __stupid_useless_struct_to_allow_trailing_semicolon
 
index 97345f00a04a794ed9207786cc8f0d57f4c39649..3be5eae84716e93165612b1770dbc0c0e4c6929a 100644 (file)
@@ -28,7 +28,6 @@
 
 #include <string.h>
 #include <stdio.h>
-#include <assert.h>
 #include <stdlib.h>
 #include <unistd.h>
 
 
 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
 
-struct connection;
+typedef struct connection connection;
 struct pa_protocol_native;
 
-struct record_stream {
-    struct connection *connection;
+typedef struct record_stream {
+    pa_msgobject parent;
+
+    connection *connection;
     uint32_t index;
+    
     pa_source_output *source_output;
     pa_memblockq *memblockq;
     size_t fragment_size;
-};
+} record_stream;
+
+typedef struct output_stream {
+    pa_msgobject parent;
+} output_stream;
 
-struct playback_stream {
-    int type;
-    struct connection *connection;
+typedef struct playback_stream {
+    output_stream parent;
+    
+    connection *connection;
     uint32_t index;
+    
     pa_sink_input *sink_input;
     pa_memblockq *memblockq;
-    size_t requested_bytes;
     int drain_request;
     uint32_t drain_tag;
     uint32_t syncid;
     int underrun;
 
-    /* Sync group members */
-    PA_LLIST_FIELDS(struct playback_stream);
-};
+    pa_atomic_t missing;
+    size_t last_missing;
+} playback_stream;
 
-struct upload_stream {
-    int type;
-    struct connection *connection;
+typedef struct upload_stream {
+    output_stream parent;
+    
+    connection *connection;
     uint32_t index;
+    
     pa_memchunk memchunk;
     size_t length;
     char *name;
     pa_sample_spec sample_spec;
     pa_channel_map channel_map;
-};
-
-struct output_stream {
-    int type;
-};
-
-enum {
-    UPLOAD_STREAM,
-    PLAYBACK_STREAM
-};
+} upload_stream;
 
 struct connection {
+    pa_msgobject parent;
+    
     int authorized;
     uint32_t version;
     pa_protocol_native *protocol;
@@ -132,10 +134,31 @@ struct connection {
     pa_time_event *auth_timeout_event;
 };
 
+
+PA_DECLARE_CLASS(record_stream);
+#define RECORD_STREAM(o) (record_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
+
+PA_DECLARE_CLASS(output_stream);
+#define OUTPUT_STREAM(o) (output_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
+
+PA_DECLARE_CLASS(playback_stream);
+#define PLAYBACK_STREAM(o) (playback_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
+
+PA_DECLARE_CLASS(upload_stream);
+#define UPLOAD_STREAM(o) (upload_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
+
+PA_DECLARE_CLASS(connection);
+#define CONNECTION(o) (connection_cast(o))
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
+
 struct pa_protocol_native {
     pa_module *module;
-    int public;
     pa_core *core;
+    int public;
     pa_socket_server *server;
     pa_idxset *connections;
     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
@@ -146,17 +169,39 @@ struct pa_protocol_native {
     pa_ip_acl *auth_ip_acl;
 };
 
+enum {
+    SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
+    SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
+    SINK_INPUT_MESSAGE_FLUSH,
+    SINK_INPUT_MESSAGE_TRIGGER,
+    SINK_INPUT_MESSAGE_SEEK,
+    SINK_INPUT_MESSAGE_PREBUF_FORCE
+};
+
+enum {
+    PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
+    PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
+    PLAYBACK_STREAM_MESSAGE_OVERFLOW,
+    PLAYBACK_STREAM_MESSAGE_DRAIN_ACK
+};
+
+enum {
+    RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
+};
+
 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk);
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length);
+static void sink_input_drop_cb(pa_sink_input *i, size_t length);
 static void sink_input_kill_cb(pa_sink_input *i);
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i);
 
+static void send_memblock(connection *c);
 static void request_bytes(struct playback_stream*s);
 
 static void source_output_kill_cb(pa_source_output *o);
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
 
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
+
 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -179,8 +224,7 @@ static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag,
 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_flush_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_trigger_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -239,12 +283,13 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
 
     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
+    [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
 
     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
-    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_flush_playback_stream,
-    [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
-    [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
+    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
+    [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
+    [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
 
     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
@@ -269,74 +314,145 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
 
 /* structure management */
 
-static struct upload_stream* upload_stream_new(
-    struct connection *c,
-    const pa_sample_spec *ss,
-    const pa_channel_map *map,
-    const char *name, size_t length) {
+static void upload_stream_unlink(upload_stream *s) {
+    pa_assert(s);
+
+    if (!s->connection)
+        return;
+
+    pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
+    upload_stream_unref(s);
+    s->connection = NULL;
+}
+
+static void upload_stream_free(pa_object *o) {
+    upload_stream *s = UPLOAD_STREAM(o);
+    pa_assert(s);
 
-    struct upload_stream *s;
-    assert(c && ss && name && length);
+    upload_stream_unlink(s);
 
-    s = pa_xnew(struct upload_stream, 1);
-    s->type = UPLOAD_STREAM;
+    pa_xfree(s->name);
+
+    if (s->memchunk.memblock)
+        pa_memblock_unref(s->memchunk.memblock);
+
+    pa_xfree(s);
+}
+
+static upload_stream* upload_stream_new(
+        connection *c,
+        const pa_sample_spec *ss,
+        const pa_channel_map *map,
+        const char *name, size_t length) {
+
+    upload_stream *s;
+    
+    pa_assert(c);
+    pa_assert(ss);
+    pa_assert(name);
+    pa_assert(length > 0);
+
+    s = pa_msgobject_new(upload_stream);
+    c->parent.parent.free = upload_stream_free;
     s->connection = c;
     s->sample_spec = *ss;
     s->channel_map = *map;
     s->name = pa_xstrdup(name);
-
-    s->memchunk.memblock = NULL;
-    s->memchunk.index = 0;
-    s->memchunk.length = 0;
-
+    pa_memchunk_reset(&s->memchunk);
     s->length = length;
 
     pa_idxset_put(c->output_streams, s, &s->index);
+    
     return s;
 }
 
-static void upload_stream_free(struct upload_stream *o) {
-    assert(o && o->connection);
+static void record_stream_unlink(record_stream *s) {
+    pa_assert(s);
 
-    pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
+    if (!s->connection)
+        return;
 
-    pa_xfree(o->name);
+    if (s->source_output) {
+        pa_source_output_disconnect(s->source_output);
+        pa_source_output_unref(s->source_output);
+        s->source_output = NULL;
+    }
+
+    pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
+    record_stream_unref(s);    
+    s->connection = NULL;
+}
+
+static void record_stream_free(pa_object *o) {
+    record_stream *s = RECORD_STREAM(o);
+    pa_assert(s);
+
+    record_stream_unlink(s);
+    
+    pa_memblockq_free(s->memblockq);
+    pa_xfree(s);
+}
 
-    if (o->memchunk.memblock)
-        pa_memblock_unref(o->memchunk.memblock);
+static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
+    record_stream *s = RECORD_STREAM(o);
+    record_stream_assert_ref(s);
+
+    switch (code) {
+        
+        case RECORD_STREAM_MESSAGE_POST_DATA:
+            
+            if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+                pa_log_warn("Failed to push data into output queue.");
+                return -1;
+            }
+
+            if (!pa_pstream_is_pending(s->connection->pstream))
+                send_memblock(s->connection);
+            
+            pa_pstream_send_memblock(s->connection->pstream, s->index, 0, PA_SEEK_RELATIVE, chunk);
+            break;
+    }
 
-    pa_xfree(o);
+    return 0;
 }
 
-static struct record_stream* record_stream_new(
-    struct connection *c,
-    pa_source *source,
-    const pa_sample_spec *ss,
-    const pa_channel_map *map,
-    const char *name,
-    size_t maxlength,
-    size_t fragment_size) {
+static record_stream* record_stream_new(
+        connection *c,
+        pa_source *source,
+        const pa_sample_spec *ss,
+        const pa_channel_map *map,
+        const char *name,
+        size_t *maxlength,
+        size_t fragment_size,
+        int corked) {
 
-    struct record_stream *s;
+    record_stream *s;
     pa_source_output *source_output;
     size_t base;
     pa_source_output_new_data data;
 
-    assert(c && ss && name && maxlength);
+    pa_assert(c);
+    pa_assert(ss);
+    pa_assert(name);
+    pa_assert(maxlength);
+    pa_assert(*maxlength > 0);
 
     pa_source_output_new_data_init(&data);
+    data.module = c->protocol->module;
+    data.client = c->client;
     data.source = source;
     data.driver = __FILE__;
     data.name = name;
+    data.corked = corked;
     pa_source_output_new_data_set_sample_spec(&data, ss);
     pa_source_output_new_data_set_channel_map(&data, map);
-    data.module = c->protocol->module;
-    data.client = c->client;
 
     if (!(source_output = pa_source_output_new(c->protocol->core, &data, 0)))
         return NULL;
 
-    s = pa_xnew(struct record_stream, 1);
+    s = pa_msgobject_new(record_stream);
+    c->parent.parent.free = record_stream_free;
+    c->parent.process_msg = record_stream_process_msg;
     s->connection = c;
     s->source_output = source_output;
     s->source_output->push = source_output_push_cb;
@@ -346,58 +462,143 @@ static struct record_stream* record_stream_new(
 
     s->memblockq = pa_memblockq_new(
             0,
-            maxlength,
+            *maxlength,
             0,
             base = pa_frame_size(ss),
             1,
             0,
             NULL);
-    assert(s->memblockq);
 
     s->fragment_size = (fragment_size/base)*base;
-    if (!s->fragment_size)
+    if (s->fragment_size <= 0)
         s->fragment_size = base;
+    *maxlength = pa_memblockq_get_maxlength(s->memblockq);
 
     pa_idxset_put(c->record_streams, s, &s->index);
+
+    pa_source_output_put(s->source_output);
     return s;
 }
 
-static void record_stream_free(struct record_stream* r) {
-    assert(r && r->connection);
+static void playback_stream_unlink(playback_stream *s) {
+    pa_assert(s);
+
+    if (!s->connection)
+        return;
 
-    pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
-    pa_source_output_disconnect(r->source_output);
-    pa_source_output_unref(r->source_output);
-    pa_memblockq_free(r->memblockq);
-    pa_xfree(r);
+    if (s->sink_input) {
+        pa_sink_input_disconnect(s->sink_input);
+        pa_sink_input_unref(s->sink_input);
+        s->sink_input = NULL;
+    }
+
+    if (s->drain_request)
+        pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
+
+    pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
+    playback_stream_unref(s);    
+    s->connection = NULL;
+}
+
+static void playback_stream_free(pa_object* o) {
+    playback_stream *s = PLAYBACK_STREAM(o);
+    pa_assert(s);
+
+    playback_stream_unlink(s);
+    
+    pa_memblockq_free(s->memblockq);
+    pa_xfree(s);
 }
 
-static struct playback_stream* playback_stream_new(
-        struct connection *c,
+static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
+    playback_stream *s = PLAYBACK_STREAM(o);
+    playback_stream_assert_ref(s);
+
+    switch (code) {
+        case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
+            pa_tagstruct *t;
+            int32_t l;
+
+            if ((l = pa_atomic_load(&s->missing)) <= 0)
+                break;
+            
+            pa_assert_se(pa_atomic_sub(&s->missing, l) >= l);
+            
+            t = pa_tagstruct_new(NULL, 0);
+            pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
+            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+            pa_tagstruct_putu32(t, s->index);
+            pa_tagstruct_putu32(t, l);
+            pa_pstream_send_tagstruct(s->connection->pstream, t);
+
+     /*     pa_log("Requesting %u bytes", l);  */
+            break;
+        }
+
+        case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
+            pa_tagstruct *t;
+
+            /* Report that we're empty */
+            t = pa_tagstruct_new(NULL, 0);
+            pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
+            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+            pa_tagstruct_putu32(t, s->index);
+            pa_pstream_send_tagstruct(s->connection->pstream, t);
+            break;
+        }
+
+        case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
+            pa_tagstruct *t;
+
+            /* Notify the user we're overflowed*/
+            t = pa_tagstruct_new(NULL, 0);
+            pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
+            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+            pa_tagstruct_putu32(t, s->index);
+            pa_pstream_send_tagstruct(s->connection->pstream, t);
+            break;
+        }
+
+        case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
+            pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
+            break;
+
+    }
+
+    return 0;
+}
+
+static playback_stream* playback_stream_new(
+        connection *c,
         pa_sink *sink,
         const pa_sample_spec *ss,
         const pa_channel_map *map,
         const char *name,
-        size_t maxlength,
-        size_t tlength,
-        size_t prebuf,
-        size_t minreq,
+        size_t *maxlength,
+        size_t *tlength,
+        size_t *prebuf,
+        size_t *minreq,
         pa_cvolume *volume,
-        uint32_t syncid) {
+        uint32_t syncid,
+        int corked,
+        size_t *missing) {
 
-    struct playback_stream *s, *ssync;
+    playback_stream *s, *ssync;
     pa_sink_input *sink_input;
     pa_memblock *silence;
     uint32_t idx;
     int64_t start_index;
     pa_sink_input_new_data data;
 
-    assert(c && ss && name && maxlength);
+    pa_assert(c);
+    pa_assert(ss);
+    pa_assert(name);
+    pa_assert(maxlength);
 
     /* Find syncid group */
     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
 
-        if (ssync->type != PLAYBACK_STREAM)
+        if (!playback_stream_isinstance(ssync))
             continue;
 
         if (ssync->syncid == syncid)
@@ -405,8 +606,13 @@ static struct playback_stream* playback_stream_new(
     }
 
     /* Synced streams must connect to the same sink */
-    if (ssync)
-        sink = ssync->sink_input->sink;
+    if (ssync) {
+
+        if (!sink)
+            sink = ssync->sink_input->sink;
+        else if (sink != ssync->sink_input->sink)
+            return NULL;
+    }
 
     pa_sink_input_new_data_init(&data);
     data.sink = sink;
@@ -417,146 +623,136 @@ static struct playback_stream* playback_stream_new(
     pa_sink_input_new_data_set_volume(&data, volume);
     data.module = c->protocol->module;
     data.client = c->client;
+    data.start_corked = corked;
+    data.sync_base = ssync ? ssync->sink_input : NULL;
 
     if (!(sink_input = pa_sink_input_new(c->protocol->core, &data, 0)))
         return NULL;
 
-    s = pa_xnew(struct playback_stream, 1);
-    s->type = PLAYBACK_STREAM;
+    s = pa_msgobject_new(playback_stream);
+    c->parent.parent.free = playback_stream_free;
+    c->parent.process_msg = playback_stream_process_msg;
     s->connection = c;
     s->syncid = syncid;
     s->sink_input = sink_input;
     s->underrun = 1;
 
+    s->sink_input->parent.process_msg = sink_input_process_msg;
     s->sink_input->peek = sink_input_peek_cb;
     s->sink_input->drop = sink_input_drop_cb;
     s->sink_input->kill = sink_input_kill_cb;
-    s->sink_input->get_latency = sink_input_get_latency_cb;
     s->sink_input->userdata = s;
 
-    if (ssync) {
-        /* Sync id found, now find head of list */
-        PA_LLIST_FIND_HEAD(struct playback_stream, ssync, &ssync);
-
-        /* Prepend ourselves */
-        PA_LLIST_PREPEND(struct playback_stream, ssync, s);
-
-        /* Set our start index to the current read index of the other grozp member(s) */
-        assert(ssync->next);
-        start_index = pa_memblockq_get_read_index(ssync->next->memblockq);
-    } else {
-        /* This ia a new sync group */
-        PA_LLIST_INIT(struct playback_stream, s);
-        start_index = 0;
-    }
+    start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
 
     silence = pa_silence_memblock_new(c->protocol->core->mempool, ss, 0);
 
     s->memblockq = pa_memblockq_new(
             start_index,
-            maxlength,
-            tlength,
+            *maxlength,
+            *tlength,
             pa_frame_size(ss),
-            prebuf,
-            minreq,
+            *prebuf,
+            *minreq,
             silence);
 
     pa_memblock_unref(silence);
 
-    s->requested_bytes = 0;
+    *maxlength = pa_memblockq_get_maxlength(s->memblockq);
+    *tlength = pa_memblockq_get_tlength(s->memblockq);
+    *prebuf = pa_memblockq_get_prebuf(s->memblockq);
+    *minreq = pa_memblockq_get_minreq(s->memblockq);
+    *missing = pa_memblockq_missing(s->memblockq);
+    
+    pa_atomic_store(&s->missing, 0);
+    s->last_missing = *missing;
     s->drain_request = 0;
 
     pa_idxset_put(c->output_streams, s, &s->index);
 
+    pa_sink_input_put(s->sink_input);
+    
     return s;
 }
 
-static void playback_stream_free(struct playback_stream* p) {
-    struct playback_stream *head;
-    assert(p && p->connection);
+static void connection_unlink(connection *c) {
+    record_stream *r;
+    output_stream *o;
 
-    if (p->drain_request)
-        pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERR_NOENTITY);
+    pa_assert(c);
 
-    PA_LLIST_FIND_HEAD(struct playback_stream, p, &head);
-    PA_LLIST_REMOVE(struct playback_stream, head, p);
-
-    pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
-    pa_sink_input_disconnect(p->sink_input);
-    pa_sink_input_unref(p->sink_input);
-    pa_memblockq_free(p->memblockq);
-    pa_xfree(p);
-}
-
-static void connection_free(struct connection *c) {
-    struct record_stream *r;
-    struct output_stream *o;
-    assert(c && c->protocol);
+    if (!c->protocol)
+        return;
 
-    pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
     while ((r = pa_idxset_first(c->record_streams, NULL)))
-        record_stream_free(r);
-    pa_idxset_free(c->record_streams, NULL, NULL);
+        record_stream_unlink(r);
 
     while ((o = pa_idxset_first(c->output_streams, NULL)))
-        if (o->type == PLAYBACK_STREAM)
-            playback_stream_free((struct playback_stream*) o);
+        if (playback_stream_isinstance(o))
+            playback_stream_unlink(PLAYBACK_STREAM(o));
         else
-            upload_stream_free((struct upload_stream*) o);
-    pa_idxset_free(c->output_streams, NULL, NULL);
-
-    pa_pdispatch_unref(c->pdispatch);
-    pa_pstream_close(c->pstream);
-    pa_pstream_unref(c->pstream);
-    pa_client_free(c->client);
+            upload_stream_unlink(UPLOAD_STREAM(o));
 
     if (c->subscription)
         pa_subscription_free(c->subscription);
 
-    if (c->auth_timeout_event)
+    if (c->pstream)
+        pa_pstream_close(c->pstream);
+
+    if (c->auth_timeout_event) {
         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
+        c->auth_timeout_event = NULL;
+    }
+    
+    pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
+    connection_unref(c);
+    c->protocol = NULL;
+}
+
+static void connection_free(pa_object *o) {
+    connection *c = CONNECTION(o);
+    
+    pa_assert(c);
+
+    connection_unlink(c);
+    
+    pa_idxset_free(c->record_streams, NULL, NULL);
+    pa_idxset_free(c->output_streams, NULL, NULL);
+
+    pa_pdispatch_unref(c->pdispatch);
+    pa_pstream_unref(c->pstream);
+    pa_client_free(c->client);
 
     pa_xfree(c);
 }
 
-static void request_bytes(struct playback_stream *s) {
-    pa_tagstruct *t;
-    size_t l;
-    assert(s);
+static void request_bytes(playback_stream *s) {
+    size_t new_missing, delta, previous_missing;
 
-    if (!(l = pa_memblockq_missing(s->memblockq)))
-        return;
-
-    if (l <= s->requested_bytes)
-        return;
+    playback_stream_assert_ref(s);
 
-    l -= s->requested_bytes;
+    new_missing = pa_memblockq_missing(s->memblockq);
 
-    if (l < pa_memblockq_get_minreq(s->memblockq))
+    if (new_missing <= s->last_missing)
         return;
 
-    s->requested_bytes += l;
-
-    t = pa_tagstruct_new(NULL, 0);
-    assert(t);
-    pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
-    pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
-    pa_tagstruct_putu32(t, s->index);
-    pa_tagstruct_putu32(t, l);
-    pa_pstream_send_tagstruct(s->connection->pstream, t);
+    delta = new_missing - s->last_missing;
+    s->last_missing = new_missing;
 
-/*     pa_log("Requesting %u bytes", l);  */
+    previous_missing = pa_atomic_add(&s->missing, delta);
+    if (previous_missing < pa_memblockq_get_minreq(s->memblockq) && previous_missing+delta >= pa_memblockq_get_minreq(s->memblockq))
+        pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
 }
 
-static void send_memblock(struct connection *c) {
+static void send_memblock(connection *c) {
     uint32_t start;
-    struct record_stream *r;
+    record_stream *r;
 
     start = PA_IDXSET_INVALID;
     for (;;) {
         pa_memchunk chunk;
 
-        if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
+        if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
             return;
 
         if (start == PA_IDXSET_INVALID)
@@ -571,7 +767,8 @@ static void send_memblock(struct connection *c) {
                 schunk.length = r->fragment_size;
 
             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
-            pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
+            
+            pa_memblockq_drop(r->memblockq, schunk.length);
             pa_memblock_unref(schunk.memblock);
 
             return;
@@ -579,9 +776,9 @@ static void send_memblock(struct connection *c) {
     }
 }
 
-static void send_playback_stream_killed(struct playback_stream *p) {
+static void send_playback_stream_killed(playback_stream *p) {
     pa_tagstruct *t;
-    assert(p);
+    playback_stream_assert_ref(p);
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
@@ -590,9 +787,9 @@ static void send_playback_stream_killed(struct playback_stream *p) {
     pa_pstream_send_tagstruct(p->connection->pstream, t);
 }
 
-static void send_record_stream_killed(struct record_stream *r) {
+static void send_record_stream_killed(record_stream *r) {
     pa_tagstruct *t;
-    assert(r);
+    record_stream_assert_ref(r);
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
@@ -603,22 +800,123 @@ static void send_record_stream_killed(struct record_stream *r) {
 
 /*** sinkinput callbacks ***/
 
-static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
-    struct playback_stream *s;
-    assert(i && i->userdata && chunk);
-    s = i->userdata;
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
+    pa_sink_input *i = PA_SINK_INPUT(o);
+    playback_stream *s;
 
-    if (pa_memblockq_get_length(s->memblockq) <= 0 && !s->underrun) {
-        pa_tagstruct *t;
+    pa_sink_input_assert_ref(i);
+    s = PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+
+    switch (code) {
+
+        case SINK_INPUT_MESSAGE_SEEK: 
+            pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
+            return 0;
+
+        case SINK_INPUT_MESSAGE_POST_DATA: {
+            pa_assert(chunk);
+
+            if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+
+                pa_log_warn("Failed to push data into queue");
+                pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
+                pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_RELATIVE);
+            }
+
+            s->underrun = 0;
+            return 0;
+        }
+
+        case SINK_INPUT_MESSAGE_DRAIN: {
+
+            pa_memblockq_prebuf_disable(s->memblockq);
+
+            if (!pa_memblockq_is_readable(s->memblockq))
+                pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
+            else {
+                s->drain_tag = PA_PTR_TO_UINT(userdata);
+                s->drain_request = 1;
+            }
+
+            return 0;
+        }
+
+        case SINK_INPUT_MESSAGE_FLUSH:
+        case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+        case SINK_INPUT_MESSAGE_TRIGGER: {
+            
+            pa_sink_input *isync;
+            void (*func)(pa_memblockq *bq);
+
+            switch  (code) {
+                case SINK_INPUT_MESSAGE_FLUSH:
+                    func = pa_memblockq_flush;
+                    break;
+                    
+                case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+                    func = pa_memblockq_prebuf_force;
+                    break;
+                    
+                case SINK_INPUT_MESSAGE_TRIGGER:
+                    func = pa_memblockq_prebuf_disable;
+                    break;
+
+                default:
+                    pa_assert_not_reached();
+            }
+            
+            func(s->memblockq);
+            s->underrun = 0;
+            request_bytes(s);
+
+            /* Do the same for all other members in the sync group */
+            for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
+                playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
+                func(ssync->memblockq);
+                ssync->underrun = 0;
+                request_bytes(ssync);
+            }
+
+            for (isync = i->sync_next; isync; isync = isync->sync_next) {
+                playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
+                func(ssync->memblockq);
+                ssync->underrun = 0;
+                request_bytes(ssync);
+            }
+            
+            return 0;
+        }
+            
+        case PA_SINK_INPUT_MESSAGE_SET_STATE:
+
+            pa_memblockq_prebuf_force(s->memblockq);
+            break;
+
+        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
+            pa_usec_t *r = userdata;
+
+            *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
+
+            /* Fall through, the default handler will add in the extra
+             * latency added by the resampler */
+            break;
+        }
+    }
 
-        /* Report that we're empty */
+    return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
+}
+
+static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
+    playback_stream *s;
 
-        t = pa_tagstruct_new(NULL, 0);
-        pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
-        pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
-        pa_tagstruct_putu32(t, s->index);
-        pa_pstream_send_tagstruct(s->connection->pstream, t);
+    pa_sink_input_assert_ref(i);
+    s = PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+    pa_assert(chunk);
 
+    if (pa_memblockq_get_length(s->memblockq) <= 0 && !s->underrun) {
+        pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
         s->underrun = 1;
     }
 
@@ -632,65 +930,67 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
     return 0;
 }
 
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
-    struct playback_stream *s;
-    assert(i && i->userdata && length);
-    s = i->userdata;
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
+    playback_stream *s;
 
-    pa_memblockq_drop(s->memblockq, chunk, length);
+    pa_sink_input_assert_ref(i);
+    s = PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+    pa_assert(length > 0);
 
-    request_bytes(s);
+    pa_memblockq_drop(s->memblockq, length);
 
     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
-        pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
         s->drain_request = 0;
+        pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
     }
 
+    request_bytes(s);
+
 /*     pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq));   */
 }
 
 static void sink_input_kill_cb(pa_sink_input *i) {
-    assert(i && i->userdata);
-    send_playback_stream_killed((struct playback_stream *) i->userdata);
-    playback_stream_free((struct playback_stream *) i->userdata);
-}
+    playback_stream *s;
 
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
-    struct playback_stream *s;
-    assert(i && i->userdata);
-    s = i->userdata;
+    pa_sink_input_assert_ref(i);
+    s = PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
 
-    /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
-
-    return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
+    send_playback_stream_killed(s);
+    playback_stream_unlink(s);
 }
 
 /*** source_output callbacks ***/
 
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
-    struct record_stream *s;
-    assert(o && o->userdata && chunk);
-    s = o->userdata;
+    record_stream *s;
 
-    if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
-        pa_log_warn("Failed to push data into output queue.");
-        return;
-    }
+    pa_source_output_assert_ref(o);
+    s = RECORD_STREAM(o->userdata);
+    record_stream_assert_ref(s);
+    pa_assert(chunk);
 
-    if (!pa_pstream_is_pending(s->connection->pstream))
-        send_memblock(s->connection);
+    pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
 }
 
 static void source_output_kill_cb(pa_source_output *o) {
-    assert(o && o->userdata);
-    send_record_stream_killed((struct record_stream *) o->userdata);
-    record_stream_free((struct record_stream *) o->userdata);
+    record_stream *s;
+
+    pa_source_output_assert_ref(o);
+    s = RECORD_STREAM(o->userdata);
+    record_stream_assert_ref(s);
+
+    send_record_stream_killed(s);
+    record_stream_unlink(s);
 }
 
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
-    struct record_stream *s;
-    assert(o && o->userdata);
-    s = o->userdata;
+    record_stream *s;
+
+    pa_source_output_assert_ref(o);
+    s = RECORD_STREAM(o->userdata);
+    record_stream_assert_ref(s);
 
     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
 
@@ -699,9 +999,9 @@ static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
 
 /*** pdispatch callbacks ***/
 
-static void protocol_error(struct connection *c) {
+static void protocol_error(connection *c) {
     pa_log("protocol error, kicking client");
-    connection_free(c);
+    connection_unlink(c);
 }
 
 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
@@ -721,9 +1021,9 @@ static pa_tagstruct *reply_new(uint32_t tag) {
 }
 
 static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
-    struct playback_stream *s;
-    uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid;
+    connection *c = CONNECTION(userdata);
+    playback_stream *s;
+    uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid, missing;
     const char *name, *sink_name;
     pa_sample_spec ss;
     pa_channel_map map;
@@ -731,8 +1031,9 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
     pa_sink *sink = NULL;
     pa_cvolume volume;
     int corked;
-
-    assert(c && t && c->protocol && c->protocol->core);
+    
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_get(
             t,
@@ -773,34 +1074,33 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
     }
 
-    s = playback_stream_new(c, sink, &ss, &map, name, maxlength, tlength, prebuf, minreq, &volume, syncid);
+    s = playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tlength, &prebuf, &minreq, &volume, syncid, corked, &missing);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
 
-    pa_sink_input_cork(s->sink_input, corked);
-
     reply = reply_new(tag);
     pa_tagstruct_putu32(reply, s->index);
-    assert(s->sink_input);
+    pa_assert(s->sink_input);
     pa_tagstruct_putu32(reply, s->sink_input->index);
-    pa_tagstruct_putu32(reply, s->requested_bytes = pa_memblockq_missing(s->memblockq));
+    pa_tagstruct_putu32(reply, missing);
 
     if (c->version >= 9) {
         /* Since 0.9 we support sending the buffer metrics back to the client */
 
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_maxlength(s->memblockq));
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_tlength(s->memblockq));
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_prebuf(s->memblockq));
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_minreq(s->memblockq));
+        pa_tagstruct_putu32(reply, (uint32_t) maxlength);
+        pa_tagstruct_putu32(reply, (uint32_t) tlength);
+        pa_tagstruct_putu32(reply, (uint32_t) prebuf);
+        pa_tagstruct_putu32(reply, (uint32_t) minreq);
     }
 
     pa_pstream_send_tagstruct(c->pstream, reply);
-    request_bytes(s);
 }
 
 static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t channel;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &channel) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -810,39 +1110,52 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
 
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
 
-    if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
-        struct playback_stream *s;
-        if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
-            pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
-            return;
+    switch (command) {
+        
+        case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
+            playback_stream *s;
+            if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
+                pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+                return;
+            }
+            
+            playback_stream_unlink(s);
+            break;
         }
-
-        playback_stream_free(s);
-    } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
-        struct record_stream *s;
-        if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
-            pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
-            return;
+            
+        case PA_COMMAND_DELETE_RECORD_STREAM: {
+            record_stream *s;
+            if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
+                pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+                return;
+            }
+            
+            record_stream_unlink(s);
+            break;
         }
 
-        record_stream_free(s);
-    } else {
-        struct upload_stream *s;
-        assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
-        if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
-            pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
-            return;
+        case PA_COMMAND_DELETE_UPLOAD_STREAM: {
+            upload_stream *s;
+
+            if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
+                pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+                return;
+            }
+            
+            upload_stream_unlink(s);
+            break;
         }
 
-        upload_stream_free(s);
+        default:
+            pa_assert_not_reached();
     }
 
     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 
 static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
-    struct record_stream *s;
+    connection *c = CONNECTION(userdata);
+    record_stream *s;
     uint32_t maxlength, fragment_size;
     uint32_t source_index;
     const char *name, *source_name;
@@ -851,7 +1164,9 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
     pa_tagstruct *reply;
     pa_source *source = NULL;
     int corked;
-    assert(c && t && c->protocol && c->protocol->core);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
@@ -882,20 +1197,18 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
     }
 
-    s = record_stream_new(c, source, &ss, &map, name, maxlength, fragment_size);
+    s = record_stream_new(c, source, &ss, &map, name, &maxlength, fragment_size, corked);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
 
-    pa_source_output_cork(s->source_output, corked);
-
     reply = reply_new(tag);
     pa_tagstruct_putu32(reply, s->index);
-    assert(s->source_output);
+    pa_assert(s->source_output);
     pa_tagstruct_putu32(reply, s->source_output->index);
 
     if (c->version >= 9) {
         /* Since 0.9 we support sending the buffer metrics back to the client */
 
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_maxlength(s->memblockq));
+        pa_tagstruct_putu32(reply, (uint32_t) maxlength);
         pa_tagstruct_putu32(reply, (uint32_t) s->fragment_size);
     }
 
@@ -903,9 +1216,11 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
 }
 
 static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
-    assert(c && t);
+    connection *c = CONNECTION(userdata);
 
+    connection_assert_ref(c);
+    pa_assert(t);
+    
     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
         return;
@@ -913,16 +1228,17 @@ static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
 
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
 
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
 }
 
 static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const void*cookie;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
@@ -1015,9 +1331,11 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
 }
 
 static void command_set_client_name(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const char *name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_gets(t, &name) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1032,10 +1350,12 @@ static void command_set_client_name(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
 }
 
 static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const char *name;
     uint32_t idx = PA_IDXSET_INVALID;
-    assert(c && t);
+    
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_gets(t, &name) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1052,7 +1372,7 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
             idx = sink->index;
     } else {
         pa_source *source;
-        assert(command == PA_COMMAND_LOOKUP_SOURCE);
+        pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1)))
             idx = source->index;
     }
@@ -1068,10 +1388,12 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
 }
 
 static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
-    struct playback_stream *s;
-    assert(c && t);
+    playback_stream *s;
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1082,29 +1404,18 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
     s = pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
-
-    s->drain_request = 0;
-
-    pa_memblockq_prebuf_disable(s->memblockq);
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
 
-    if (!pa_memblockq_is_readable(s->memblockq)) {
-/*         pa_log("immediate drain: %u", pa_memblockq_get_length(s->memblockq));  */
-        pa_pstream_send_simple_ack(c->pstream, tag);
-    } else {
-/*         pa_log("slow drain triggered");  */
-        s->drain_request = 1;
-        s->drain_tag = tag;
-
-        pa_sink_notify(s->sink_input->sink);
-    }
+    pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
 }
 
 static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_tagstruct *reply;
     const pa_mempool_stat *stat;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1125,13 +1436,15 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
 }
 
 static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_tagstruct *reply;
-    struct playback_stream *s;
+    playback_stream *s;
     struct timeval tv, now;
     uint32_t idx;
     pa_usec_t latency;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_timeval(t, &tv) < 0 ||
@@ -1143,13 +1456,13 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
     s = pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
 
     reply = reply_new(tag);
 
     latency = pa_sink_get_latency(s->sink_input->sink);
-    if (s->sink_input->resampled_chunk.memblock)
-        latency += pa_bytes_to_usec(s->sink_input->resampled_chunk.length, &s->sink_input->sample_spec);
+/*     if (s->sink_input->resampled_chunk.memblock) */  /* FIXME*/ 
+/*         latency += pa_bytes_to_usec(s->sink_input->resampled_chunk.length, &s->sink_input->sample_spec); */
     pa_tagstruct_put_usec(reply, latency);
 
     pa_tagstruct_put_usec(reply, 0);
@@ -1162,12 +1475,14 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
 }
 
 static void command_get_record_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_tagstruct *reply;
-    struct record_stream *s;
+    record_stream *s;
     struct timeval tv, now;
     uint32_t idx;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_timeval(t, &tv) < 0 ||
@@ -1192,14 +1507,16 @@ static void command_get_record_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
 }
 
 static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
-    struct upload_stream *s;
+    connection *c = CONNECTION(userdata);
+    upload_stream *s;
     uint32_t length;
     const char *name;
     pa_sample_spec ss;
     pa_channel_map map;
     pa_tagstruct *reply;
-    assert(c && t && c->protocol && c->protocol->core);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
@@ -1228,11 +1545,13 @@ static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
 }
 
 static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t channel;
-    struct upload_stream *s;
+    upload_stream *s;
     uint32_t idx;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &channel) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1244,23 +1563,25 @@ static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
 
     s = pa_idxset_get_by_index(c->output_streams, channel);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type == UPLOAD_STREAM, tag, PA_ERR_NOENTITY);
+    CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
 
     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, &idx) < 0)
         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
     else
         pa_pstream_send_simple_ack(c->pstream, tag);
 
-    upload_stream_free(s);
+    upload_stream_unlink(s);
 }
 
 static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t sink_index;
     pa_volume_t volume;
     pa_sink *sink;
     const char *name, *sink_name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
         pa_tagstruct_gets(t, &sink_name) < 0 ||
@@ -1291,9 +1612,11 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
 }
 
 static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const char *name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_gets(t, &name) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1313,7 +1636,9 @@ static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
 }
 
 static void sink_fill_tagstruct(pa_tagstruct *t, pa_sink *sink) {
-    assert(t && sink);
+    pa_assert(t);
+    pa_sink_assert_ref(sink);
+
     pa_tagstruct_put(
         t,
         PA_TAG_U32, sink->index,
@@ -1321,22 +1646,24 @@ static void sink_fill_tagstruct(pa_tagstruct *t, pa_sink *sink) {
         PA_TAG_STRING, sink->description,
         PA_TAG_SAMPLE_SPEC, &sink->sample_spec,
         PA_TAG_CHANNEL_MAP, &sink->channel_map,
-        PA_TAG_U32, sink->owner ? sink->owner->index : PA_INVALID_INDEX,
-        PA_TAG_CVOLUME, pa_sink_get_volume(sink, PA_MIXER_HARDWARE),
-        PA_TAG_BOOLEAN, pa_sink_get_mute(sink, PA_MIXER_HARDWARE),
+        PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
+        PA_TAG_CVOLUME, pa_sink_get_volume(sink),
+        PA_TAG_BOOLEAN, pa_sink_get_mute(sink),
         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
         PA_TAG_USEC, pa_sink_get_latency(sink),
         PA_TAG_STRING, sink->driver,
         PA_TAG_U32,
-        (sink->get_hw_volume ? PA_SINK_HW_VOLUME_CTRL : 0) |
-        (sink->get_latency ? PA_SINK_LATENCY : 0) |
+        (sink->get_volume ? PA_SINK_HW_VOLUME_CTRL : 0) |  /* FIXME */
+        (sink->get_latency ? PA_SINK_LATENCY : 0) |        /* FIXME */ 
         (sink->is_hardware ? PA_SINK_HARDWARE : 0),
         PA_TAG_INVALID);
 }
 
 static void source_fill_tagstruct(pa_tagstruct *t, pa_source *source) {
-    assert(t && source);
+    pa_assert(t);
+    pa_source_assert_ref(source);
+
     pa_tagstruct_put(
         t,
         PA_TAG_U32, source->index,
@@ -1344,22 +1671,24 @@ static void source_fill_tagstruct(pa_tagstruct *t, pa_source *source) {
         PA_TAG_STRING, source->description,
         PA_TAG_SAMPLE_SPEC, &source->sample_spec,
         PA_TAG_CHANNEL_MAP, &source->channel_map,
-        PA_TAG_U32, source->owner ? source->owner->index : PA_INVALID_INDEX,
-        PA_TAG_CVOLUME, pa_source_get_volume(source, PA_MIXER_HARDWARE),
-        PA_TAG_BOOLEAN, pa_source_get_mute(source, PA_MIXER_HARDWARE),
+        PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
+        PA_TAG_CVOLUME, pa_source_get_volume(source),
+        PA_TAG_BOOLEAN, pa_source_get_mute(source),
         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
         PA_TAG_USEC, pa_source_get_latency(source),
         PA_TAG_STRING, source->driver,
         PA_TAG_U32,
-        (source->get_hw_volume ? PA_SOURCE_HW_VOLUME_CTRL : 0) |
-        (source->get_latency ? PA_SOURCE_LATENCY : 0) |
+        (source->get_volume ? PA_SOURCE_HW_VOLUME_CTRL : 0) |     /* FIXME */
+        (source->get_latency ? PA_SOURCE_LATENCY : 0) |              /* FIXME */
         (source->is_hardware ? PA_SOURCE_HARDWARE : 0),
         PA_TAG_INVALID);
 }
 
 static void client_fill_tagstruct(pa_tagstruct *t, pa_client *client) {
-    assert(t && client);
+    pa_assert(t);
+    pa_assert(client);
+
     pa_tagstruct_putu32(t, client->index);
     pa_tagstruct_puts(t, client->name);
     pa_tagstruct_putu32(t, client->owner ? client->owner->index : PA_INVALID_INDEX);
@@ -1367,7 +1696,9 @@ static void client_fill_tagstruct(pa_tagstruct *t, pa_client *client) {
 }
 
 static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) {
-    assert(t && module);
+    pa_assert(t);
+    pa_assert(module);
+
     pa_tagstruct_putu32(t, module->index);
     pa_tagstruct_puts(t, module->name);
     pa_tagstruct_puts(t, module->argument);
@@ -1376,7 +1707,9 @@ static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) {
 }
 
 static void sink_input_fill_tagstruct(pa_tagstruct *t, pa_sink_input *s) {
-    assert(t && s);
+    pa_assert(t);
+    pa_sink_input_assert_ref(s);
+
     pa_tagstruct_putu32(t, s->index);
     pa_tagstruct_puts(t, s->name);
     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
@@ -1392,7 +1725,9 @@ static void sink_input_fill_tagstruct(pa_tagstruct *t, pa_sink_input *s) {
 }
 
 static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output *s) {
-    assert(t && s);
+    pa_assert(t);
+    pa_source_output_assert_ref(s);
+
     pa_tagstruct_putu32(t, s->index);
     pa_tagstruct_puts(t, s->name);
     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
@@ -1407,7 +1742,9 @@ static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output *s) {
 }
 
 static void scache_fill_tagstruct(pa_tagstruct *t, pa_scache_entry *e) {
-    assert(t && e);
+    pa_assert(t);
+    pa_assert(e);
+    
     pa_tagstruct_putu32(t, e->index);
     pa_tagstruct_puts(t, e->name);
     pa_tagstruct_put_cvolume(t, &e->volume);
@@ -1420,7 +1757,7 @@ static void scache_fill_tagstruct(pa_tagstruct *t, pa_scache_entry *e) {
 }
 
 static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
     pa_sink *sink = NULL;
     pa_source *source = NULL;
@@ -1431,7 +1768,9 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
     pa_scache_entry *sce = NULL;
     const char *name;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         (command != PA_COMMAND_GET_CLIENT_INFO &&
@@ -1466,7 +1805,7 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
     else {
-        assert(command == PA_COMMAND_GET_SAMPLE_INFO);
+        pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
         if (idx != PA_INVALID_INDEX)
             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
         else
@@ -1497,12 +1836,14 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
 }
 
 static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_idxset *i;
     uint32_t idx;
     void *p;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1526,7 +1867,7 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
         i = c->protocol->core->source_outputs;
     else {
-        assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
+        pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
         i = c->protocol->core->scache;
     }
 
@@ -1545,7 +1886,7 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
                 source_output_fill_tagstruct(reply, p);
             else {
-                assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
+                pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
                 scache_fill_tagstruct(reply, p);
             }
         }
@@ -1555,11 +1896,13 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
 }
 
 static void command_get_server_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_tagstruct *reply;
     char txt[256];
     const char *n;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1587,8 +1930,10 @@ static void command_get_server_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
 
 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
     pa_tagstruct *t;
-    struct connection *c = userdata;
-    assert(c && core);
+    connection *c = CONNECTION(userdata);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
@@ -1599,9 +1944,11 @@ static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint3
 }
 
 static void command_subscribe(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_subscription_mask_t m;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &m) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1617,7 +1964,7 @@ static void command_subscribe(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint
 
     if (m != 0) {
         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
-        assert(c->subscription);
+        pa_assert(c->subscription);
     } else
         c->subscription = NULL;
 
@@ -1631,14 +1978,16 @@ static void command_set_volume(
         pa_tagstruct *t,
         void *userdata) {
 
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
     pa_cvolume volume;
     pa_sink *sink = NULL;
     pa_source *source = NULL;
     pa_sink_input *si = NULL;
     const char *name = NULL;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
@@ -1653,27 +2002,36 @@ static void command_set_volume(
     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || !name || (*name && pa_utf8_valid(name)), tag, PA_ERR_INVALID);
     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
 
-    if (command == PA_COMMAND_SET_SINK_VOLUME) {
-        if (idx != PA_INVALID_INDEX)
-            sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
-        else
-            sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
-    } else if (command == PA_COMMAND_SET_SOURCE_VOLUME) {
-        if (idx != (uint32_t) -1)
-            source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
-        else
-            source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
-    }  else {
-        assert(command == PA_COMMAND_SET_SINK_INPUT_VOLUME);
-        si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
+    switch (command) {
+        
+        case PA_COMMAND_SET_SINK_VOLUME:
+            if (idx != PA_INVALID_INDEX)
+                sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
+            else
+                sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
+            break;
+
+        case PA_COMMAND_SET_SOURCE_VOLUME:
+            if (idx != PA_INVALID_INDEX)
+                source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
+            else
+                source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
+            break;
+            
+        case PA_COMMAND_SET_SINK_INPUT_VOLUME:
+            si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
+            break;
+
+        default:
+            pa_assert_not_reached();
     }
 
     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
 
     if (sink)
-        pa_sink_set_volume(sink, PA_MIXER_HARDWARE, &volume);
+        pa_sink_set_volume(sink, &volume);
     else if (source)
-        pa_source_set_volume(source, PA_MIXER_HARDWARE, &volume);
+        pa_source_set_volume(source, &volume);
     else if (si)
         pa_sink_input_set_volume(si, &volume);
 
@@ -1687,16 +2045,20 @@ static void command_set_mute(
         pa_tagstruct *t,
         void *userdata) {
 
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
     int mute;
     pa_sink *sink = NULL;
     pa_source *source = NULL;
+    pa_sink_input *si = NULL;
     const char *name = NULL;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
-        pa_tagstruct_gets(t, &name) < 0 ||
+        (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
+        (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
         pa_tagstruct_get_boolean(t, &mute) ||
         !pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1706,35 +2068,53 @@ static void command_set_mute(
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || !name || (*name && pa_utf8_valid(name)), tag, PA_ERR_INVALID);
 
-    if (command == PA_COMMAND_SET_SINK_MUTE) {
-        if (idx != PA_INVALID_INDEX)
-            sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
-        else
-            sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
-    } else {
-        assert(command == PA_COMMAND_SET_SOURCE_MUTE);
-        if (idx != (uint32_t) -1)
-            source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
-        else
-            source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
+    switch (command) {
+        
+        case PA_COMMAND_SET_SINK_MUTE:
+
+            if (idx != PA_INVALID_INDEX)
+                sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
+            else
+                sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
+
+            break;
+
+        case PA_COMMAND_SET_SOURCE_MUTE:
+            if (idx != PA_INVALID_INDEX)
+                source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
+            else
+                source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
+
+            break;
+
+        case PA_COMMAND_SET_SINK_INPUT_MUTE:
+            si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
+            break;
+
+        default:
+            pa_assert_not_reached();
     }
 
-    CHECK_VALIDITY(c->pstream, sink || source, tag, PA_ERR_NOENTITY);
+    CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
 
     if (sink)
-        pa_sink_set_mute(sink, PA_MIXER_HARDWARE, mute);
+        pa_sink_set_mute(sink, mute);
     else if (source)
-        pa_source_set_mute(source, PA_MIXER_HARDWARE, mute);
+        pa_source_set_mute(source, mute);
+    else if (si)
+        pa_sink_input_set_mute(si, mute);
 
     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 
 static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
     int b;
-    struct playback_stream *s, *ssync;
-    assert(c && t);
+    playback_stream *s;
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_boolean(t, &b) < 0 ||
@@ -1747,73 +2127,19 @@ static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
     s = pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
 
     pa_sink_input_cork(s->sink_input, b);
-    pa_memblockq_prebuf_force(s->memblockq);
-
-    /* Do the same for all other members in the sync group */
-    for (ssync = s->prev; ssync; ssync = ssync->prev) {
-        pa_sink_input_cork(ssync->sink_input, b);
-        pa_memblockq_prebuf_force(ssync->memblockq);
-    }
-
-    for (ssync = s->next; ssync; ssync = ssync->next) {
-        pa_sink_input_cork(ssync->sink_input, b);
-        pa_memblockq_prebuf_force(ssync->memblockq);
-    }
-
     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 
-static void command_flush_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+static void command_trigger_or_flush_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
-    struct playback_stream *s, *ssync;
-    assert(c && t);
-
-    if (pa_tagstruct_getu32(t, &idx) < 0 ||
-        !pa_tagstruct_eof(t)) {
-        protocol_error(c);
-        return;
-    }
-
-    CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
-    CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
-    s = pa_idxset_get_by_index(c->output_streams, idx);
-    CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
-
-    pa_memblockq_flush(s->memblockq);
-    s->underrun = 0;
+    playback_stream *s;
 
-    /* Do the same for all other members in the sync group */
-    for (ssync = s->prev; ssync; ssync = ssync->prev) {
-        pa_memblockq_flush(ssync->memblockq);
-        ssync->underrun = 0;
-    }
-
-    for (ssync = s->next; ssync; ssync = ssync->next) {
-        pa_memblockq_flush(ssync->memblockq);
-        ssync->underrun = 0;
-    }
-
-    pa_pstream_send_simple_ack(c->pstream, tag);
-    pa_sink_notify(s->sink_input->sink);
-    request_bytes(s);
-
-    for (ssync = s->prev; ssync; ssync = ssync->prev)
-        request_bytes(ssync);
-
-    for (ssync = s->next; ssync; ssync = ssync->next)
-        request_bytes(ssync);
-}
-
-static void command_trigger_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
-    uint32_t idx;
-    struct playback_stream *s;
-    assert(c && t);
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1825,32 +2151,36 @@ static void command_trigger_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch
     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
     s = pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
 
     switch (command) {
+        case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
+            pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
+            break;
+            
         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
-            pa_memblockq_prebuf_force(s->memblockq);
+            pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
             break;
 
         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
-            pa_memblockq_prebuf_disable(s->memblockq);
+            pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
             break;
 
         default:
-            abort();
+            pa_assert_not_reached();
     }
 
-    pa_sink_notify(s->sink_input->sink);
     pa_pstream_send_simple_ack(c->pstream, tag);
-    request_bytes(s);
 }
 
 static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
-    struct record_stream *s;
+    record_stream *s;
     int b;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_boolean(t, &b) < 0 ||
@@ -1869,11 +2199,13 @@ static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
 }
 
 static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
-    struct record_stream *s;
-    assert(c && t);
+    record_stream *s;
 
+    connection_assert_ref(c);
+    pa_assert(t);
+    
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1889,9 +2221,11 @@ static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U
 }
 
 static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const char *s;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_gets(t, &s) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1907,10 +2241,12 @@ static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch *pd, u
 }
 
 static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
     const char *name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_gets(t, &name) < 0 ||
@@ -1923,16 +2259,16 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
 
     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
-        struct playback_stream *s;
+        playback_stream *s;
 
         s = pa_idxset_get_by_index(c->output_streams, idx);
         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-        CHECK_VALIDITY(c->pstream, s->type == PLAYBACK_STREAM, tag, PA_ERR_NOENTITY);
+        CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
 
         pa_sink_input_set_name(s->sink_input, name);
 
     } else {
-        struct record_stream *s;
+        record_stream *s;
 
         s = pa_idxset_get_by_index(c->record_streams, idx);
         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
@@ -1944,9 +2280,11 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
 }
 
 static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1973,7 +2311,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
     } else {
         pa_source_output *s;
 
-        assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
+        pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
 
         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
@@ -1985,12 +2323,14 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
 }
 
 static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_module *m;
     const char *name, *argument;
     pa_tagstruct *reply;
-    assert(c && t);
 
+    connection_assert_ref(c);
+    pa_assert(t);
+    
     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_gets(t, &argument) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -2013,10 +2353,12 @@ static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
 }
 
 static void command_unload_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx;
     pa_module *m;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -2033,12 +2375,14 @@ static void command_unload_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
 }
 
 static void command_add_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const char *name, *module, *argument;
     uint32_t type;
     uint32_t idx;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_getu32(t, &type) < 0 ||
@@ -2066,11 +2410,13 @@ static void command_add_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED u
 }
 
 static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const char *name = NULL;
     uint32_t type, idx = PA_IDXSET_INVALID;
     int r;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if ((pa_tagstruct_getu32(t, &idx) < 0 &&
         (pa_tagstruct_gets(t, &name) < 0 ||
@@ -2095,7 +2441,7 @@ static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
 }
 
 static void autoload_fill_tagstruct(pa_tagstruct *t, const pa_autoload_entry *e) {
-    assert(t && e);
+    pa_assert(t && e);
 
     pa_tagstruct_putu32(t, e->index);
     pa_tagstruct_puts(t, e->name);
@@ -2105,12 +2451,14 @@ static void autoload_fill_tagstruct(pa_tagstruct *t, const pa_autoload_entry *e)
 }
 
 static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     const pa_autoload_entry *a = NULL;
     uint32_t type, idx;
     const char *name;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if ((pa_tagstruct_getu32(t, &idx) < 0 &&
         (pa_tagstruct_gets(t, &name) < 0 ||
@@ -2137,9 +2485,11 @@ static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU
 }
 
 static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     pa_tagstruct *reply;
-    assert(c && t);
+    
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -2162,12 +2512,12 @@ static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
 }
 
 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = CONNECTION(userdata);
     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
     const char *name = NULL;
 
-    assert(c);
-    assert(t);
+    connection_assert_ref(c);
+    pa_assert(t);
 
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_getu32(t, &idx_device) < 0 ||
@@ -2218,69 +2568,49 @@ static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag
     }
 
     pa_pstream_send_simple_ack(c->pstream, tag);
-
 }
 
 /*** pstream callbacks ***/
 
 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
-    struct connection *c = userdata;
-    assert(p && packet && packet->data && c);
+    connection *c = CONNECTION(userdata);
+
+    pa_assert(p);
+    pa_assert(packet);
+    connection_assert_ref(c);
 
     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
         pa_log("invalid packet.");
-        connection_free(c);
+        connection_unlink(c);
     }
 }
 
 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
-    struct connection *c = userdata;
-    struct output_stream *stream;
-    assert(p && chunk && userdata);
-
-    if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
+    connection *c = CONNECTION(userdata);
+    output_stream *stream;
+    
+    pa_assert(p);
+    pa_assert(chunk);
+    connection_assert_ref(c);
+
+    if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
         pa_log("client sent block for invalid stream.");
         /* Ignoring */
         return;
     }
 
-    if (stream->type == PLAYBACK_STREAM) {
-        struct playback_stream *ps = (struct playback_stream*) stream;
-        if (chunk->length >= ps->requested_bytes)
-            ps->requested_bytes = 0;
-        else
-            ps->requested_bytes -= chunk->length;
-
-        pa_memblockq_seek(ps->memblockq, offset, seek);
-
-        if (pa_memblockq_push_align(ps->memblockq, chunk) < 0) {
-            pa_tagstruct *t;
-
-            pa_log_warn("failed to push data into queue");
-
-            /* Pushing this block into the queue failed, so we simulate
-             * it by skipping ahead */
-
-            pa_memblockq_seek(ps->memblockq, chunk->length, PA_SEEK_RELATIVE);
-
-            /* Notify the user */
-            t = pa_tagstruct_new(NULL, 0);
-            pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
-            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
-            pa_tagstruct_putu32(t, ps->index);
-            pa_pstream_send_tagstruct(p, t);
-        }
-
-        ps->underrun = 0;
-
-        pa_sink_notify(ps->sink_input->sink);
-
+    if (playback_stream_isinstance(stream)) {
+        playback_stream *ps = PLAYBACK_STREAM(stream);
+        
+        if (seek != PA_SEEK_RELATIVE || offset != 0)
+            pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
+        
+        pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
+        
     } else {
-        struct upload_stream *u = (struct upload_stream*) stream;
+        upload_stream *u = UPLOAD_STREAM(stream);
         size_t l;
 
-        assert(u->type == UPLOAD_STREAM);
-
         if (!u->memchunk.memblock) {
             if (u->length == chunk->length) {
                 u->memchunk = *chunk;
@@ -2292,7 +2622,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
             }
         }
 
-        assert(u->memchunk.memblock);
+        pa_assert(u->memchunk.memblock);
 
         l = u->length;
         if (l > chunk->length)
@@ -2317,17 +2647,21 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
 }
 
 static void pstream_die_callback(pa_pstream *p, void *userdata) {
-    struct connection *c = userdata;
-    assert(p && c);
-    connection_free(c);
+    connection *c = CONNECTION(userdata);
+    
+    pa_assert(p);
+    connection_assert_ref(c);
 
+    connection_unlink(c);
 /*    pa_log("connection died.");*/
 }
 
 
 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
-    struct connection *c = userdata;
-    assert(p && c);
+    connection *c = CONNECTION(userdata);
+
+    pa_assert(p);
+    connection_assert_ref(c);
 
     send_memblock(c);
 }
@@ -2335,25 +2669,32 @@ static void pstream_drain_callback(pa_pstream *p, void *userdata) {
 /*** client callbacks ***/
 
 static void client_kill_cb(pa_client *c) {
-    assert(c && c->userdata);
-    connection_free(c->userdata);
+    pa_assert(c);
+    
+    connection_unlink(CONNECTION(c->userdata));
 }
 
 /*** socket server callbacks ***/
 
 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
-    struct connection *c = userdata;
-    assert(m && tv && c && c->auth_timeout_event == e);
+    connection *c = CONNECTION(userdata);
+    
+    pa_assert(m);
+    pa_assert(tv);
+    connection_assert_ref(c);
+    pa_assert(c->auth_timeout_event == e);
 
     if (!c->authorized)
-        connection_free(c);
+        connection_unlink(c);
 }
 
 static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, void *userdata) {
     pa_protocol_native *p = userdata;
-    struct connection *c;
+    connection *c;
     char cname[256], pname[128];
-    assert(io && p);
+    
+    pa_assert(io);
+    pa_assert(p);
 
     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
@@ -2361,7 +2702,8 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
         return;
     }
 
-    c = pa_xmalloc(sizeof(struct connection));
+    c = pa_msgobject_new(connection);
+    c->parent.parent.free = connection_free;
 
     c->authorized = !!p->public;
 
@@ -2382,15 +2724,15 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
     c->protocol = p;
     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
     pa_snprintf(cname, sizeof(cname), "Native client (%s)", pname);
-    assert(p->core);
+    pa_assert(p->core);
     c->client = pa_client_new(p->core, __FILE__, cname);
-    assert(c->client);
+    pa_assert(c->client);
     c->client->kill = client_kill_cb;
     c->client->userdata = c;
     c->client->owner = p->module;
 
     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
-    assert(c->pstream);
+    pa_assert(c->pstream);
 
     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
@@ -2398,11 +2740,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
 
     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
-    assert(c->pdispatch);
+    pa_assert(c->pdispatch);
 
     c->record_streams = pa_idxset_new(NULL, NULL);
     c->output_streams = pa_idxset_new(NULL, NULL);
-    assert(c->record_streams && c->output_streams);
+    pa_assert(c->record_streams && c->output_streams);
 
     c->rrobin_index = PA_IDXSET_INVALID;
     c->subscription = NULL;
@@ -2420,7 +2762,7 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
 /*** module entry points ***/
 
 static int load_key(pa_protocol_native*p, const char*fn) {
-    assert(p);
+    pa_assert(p);
 
     p->auth_cookie_in_property = 0;
 
@@ -2450,8 +2792,8 @@ static pa_protocol_native* protocol_new_internal(pa_core *c, pa_module *m, pa_mo
     int public = 0;
     const char *acl;
 
-    assert(c);
-    assert(ma);
+    pa_assert(c);
+    pa_assert(ma);
 
     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &public) < 0) {
         pa_log("auth-anonymous= expects a boolean argument.");
@@ -2492,7 +2834,7 @@ static pa_protocol_native* protocol_new_internal(pa_core *c, pa_module *m, pa_mo
         goto fail;
 
     p->connections = pa_idxset_new(NULL, NULL);
-    assert(p->connections);
+    pa_assert(p->connections);
 
     return p;
 
@@ -2527,11 +2869,11 @@ pa_protocol_native* pa_protocol_native_new(pa_core *core, pa_socket_server *serv
 }
 
 void pa_protocol_native_free(pa_protocol_native *p) {
-    struct connection *c;
-    assert(p);
+    connection *c;
+    pa_assert(p);
 
     while ((c = pa_idxset_first(p->connections, NULL)))
-        connection_free(c);
+        connection_unlink(c);
     pa_idxset_free(p->connections, NULL, NULL);
 
     if (p->server) {
@@ -2563,7 +2905,12 @@ void pa_protocol_native_free(pa_protocol_native *p) {
     pa_xfree(p);
 }
 
-pa_protocol_native* pa_protocol_native_new_iochannel(pa_core*core, pa_iochannel *io, pa_module *m, pa_modargs *ma) {
+pa_protocol_native* pa_protocol_native_new_iochannel(
+        pa_core*core,
+        pa_iochannel *io,
+        pa_module *m,
+        pa_modargs *ma) {
+    
     pa_protocol_native *p;
 
     if (!(p = protocol_new_internal(core, m, ma)))
index 8f9aed58c8904d33b0778a9d88dd6ced160c0adf..0ded5d26557cd489dcb1b6b263d29bcd43eeece8 100644 (file)
@@ -67,7 +67,7 @@ typedef struct connection {
 
 PA_DECLARE_CLASS(connection);
 #define CONNECTION(o) (connection_cast(o))
-static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
                      
 struct pa_protocol_simple {
     pa_module *module;
@@ -91,9 +91,9 @@ enum {
 };
 
 enum {
-    MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
-    MESSAGE_POST_DATA,         /* data from source output to main loop */
-    MESSAGE_DROP_CONNECTION    /* Please drop a aconnection now */
+    CONNECTION_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
+    CONNECTION_MESSAGE_POST_DATA,         /* data from source output to main loop */
+    CONNECTION_MESSAGE_DROP_CONNECTION    /* Please drop a aconnection now */
 };
 
 
@@ -102,29 +102,12 @@ enum {
 #define RECORD_BUFFER_SECONDS (5)
 #define RECORD_BUFFER_FRAGMENTS (100)
 
-static void connection_free(pa_object *o) {
-    connection *c = CONNECTION(o);
+static void connection_unlink(connection *c) {
     pa_assert(c);
 
-    if (c->playback.current_memblock)
-        pa_memblock_unref(c->playback.current_memblock);
-
-    if (c->io)
-        pa_iochannel_free(c->io);
-    if (c->input_memblockq)
-        pa_memblockq_free(c->input_memblockq);
-    if (c->output_memblockq)
-        pa_memblockq_free(c->output_memblockq);
-
-    pa_xfree(c);
-}
-
-static void connection_drop(connection *c) {
-    pa_assert(c);
-    
-    if (!pa_idxset_remove_by_data(c->protocol->connections, c, NULL))
+    if (!c->protocol)
         return;
-
+    
     if (c->sink_input) {
         pa_sink_input_disconnect(c->sink_input);
         pa_sink_input_unref(c->sink_input);
@@ -142,9 +125,30 @@ static void connection_drop(connection *c) {
         c->client = NULL;
     }
 
+    pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
+    c->protocol = NULL;
     connection_unref(c);
 }
 
+static void connection_free(pa_object *o) {
+    connection *c = CONNECTION(o);
+    pa_assert(c);
+
+    connection_unref(c);
+    
+    if (c->playback.current_memblock)
+        pa_memblock_unref(c->playback.current_memblock);
+
+    if (c->io)
+        pa_iochannel_free(c->io);
+    if (c->input_memblockq)
+        pa_memblockq_free(c->input_memblockq);
+    if (c->output_memblockq)
+        pa_memblockq_free(c->output_memblockq);
+
+    pa_xfree(c);
+}
+
 static int do_read(connection *c) {
     pa_memchunk chunk;
     ssize_t r;
@@ -190,7 +194,7 @@ static int do_read(connection *c) {
 
     c->playback.memblock_index += r;
 
-    pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
+    pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
     pa_atomic_sub(&c->playback.missing, r);
 
     return 0;
@@ -263,28 +267,28 @@ fail:
         pa_iochannel_free(c->io);
         c->io = NULL;
 
-        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, NULL, NULL);
+        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
     } else
-        connection_drop(c);
+        connection_unlink(c);
 }
 
-static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
     connection *c = CONNECTION(o);
     connection_assert_ref(c);
 
     switch (code) {
-        case MESSAGE_REQUEST_DATA:
+        case CONNECTION_MESSAGE_REQUEST_DATA:
             do_work(c);
             break;
             
-        case MESSAGE_POST_DATA:
+        case CONNECTION_MESSAGE_POST_DATA:
 /*             pa_log("got data %u", chunk->length); */
             pa_memblockq_push_align(c->output_memblockq, chunk);
             do_work(c);
             break;
 
-        case MESSAGE_DROP_CONNECTION:
-            connection_drop(c);
+        case CONNECTION_MESSAGE_DROP_CONNECTION:
+            connection_unlink(c);
             break;
     }
 
@@ -294,13 +298,13 @@ static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_m
 /*** sink_input callbacks ***/
 
 /* Called from thread context */
-static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
     pa_sink_input *i = PA_SINK_INPUT(o);
     connection*c;
 
-    pa_assert(i);
-    c = i->userdata;
-    pa_assert(c);
+    pa_sink_input_assert_ref(i);
+    c = CONNECTION(i->userdata);
+    connection_assert_ref(c);
 
     switch (code) {
 
@@ -330,7 +334,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_
         }
 
         default:
-            return pa_sink_input_process_msg(o, code, userdata, chunk);
+            return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
     }
 }
 
@@ -349,7 +353,7 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
 /*     pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */
 
     if (c->dead && r < 0)
-        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL);
+        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_DROP_CONNECTION, NULL, 0, NULL, NULL);
 
     return r;
 }
@@ -369,19 +373,20 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
 
     if (new > old) {
         if (pa_atomic_add(&c->playback.missing, new - old) <= 0)
-            pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
+            pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
     }
 }
 
 /* Called from main context */
 static void sink_input_kill_cb(pa_sink_input *i) {
-    pa_assert(i);
+    pa_sink_input_assert_ref(i);
 
-    connection_drop(CONNECTION(i->userdata));
+    connection_unlink(CONNECTION(i->userdata));
 }
 
 /*** source_output callbacks ***/
 
+/* Called from thread context */
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
     connection *c;
 
@@ -390,24 +395,22 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
     pa_assert(c);
     pa_assert(chunk);
 
-    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL);
+    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
 }
 
+/* Called from main context */
 static void source_output_kill_cb(pa_source_output *o) {
-    connection*c;
+    pa_source_output_assert_ref(o);
 
-    pa_assert(o);
-    c = o->userdata;
-    pa_assert(c);
-
-    connection_drop(c);
+    connection_unlink(CONNECTION(o->userdata));
 }
 
+/* Called from main context */
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
     connection*c;
 
     pa_assert(o);
-    c = o->userdata;
+    c = CONNECTION(o->userdata);
     pa_assert(c);
 
     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
@@ -419,16 +422,16 @@ static void client_kill_cb(pa_client *client) {
     connection*c;
 
     pa_assert(client);
-    c = client->userdata;
+    c = CONNECTION(client->userdata);
     pa_assert(c);
 
-    connection_drop(c);
+    connection_unlink(c);
 }
 
 /*** pa_iochannel callbacks ***/
 
 static void io_callback(pa_iochannel*io, void *userdata) {
-    connection *c = userdata;
+    connection *c = CONNECTION(userdata);
 
     pa_assert(io);
     pa_assert(c);
@@ -453,7 +456,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
         return;
     }
 
-    c = pa_msgobject_new(connection, connection_check_type);
+    c = pa_msgobject_new(connection);
     c->parent.parent.free = connection_free;
     c->parent.process_msg = connection_process_msg;
     c->io = io;
@@ -547,7 +550,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
         pa_source_output_put(c->source_output);
     }
 
-
     pa_iochannel_set_callback(c->io, io_callback, c);
     pa_idxset_put(p->connections, c, NULL);
 
@@ -555,7 +557,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
 
 fail:
     if (c)
-        connection_drop(c);
+        connection_unlink(c);
 }
 
 pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
@@ -618,7 +620,7 @@ void pa_protocol_simple_free(pa_protocol_simple *p) {
 
     if (p->connections) {
         while((c = pa_idxset_first(p->connections, NULL)))
-            connection_drop(c);
+            connection_unlink(c);
 
         pa_idxset_free(p->connections, NULL, NULL);
     }
index e4e931f4dd2efa98512b0b69fc1330bbeb412cd5..0a7033d00b65174d0fd843b3ca9c7fa907b3db7e 100644 (file)
@@ -45,7 +45,7 @@
 #define MOVE_BUFFER_LENGTH (1024*1024)
 #define SILENCE_BUFFER_LENGTH (64*1024)
 
-static PA_DEFINE_CHECK_TYPE(pa_sink_input, sink_input_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_sink_input, pa_msgobject);
 
 static void sink_input_free(pa_object *o);
 
@@ -110,6 +110,7 @@ pa_sink_input* pa_sink_input_new(
 
     pa_return_null_if_fail(data->sink);
     pa_return_null_if_fail(pa_sink_get_state(data->sink) != PA_SINK_DISCONNECTED);
+    pa_return_null_if_fail(!data->sync_base || (data->sync_base->sink == data->sink && pa_sink_input_get_state(data->sync_base) == PA_SINK_INPUT_CORKED));
 
     if (!data->sample_spec_is_set)
         data->sample_spec = data->sink->sample_spec;
@@ -161,12 +162,12 @@ pa_sink_input* pa_sink_input_new(
         data->resample_method = pa_resampler_get_method(resampler);
     }
 
-    i = pa_msgobject_new(pa_sink_input, sink_input_check_type);
+    i = pa_msgobject_new(pa_sink_input);
     i->parent.parent.free = sink_input_free;
     i->parent.process_msg = pa_sink_input_process_msg;
 
     i->core = core;
-    i->state = PA_SINK_INPUT_RUNNING;
+    i->state = data->start_corked ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT_RUNNING;
     i->flags = flags;
     i->name = pa_xstrdup(data->name);
     i->driver = pa_xstrdup(data->driver);
@@ -181,6 +182,16 @@ pa_sink_input* pa_sink_input_new(
     i->volume = data->volume;
     i->muted = data->muted;
 
+    if (data->sync_base) {
+        i->sync_next = data->sync_base->sync_next;
+        i->sync_prev = data->sync_base;
+
+        if (data->sync_base->sync_next)
+            data->sync_base->sync_next->sync_prev = i;
+        data->sync_base->sync_next = i;
+    } else 
+        i->sync_next = i->sync_prev = NULL;
+    
     i->peek = NULL;
     i->drop = NULL;
     i->kill = NULL;
@@ -213,6 +224,7 @@ pa_sink_input* pa_sink_input_new(
 }
 
 static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) {
+    pa_sink_input *ssync;
     pa_assert(i);
 
     if (state == PA_SINK_INPUT_DRAINED)
@@ -221,10 +233,15 @@ static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) {
     if (i->state == state)
         return 0;
 
-    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 
     i->state = state;
+    for (ssync = i->sync_prev; ssync; ssync = ssync->sync_prev)
+        ssync->state = state;
+    for (ssync = i->sync_next; ssync; ssync = ssync->sync_next)
+        ssync->state = state;
+    
     return 0;
 }
 
@@ -232,10 +249,16 @@ void pa_sink_input_disconnect(pa_sink_input *i) {
     pa_assert(i);
     pa_return_if_fail(i->state != PA_SINK_INPUT_DISCONNECTED);
 
-    pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL);
+    if (i->sync_prev)
+        i->sync_prev->sync_next = i->sync_next;
+    if (i->sync_next)
+        i->sync_next->sync_prev = i->sync_prev;
+        
+    i->sync_prev = i->sync_next = NULL;
+    
+    pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, 0, NULL);
     pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL);
     pa_idxset_remove_by_data(i->sink->inputs, i, NULL);
-    pa_sink_input_unref(i);
 
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index);
 
@@ -248,6 +271,7 @@ void pa_sink_input_disconnect(pa_sink_input *i) {
     i->kill = NULL;
     i->get_latency = NULL;
     i->underrun = NULL;
+    pa_sink_input_unref(i);
 }
 
 static void sink_input_free(pa_object *o) {
@@ -281,7 +305,7 @@ void pa_sink_input_put(pa_sink_input *i) {
     i->thread_info.volume = i->volume;
     i->thread_info.muted = i->muted;
 
-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, pa_sink_input_ref(i), NULL, (pa_free_cb_t) pa_sink_input_unref);
+    pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL);
     pa_sink_update_status(i->sink);
 
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index);
@@ -299,7 +323,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i) {
 
     pa_sink_input_assert_ref(i);
 
-    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, 0, NULL) < 0)
         r = 0;
 
     if (i->get_latency)
@@ -509,7 +533,7 @@ void pa_sink_input_set_volume(pa_sink_input *i, const pa_cvolume *volume) {
 
     i->volume = *volume;
 
-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
 }
 
@@ -528,7 +552,7 @@ void pa_sink_input_set_mute(pa_sink_input *i, int mute) {
 
     i->muted = mute;
 
-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
 }
 
@@ -553,7 +577,7 @@ int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) {
 
     i->sample_spec.rate = rate;
 
-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), 0, NULL, NULL);
 
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
     return 0;
@@ -741,7 +765,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
 /*     return 0; */
 }
 
-int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
     pa_sink_input *i = PA_SINK_INPUT(o);
 
     pa_sink_input_assert_ref(i);
@@ -776,12 +800,28 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc
         }
 
         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
+            pa_sink_input *ssync;
+            
             if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
                 (i->thread_info.state != PA_SINK_INPUT_DRAINED) && (i->thread_info.state != PA_SINK_INPUT_RUNNING))
                 pa_atomic_store(&i->thread_info.drained, 1);
             
             i->thread_info.state = PA_PTR_TO_UINT(userdata);
 
+            for (ssync = i->thread_info.sync_prev; ssync; ssync = ssync->thread_info.sync_prev) {
+                if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
+                    (ssync->thread_info.state != PA_SINK_INPUT_DRAINED) && (ssync->thread_info.state != PA_SINK_INPUT_RUNNING))
+                    pa_atomic_store(&ssync->thread_info.drained, 1);
+                ssync->thread_info.state = PA_PTR_TO_UINT(userdata);
+            }
+            
+            for (ssync = i->thread_info.sync_next; ssync; ssync = ssync->thread_info.sync_next) {
+                if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
+                    (ssync->thread_info.state != PA_SINK_INPUT_DRAINED) && (ssync->thread_info.state != PA_SINK_INPUT_RUNNING))
+                    pa_atomic_store(&ssync->thread_info.drained, 1);
+                ssync->thread_info.state = PA_PTR_TO_UINT(userdata);
+            }
+            
             return 0;
         }
     }
index fe62917af7575d71ee26a6971169f74f133a7380..af3db95e31dec99c88e631a4d9983f01b84b5de9 100644 (file)
@@ -71,6 +71,8 @@ struct pa_sink_input {
     pa_sample_spec sample_spec;
     pa_channel_map channel_map;
 
+    pa_sink_input *sync_prev, *sync_next;
+    
     pa_cvolume volume;
     int muted;
 
@@ -97,6 +99,8 @@ struct pa_sink_input {
         /*         size_t move_silence; */
         pa_memblock *silence_memblock;               /* may be NULL */
 
+        pa_sink_input *sync_prev, *sync_next;
+        
         pa_cvolume volume;
         int muted;
     } thread_info;
@@ -133,6 +137,9 @@ typedef struct pa_sink_input_new_data {
     int muted_is_set;
 
     pa_resample_method_t resample_method;
+
+    int start_corked;
+    pa_sink_input *sync_base;
 } pa_sink_input_new_data;
 
 pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data *data);
@@ -179,6 +186,6 @@ pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i);
 
 int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume);
 void pa_sink_input_drop(pa_sink_input *i, size_t length);
-int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
 
 #endif
index 7f2a8b39e08192d29440244f04ae1f641b7437b9..5ab01cb4befcb73ad602fce4fd9c8fe3c45a43bf 100644 (file)
@@ -48,7 +48,7 @@
 #define MAX_MIX_CHANNELS 32
 #define SILENCE_BUFFER_LENGTH (64*1024)
 
-static PA_DEFINE_CHECK_TYPE(pa_sink, sink_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_sink, pa_msgobject);
 
 static void sink_free(pa_object *s);
 
@@ -80,7 +80,7 @@ pa_sink* pa_sink_new(
     pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
     pa_return_null_if_fail(name && pa_utf8_valid(name) && *name);
 
-    s = pa_msgobject_new(pa_sink, sink_check_type);
+    s = pa_msgobject_new(pa_sink);
 
     if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fail))) {
         pa_xfree(s);
@@ -161,7 +161,7 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
         if ((ret = s->set_state(s, state)) < 0)
             return -1;
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 
     s->state = state;
@@ -264,7 +264,7 @@ int pa_sink_suspend(pa_sink *s, int suspend) {
 void pa_sink_ping(pa_sink *s) {
     pa_sink_assert_ref(s);
 
-    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, NULL, NULL);
+    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, 0, NULL, NULL);
 }
 
 static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) {
@@ -530,7 +530,7 @@ pa_usec_t pa_sink_get_latency(pa_sink *s) {
     if (s->get_latency)
         return s->get_latency(s);
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
         return 0;
 
     return usec;
@@ -549,7 +549,7 @@ void pa_sink_set_volume(pa_sink *s, const pa_cvolume *volume) {
         s->set_volume = NULL;
 
     if (!s->set_volume)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -566,7 +566,7 @@ const pa_cvolume *pa_sink_get_volume(pa_sink *s) {
         s->get_volume = NULL;
 
     if (!s->get_volume && s->refresh_volume)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_VOLUME, &s->volume, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_VOLUME, &s->volume, 0, NULL);
 
     if (!pa_cvolume_equal(&old_volume, &s->volume))
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -585,7 +585,7 @@ void pa_sink_set_mute(pa_sink *s, int mute) {
         s->set_mute = NULL;
 
     if (!s->set_mute)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -602,7 +602,7 @@ int pa_sink_get_mute(pa_sink *s) {
         s->get_mute = NULL;
 
     if (!s->get_mute && s->refresh_mute)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_MUTE, &s->muted, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_MUTE, &s->muted, 0, NULL);
 
     if (old_muted != s->muted)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -660,21 +660,58 @@ unsigned pa_sink_used_by(pa_sink *s) {
     return ret;
 }
 
-int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
     pa_sink *s = PA_SINK(o);
     pa_sink_assert_ref(s);
 
     switch ((pa_sink_message_t) code) {
+        
         case PA_SINK_MESSAGE_ADD_INPUT: {
             pa_sink_input *i = userdata;
             pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i));
+
+            /* Since the caller sleeps in pa_sink_input_put(), we can
+             * safely access data outside of thread_info even though
+             * it is mutable */
+
+            if ((i->thread_info.sync_prev = i->sync_prev)) {
+                pa_assert(i->sink == i->thread_info.sync_prev->sink);
+                pa_assert(i->sync_prev->sync_next == i);
+                i->thread_info.sync_prev->thread_info.sync_next = i;
+            }
+
+            if ((i->thread_info.sync_next = i->sync_next)) {
+                pa_assert(i->sink == i->thread_info.sync_next->sink);
+                pa_assert(i->sync_next->sync_prev == i);
+                i->thread_info.sync_next->thread_info.sync_prev = i;
+            }
+
             return 0;
         }
 
         case PA_SINK_MESSAGE_REMOVE_INPUT: {
             pa_sink_input *i = userdata;
+
+            /* Since the caller sleeps in pa_sink_input_disconnect(),
+             * we can safely access data outside of thread_info even
+             * though it is mutable */
+
+            pa_assert(!i->thread_info.sync_prev);
+            pa_assert(!i->thread_info.sync_next);
+            
+            if (i->thread_info.sync_prev) {
+                i->thread_info.sync_prev->thread_info.sync_next = i->thread_info.sync_prev->sync_next;
+                i->thread_info.sync_prev = NULL;
+            }
+
+            if (i->thread_info.sync_next) {
+                i->thread_info.sync_next->thread_info.sync_prev = i->thread_info.sync_next->sync_prev;
+                i->thread_info.sync_next = NULL;
+            }
+            
             if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index)))
                 pa_sink_input_unref(i);
+            
             return 0;
         }
 
@@ -698,6 +735,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *
             return 0;
 
         case PA_SINK_MESSAGE_SET_STATE:
+            
             s->thread_info.state = PA_PTR_TO_UINT(userdata);
             return 0;
 
index 958279c58b0f54b773a69b38faf58ffb26f0db10..494bb6a9114a07cf21d2de657e6dd83f172d007d 100644 (file)
@@ -156,7 +156,7 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result);
 void pa_sink_render_into(pa_sink*s, pa_memchunk *target);
 void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target);
 
-int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
 
 static inline int PA_SINK_OPENED(pa_sink_state_t x) {
     return x == PA_SINK_RUNNING || x == PA_SINK_IDLE;
index 946af3e6482d90b9d97712dcc1c51689d73d577d..f2204f26746c23b26859a5b461b6a12e9ec3885e 100644 (file)
@@ -56,7 +56,7 @@ enum {
 
 PA_DECLARE_CLASS(file_stream);
 #define FILE_STREAM(o) (file_stream_cast(o))
-static PA_DEFINE_CHECK_TYPE(file_stream, file_stream_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(file_stream, pa_msgobject);
 
 static void file_stream_free(pa_object *o) {
     file_stream *u = FILE_STREAM(o);
@@ -85,7 +85,7 @@ static void file_stream_drop(file_stream *u) {
     }
 }
 
-static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
     file_stream *u = FILE_STREAM(o);
     file_stream_assert_ref(u);
     
@@ -154,7 +154,7 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
                 pa_memblock_unref(u->memchunk.memblock);
                 pa_memchunk_reset(&u->memchunk);
                 
-                pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MESSAGE_DROP_FILE_STREAM, NULL, NULL, NULL);
+                pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MESSAGE_DROP_FILE_STREAM, NULL, 0, NULL, NULL);
 
                 sf_close(u->sndfile);
                 u->sndfile = NULL;
@@ -224,7 +224,7 @@ int pa_play_file(
     pa_assert(sink);
     pa_assert(fname);
 
-    u = pa_msgobject_new(file_stream, file_stream_check_type);
+    u = pa_msgobject_new(file_stream);
     u->parent.parent.free = file_stream_free;
     u->parent.process_msg = file_stream_process_msg;
     u->core = sink->core;
index ee76a6e01db8fa67c7af9f1f82ccfc2ee286c5e6..c3ecf3a2bb7f5820dccba8e9d7de093995b1515a 100644 (file)
@@ -38,7 +38,7 @@
 
 #include "source-output.h"
 
-static PA_DEFINE_CHECK_TYPE(pa_source_output, source_output_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_source_output, pa_msgobject);
 
 static void source_output_free(pa_object* mo);
 
@@ -130,12 +130,12 @@ pa_source_output* pa_source_output_new(
         data->resample_method = pa_resampler_get_method(resampler);
     }
 
-    o = pa_msgobject_new(pa_source_output, source_output_check_type);
+    o = pa_msgobject_new(pa_source_output);
     o->parent.parent.free = source_output_free;
     o->parent.process_msg = pa_source_output_process_msg;
 
     o->core = core;
-    o->state = PA_SOURCE_OUTPUT_RUNNING;
+    o->state = data->corked ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING;
     o->flags = flags;
     o->name = pa_xstrdup(data->name);
     o->driver = pa_xstrdup(data->driver);
@@ -176,7 +176,7 @@ static int source_output_set_state(pa_source_output *o, pa_source_output_state_t
     if (o->state == state)
         return 0;
 
-    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 
     o->state = state;
@@ -187,7 +187,7 @@ void pa_source_output_disconnect(pa_source_output*o) {
     pa_assert(o);
     pa_return_if_fail(o->state != PA_SOURCE_OUTPUT_DISCONNECTED);
 
-    pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
+    pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
 
     pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL);
     pa_idxset_remove_by_data(o->source->outputs, o, NULL);
@@ -225,7 +225,7 @@ static void source_output_free(pa_object* mo) {
 void pa_source_output_put(pa_source_output *o) {
     pa_source_output_assert_ref(o);
 
-    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), NULL, (pa_free_cb_t) pa_source_output_unref);
+    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), 0, NULL, (pa_free_cb_t) pa_source_output_unref);
     pa_source_update_status(o->source);
 
     pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
@@ -243,7 +243,7 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) {
 
     pa_source_output_assert_ref(o);
 
-    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, 0, NULL) < 0)
         r = 0;
 
     if (o->get_latency)
@@ -293,7 +293,7 @@ int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) {
 
     o->sample_spec.rate = rate;
 
-    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
+    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), 0, NULL, NULL);
 
     pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index);
     return 0;
@@ -380,7 +380,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
 /*     return 0; */
 }
 
-int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk* chunk) {
+int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk* chunk) {
     pa_source_output *o = PA_SOURCE_OUTPUT(mo);
 
     pa_source_output_assert_ref(o);
index 7b6afe812d49dc71f0a118d2cd404e9efd97d08b..9f982a9ad43a19f81b03e25aec22a39e9b2ec73e 100644 (file)
@@ -103,6 +103,8 @@ typedef struct pa_source_output_new_data {
     int channel_map_is_set;
 
     pa_resample_method_t resample_method;
+
+    int corked;
 } pa_source_output_new_data;
 
 pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_data *data);
@@ -142,6 +144,6 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest);
 /* To be used exclusively by the source driver thread */
 
 void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk);
-int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
 
 #endif
index 6ca81727628ef2b5c2cd29323e9e9492e0a379d8..eaf1335e8169b2e8dd06736ba61a1147c87db796 100644 (file)
@@ -42,7 +42,7 @@
 
 #include "source.h"
 
-static PA_DEFINE_CHECK_TYPE(pa_source, source_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
 
 static void source_free(pa_object *o);
 
@@ -73,7 +73,7 @@ pa_source* pa_source_new(
     pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
     pa_return_null_if_fail(pa_utf8_valid(name) && *name);
 
-    s = pa_msgobject_new(pa_source, source_check_type);
+    s = pa_msgobject_new(pa_source);
 
     if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SOURCE, s, fail))) {
         pa_xfree(s);
@@ -140,7 +140,7 @@ static int source_set_state(pa_source *s, pa_source_state_t state) {
         if ((ret = s->set_state(s, state)) < 0)
             return -1;
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 
     s->state = state;
@@ -222,7 +222,7 @@ int pa_source_suspend(pa_source *s, int suspend) {
 void pa_source_ping(pa_source *s) {
     pa_source_assert_ref(s);
 
-    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, NULL, NULL);
+    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, 0, NULL, NULL);
 }
 
 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
@@ -266,7 +266,7 @@ pa_usec_t pa_source_get_latency(pa_source *s) {
     if (s->get_latency)
         return s->get_latency(s);
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
         return 0;
 
     return usec;
@@ -285,7 +285,7 @@ void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
         s->set_volume = NULL;
 
     if (!s->set_volume)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -301,7 +301,7 @@ const pa_cvolume *pa_source_get_volume(pa_source *s) {
         s->get_volume = NULL;
 
     if (!s->get_volume && s->refresh_volume)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, 0, NULL);
 
     if (!pa_cvolume_equal(&old_volume, &s->volume))
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -320,7 +320,7 @@ void pa_source_set_mute(pa_source *s, int mute) {
         s->set_mute = NULL;
 
     if (!s->set_mute)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -337,7 +337,7 @@ int pa_source_get_mute(pa_source *s) {
         s->get_mute = NULL;
 
     if (!s->get_mute && s->refresh_muted)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, 0, NULL);
 
     if (old_muted != s->muted)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -384,7 +384,7 @@ unsigned pa_source_used_by(pa_source *s) {
     return pa_idxset_size(s->outputs);
 }
 
-int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, pa_memchunk *chunk) {
+int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
     pa_source *s = PA_SOURCE(object);
     pa_source_assert_ref(s);
 
index e2b02cebdb8980a996ce4f9d8fdb8f5b095853be..fe59e584bb09885d56af0dc1f95c17bcaffae114 100644 (file)
@@ -146,7 +146,7 @@ unsigned pa_source_used_by(pa_source *s);
 /* To be used exclusively by the source driver thread */
 
 void pa_source_post(pa_source*s, const pa_memchunk *b);
-int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, int64_t, pa_memchunk *chunk);
 
 static inline int PA_SOURCE_OPENED(pa_source_state_t x) {
     return x == PA_SOURCE_RUNNING || x == PA_SOURCE_IDLE;
index 847d5be1881c57d0e191d4deedc8ef2ccef2ca05..baf93a0c36ba48124b655aa15653fabff33a8f0c 100644 (file)
@@ -49,7 +49,7 @@ static void the_thread(void *_q) {
     do {
         int code = 0;
 
-        pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, 1) == 0);
+        pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, NULL, 1) == 0);
 
         switch (code) {
 
@@ -85,22 +85,22 @@ int main(int argc, char *argv[]) {
     pa_assert_se(t = pa_thread_new(the_thread, q));
 
     printf("Operation A post\n");
-    pa_asyncmsgq_post(q, NULL, OPERATION_A, NULL, NULL, NULL);
+    pa_asyncmsgq_post(q, NULL, OPERATION_A, NULL, 0, NULL, NULL);
 
     pa_thread_yield();
 
     printf("Operation B post\n");
-    pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, NULL, NULL);
+    pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, 0, NULL, NULL);
     
     pa_thread_yield();
 
     printf("Operation C send\n");
-    pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, NULL);
+    pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, 0, NULL);
 
     pa_thread_yield();
 
     printf("Quit post\n");
-    pa_asyncmsgq_post(q, NULL, QUIT, NULL, NULL, NULL);
+    pa_asyncmsgq_post(q, NULL, QUIT, NULL, 0, NULL, NULL);
 
     pa_thread_free(t);