code.delx.au - pulseaudio/blobdiff - src/modules/module-tunnel.c
alsa-mixer: Make sure capture source and input source use right path
[pulseaudio] / src / modules / module-tunnel.c
index 5c7a6e55c72922b5d591ee1bb8fed52df34510d3..176c2c00d859635f0067a3bdab669f98fe8cb248 100644 (file)
@@ -6,7 +6,7 @@
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
-  by the Free Software Foundation; either version 2 of the License,
+  by the Free Software Foundation; either version 2.1 of the License,
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
@@ -31,6 +31,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 
 #include <stdio.h>
 #include <stdlib.h>
 
+#include <pulse/rtclock.h>
 #include <pulse/timeval.h>
 #include <pulse/util.h>
 #include <pulse/version.h>
 #include <pulse/timeval.h>
 #include <pulse/util.h>
 #include <pulse/version.h>
 #include <pulsecore/time-smoother.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/thread-mq.h>
 #include <pulsecore/time-smoother.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/thread-mq.h>
-#include <pulsecore/rtclock.h>
+#include <pulsecore/core-rtclock.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/proplist-util.h>
 #include <pulsecore/auth-cookie.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/proplist-util.h>
 #include <pulsecore/auth-cookie.h>
+#include <pulsecore/mcalign.h>
 
 #ifdef TUNNEL_SINK
 #include "module-tunnel-sink-symdef.h"
 
 #ifdef TUNNEL_SINK
 #include "module-tunnel-sink-symdef.h"
 #ifdef TUNNEL_SINK
 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
 PA_MODULE_USAGE(
 #ifdef TUNNEL_SINK
 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
 PA_MODULE_USAGE(
+        "sink_name=<name for the local sink> "
+        "sink_properties=<properties for the local sink> "
         "server=<address> "
         "sink=<remote sink name> "
         "cookie=<filename> "
         "format=<sample format> "
         "channels=<number of channels> "
         "rate=<sample rate> "
         "server=<address> "
         "sink=<remote sink name> "
         "cookie=<filename> "
         "format=<sample format> "
         "channels=<number of channels> "
         "rate=<sample rate> "
-        "sink_name=<name for the local sink> "
         "channel_map=<channel map>");
 #else
 PA_MODULE_DESCRIPTION("Tunnel module for sources");
 PA_MODULE_USAGE(
         "channel_map=<channel map>");
 #else
 PA_MODULE_DESCRIPTION("Tunnel module for sources");
 PA_MODULE_USAGE(
+        "source_name=<name for the local source> "
+        "source_properties=<properties for the local source> "
         "server=<address> "
         "source=<remote source name> "
         "cookie=<filename> "
         "format=<sample format> "
         "channels=<number of channels> "
         "rate=<sample rate> "
         "server=<address> "
         "source=<remote source name> "
         "cookie=<filename> "
         "format=<sample format> "
         "channels=<number of channels> "
         "rate=<sample rate> "
-        "source_name=<name for the local source> "
         "channel_map=<channel map>");
 #endif
 
         "channel_map=<channel map>");
 #endif
 
@@ -97,9 +101,11 @@ static const char* const valid_modargs[] = {
     "rate",
 #ifdef TUNNEL_SINK
     "sink_name",
     "rate",
 #ifdef TUNNEL_SINK
     "sink_name",
+    "sink_properties",
     "sink",
 #else
     "source_name",
     "sink",
 #else
     "source_name",
+    "source_properties",
     "source",
 #endif
     "channel_map",
     "source",
 #endif
     "channel_map",
@@ -108,7 +114,7 @@ static const char* const valid_modargs[] = {
 
 #define DEFAULT_TIMEOUT 5
 
 
 #define DEFAULT_TIMEOUT 5
 
-#define LATENCY_INTERVAL 10
+#define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
 
 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
 
 
 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
 
@@ -145,6 +151,8 @@ static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t t
 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 
 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
 #ifdef TUNNEL_SINK
 
 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
 #ifdef TUNNEL_SINK
@@ -159,7 +167,12 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
-    [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved
+    [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
+    [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
+    [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
+    [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
+    [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
+    [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
 };
 
 struct userdata {
 };
 
 struct userdata {
@@ -182,6 +195,7 @@ struct userdata {
 #else
     char *source_name;
     pa_source *source;
 #else
     char *source_name;
     pa_source *source;
+    pa_mcalign *mcalign;
 #endif
 
     pa_auth_cookie *auth_cookie;
 #endif
 
     pa_auth_cookie *auth_cookie;
@@ -196,8 +210,8 @@ struct userdata {
     pa_bool_t remote_corked:1;
     pa_bool_t remote_suspended:1;
 
     pa_bool_t remote_corked:1;
     pa_bool_t remote_suspended:1;
 
-    pa_usec_t transport_usec;
-    pa_bool_t transport_usec_valid;
+    pa_usec_t transport_usec; /* maintained in the main thread */
+    pa_usec_t thread_transport_usec; /* maintained in the IO thread */
 
     uint32_t ignore_latency_before;
 
 
     uint32_t ignore_latency_before;
 
@@ -221,6 +235,11 @@ struct userdata {
 
 static void request_latency(struct userdata *u);
 
 
 static void request_latency(struct userdata *u);
 
+/* Called from main context.
+ *
+ * Dispatch handler registered in command_table for
+ * PA_COMMAND_PLAYBACK_STREAM_EVENT, PA_COMMAND_RECORD_STREAM_EVENT and
+ * PA_COMMAND_CLIENT_EVENT. The packet payload is not parsed and no state
+ * is touched; the event is only noted in the debug log. */
+static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+    pa_log_debug("Got stream or client event.");
+}
+
 /* Called from main context */
 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
 /* Called from main context */
 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
@@ -261,11 +280,14 @@ static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag
     if (pa_tagstruct_getu32(t, &channel) < 0 ||
         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
         !pa_tagstruct_eof(t)) {
     if (pa_tagstruct_getu32(t, &channel) < 0 ||
         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
         !pa_tagstruct_eof(t)) {
-        pa_log("Invalid packet");
+
+        pa_log("Invalid packet.");
         pa_module_unload_request(u->module, TRUE);
         return;
     }
 
         pa_module_unload_request(u->module, TRUE);
         return;
     }
 
+    pa_log_debug("Server reports device suspend.");
+
 #ifdef TUNNEL_SINK
     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
 #else
 #ifdef TUNNEL_SINK
     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
 #else
@@ -278,13 +300,78 @@ static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag
 /* Called from main context */
 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
 /* Called from main context */
 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
+    uint32_t channel, di;
+    const char *dn;
+    pa_bool_t suspended;
 
     pa_assert(pd);
     pa_assert(t);
     pa_assert(u);
     pa_assert(u->pdispatch == pd);
 
 
     pa_assert(pd);
     pa_assert(t);
     pa_assert(u);
     pa_assert(u->pdispatch == pd);
 
+    if (pa_tagstruct_getu32(t, &channel) < 0 ||
+        pa_tagstruct_getu32(t, &di) < 0 ||
+        pa_tagstruct_gets(t, &dn) < 0 ||
+        pa_tagstruct_get_boolean(t, &suspended) < 0) {
+
+        pa_log_error("Invalid packet.");
+        pa_module_unload_request(u->module, TRUE);
+        return;
+    }
+
     pa_log_debug("Server reports a stream move.");
     pa_log_debug("Server reports a stream move.");
+
+#ifdef TUNNEL_SINK
+    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
+#else
+    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
+#endif
+
+    request_latency(u);
+}
+/* Dispatch handler registered in command_table for
+ * PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
+ * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED.
+ * NOTE(review): presumably called from main context like the other
+ * command_table handlers -- confirm.
+ *
+ * Reads the channel index and maxlength, then fragsize + usec for the
+ * record command, or tlength + prebuf + minreq + usec for any other
+ * command. If any field cannot be read, "Invalid packet." is logged, a
+ * module unload is requested and the function returns early.
+ *
+ * The parsed values are not stored in the userdata; in TUNNEL_SINK builds
+ * the parsed tlength is logged next to u->tlength. On success the handler
+ * finishes by calling request_latency(). */
+static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+    struct userdata *u = userdata;
+    uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
+    pa_usec_t usec;
+
+    pa_assert(pd);
+    pa_assert(t);
+    pa_assert(u);
+    pa_assert(u->pdispatch == pd);
+
+    if (pa_tagstruct_getu32(t, &channel) < 0 ||
+        pa_tagstruct_getu32(t, &maxlength) < 0) {
+
+        pa_log_error("Invalid packet.");
+        pa_module_unload_request(u->module, TRUE);
+        return;
+    }
+
+    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
+        if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
+            pa_tagstruct_get_usec(t, &usec) < 0) {
+
+            pa_log_error("Invalid packet.");
+            pa_module_unload_request(u->module, TRUE);
+            return;
+        }
+    } else {
+        if (pa_tagstruct_getu32(t, &tlength) < 0 ||
+            pa_tagstruct_getu32(t, &prebuf) < 0 ||
+            pa_tagstruct_getu32(t, &minreq) < 0 ||
+            pa_tagstruct_get_usec(t, &usec) < 0) {
+
+            pa_log_error("Invalid packet.");
+            pa_module_unload_request(u->module, TRUE);
+            return;
+        }
+    }
+
+#ifdef TUNNEL_SINK
+    pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
+#endif
+
     request_latency(u);
 }
 
     request_latency(u);
 }
 
@@ -306,26 +393,37 @@ static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa
 #endif
 
 /* Called from IO thread context */
 #endif
 
 /* Called from IO thread context */
-static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
+static void check_smoother_status(struct userdata *u, pa_bool_t past)  {
     pa_usec_t x;
     pa_usec_t x;
+
     pa_assert(u);
 
     pa_assert(u);
 
-    if (u->remote_corked == cork)
-        return;
+    x = pa_rtclock_now();
 
 
-    u->remote_corked = cork;
-    x = pa_rtclock_usec();
+    /* Correct by the time the issued request needs to travel to the
+     * other side.  This is a valid thread-safe access, because the
+     * main thread is waiting for us */
 
 
-    /* Correct by the time this needs to travel to the other side.
-     * This is a valid thread-safe access, because the main thread is
-     * waiting for us */
-    if (u->transport_usec_valid)
-        x += u->transport_usec;
+    if (past)
+        x -= u->thread_transport_usec;
+    else
+        x += u->thread_transport_usec;
 
     if (u->remote_suspended || u->remote_corked)
         pa_smoother_pause(u->smoother, x);
     else
 
     if (u->remote_suspended || u->remote_corked)
         pa_smoother_pause(u->smoother, x);
     else
-        pa_smoother_resume(u->smoother, x);
+        pa_smoother_resume(u->smoother, x, TRUE);
+}
+
+/* Called from IO thread context.
+ *
+ * Records the requested cork state in u->remote_corked. Does nothing when
+ * the state is already the requested one; otherwise the smoother is
+ * updated through check_smoother_status() with past=FALSE. */
+static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
+    pa_assert(u);
+
+    if (u->remote_corked == cork)
+        return;
+
+    u->remote_corked = cork;
+    check_smoother_status(u, FALSE);
 }
 
 /* Called from main context */
 }
 
 /* Called from main context */
@@ -352,26 +450,13 @@ static void stream_cork(struct userdata *u, pa_bool_t cork) {
 
 /* Called from IO thread context */
 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
 
 /* Called from IO thread context */
 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
-    pa_usec_t x;
     pa_assert(u);
 
     if (u->remote_suspended == suspend)
         return;
 
     u->remote_suspended = suspend;
     pa_assert(u);
 
     if (u->remote_suspended == suspend)
         return;
 
     u->remote_suspended = suspend;
-
-    x = pa_rtclock_usec();
-
-    /* Correct by the time this needed to travel from the other side.
-     * This is a valid thread-safe access, because the main thread is
-     * waiting for us */
-    if (u->transport_usec_valid)
-        x -= u->transport_usec;
-
-    if (u->remote_suspended || u->remote_corked)
-        pa_smoother_pause(u->smoother, x);
-    else
-        pa_smoother_resume(u->smoother, x);
+    check_smoother_status(u, TRUE);
 }
 
 #ifdef TUNNEL_SINK
 }
 
 #ifdef TUNNEL_SINK
@@ -418,7 +503,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             pa_usec_t yl, yr, *usec = data;
 
             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
             pa_usec_t yl, yr, *usec = data;
 
             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
-            yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
+            yr = pa_smoother_get(u->smoother, pa_rtclock_now());
 
             *usec = yl > yr ? yl - yr : 0;
             return 0;
 
             *usec = yl > yr ? yl - yr : 0;
             return 0;
@@ -446,12 +531,15 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
 
 
             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
 
-            if (y > (pa_usec_t) offset || offset < 0)
+            if (y > (pa_usec_t) offset)
                 y -= (pa_usec_t) offset;
             else
                 y = 0;
 
                 y -= (pa_usec_t) offset;
             else
                 y = 0;
 
-            pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
+            pa_smoother_put(u->smoother, pa_rtclock_now(), y);
+
+            /* We can access this freely here, since the main thread is waiting for us */
+            u->thread_transport_usec = u->transport_usec;
 
             return 0;
         }
 
             return 0;
         }
@@ -522,20 +610,29 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
             pa_usec_t yr, yl, *usec = data;
 
             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
             pa_usec_t yr, yl, *usec = data;
 
             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
-            yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
+            yr = pa_smoother_get(u->smoother, pa_rtclock_now());
 
             *usec = yr > yl ? yr - yl : 0;
             return 0;
         }
 
 
             *usec = yr > yl ? yr - yl : 0;
             return 0;
         }
 
-        case SOURCE_MESSAGE_POST:
+        case SOURCE_MESSAGE_POST: {
+            pa_memchunk c;
+
+            pa_mcalign_push(u->mcalign, chunk);
+
+            while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
+
+                if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+                    pa_source_post(u->source, &c);
 
 
-            if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
-                pa_source_post(u->source, chunk);
+                pa_memblock_unref(c.memblock);
 
 
-            u->counter += (int64_t) chunk->length;
+                u->counter += (int64_t) c.length;
+            }
 
             return 0;
 
             return 0;
+        }
 
         case SOURCE_MESSAGE_REMOTE_SUSPEND:
 
 
         case SOURCE_MESSAGE_REMOTE_SUSPEND:
 
@@ -546,13 +643,12 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
             pa_usec_t y;
 
             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
             pa_usec_t y;
 
             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
+            y += (pa_usec_t) offset;
 
 
-            if (offset >= 0 || y > (pa_usec_t) -offset)
-                y += (pa_usec_t) offset;
-            else
-                y = 0;
+            pa_smoother_put(u->smoother, pa_rtclock_now(), y);
 
 
-            pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
+            /* We can access this freely here, since the main thread is waiting for us */
+            u->thread_transport_usec = u->transport_usec;
 
             return 0;
         }
 
             return 0;
         }
@@ -599,14 +695,13 @@ static void thread_func(void *userdata) {
     pa_log_debug("Thread starting up");
 
     pa_thread_mq_install(&u->thread_mq);
     pa_log_debug("Thread starting up");
 
     pa_thread_mq_install(&u->thread_mq);
-    pa_rtpoll_install(u->rtpoll);
 
     for (;;) {
         int ret;
 
 #ifdef TUNNEL_SINK
         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
 
     for (;;) {
         int ret;
 
 #ifdef TUNNEL_SINK
         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
-            if (u->sink->thread_info.rewind_requested)
+            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
                 pa_sink_process_rewind(u->sink, 0);
 #endif
 
                 pa_sink_process_rewind(u->sink, 0);
 #endif
 
@@ -646,7 +741,7 @@ static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, p
     }
 
     if (channel != u->channel) {
     }
 
     if (channel != u->channel) {
-        pa_log("Recieved data for invalid channel");
+        pa_log("Received data for invalid channel");
         goto fail;
     }
 
         goto fail;
     }
 
@@ -662,7 +757,7 @@ fail:
 /* Called from main context */
 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
 /* Called from main context */
 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
-    pa_usec_t sink_usec, source_usec, transport_usec;
+    pa_usec_t sink_usec, source_usec;
     pa_bool_t playing;
     int64_t write_index, read_index;
     struct timeval local, remote, now;
     pa_bool_t playing;
     int64_t write_index, read_index;
     struct timeval local, remote, now;
@@ -709,7 +804,6 @@ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint
     }
 
     if (tag < u->ignore_latency_before) {
     }
 
     if (tag < u->ignore_latency_before) {
-        request_latency(u);
         return;
     }
 
         return;
     }
 
@@ -725,7 +819,6 @@ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint
 #endif
     } else
         u->transport_usec = pa_timeval_diff(&now, &local)/2;
 #endif
     } else
         u->transport_usec = pa_timeval_diff(&now, &local)/2;
-    u->transport_usec_valid = TRUE;
 
     /* First, take the device's delay */
 #ifdef TUNNEL_SINK
 
     /* First, take the device's delay */
 #ifdef TUNNEL_SINK
@@ -745,9 +838,9 @@ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint
     /* Our measurements are already out of date, hence correct by the     *
      * transport latency */
 #ifdef TUNNEL_SINK
     /* Our measurements are already out of date, hence correct by the     *
      * transport latency */
 #ifdef TUNNEL_SINK
-    delay -= (int64_t) transport_usec;
+    delay -= (int64_t) u->transport_usec;
 #else
 #else
-    delay += (int64_t) transport_usec;
+    delay += (int64_t) u->transport_usec;
 #endif
 
     /* Now correct by what we have read/written since we requested the update */
 #endif
 
     /* Now correct by what we have read/written since we requested the update */
@@ -786,8 +879,7 @@ static void request_latency(struct userdata *u) {
     pa_tagstruct_putu32(t, tag = u->ctag++);
     pa_tagstruct_putu32(t, u->channel);
 
     pa_tagstruct_putu32(t, tag = u->ctag++);
     pa_tagstruct_putu32(t, u->channel);
 
-    pa_gettimeofday(&now);
-    pa_tagstruct_put_timeval(t, &now);
+    pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
 
     pa_pstream_send_tagstruct(u->pstream, t);
     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
 
     pa_pstream_send_tagstruct(u->pstream, t);
     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
@@ -797,9 +889,8 @@ static void request_latency(struct userdata *u) {
 }
 
 /* Called from main context */
 }
 
 /* Called from main context */
-static void timeout_callback(pa_mainloop_api *m, pa_time_event*e,  const struct timeval *tv, void *userdata) {
+static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
     struct userdata *u = userdata;
     struct userdata *u = userdata;
-    struct timeval ntv;
 
     pa_assert(m);
     pa_assert(e);
 
     pa_assert(m);
     pa_assert(e);
@@ -807,9 +898,7 @@ static void timeout_callback(pa_mainloop_api *m, pa_time_event*e,  const struct
 
     request_latency(u);
 
 
     request_latency(u);
 
-    pa_gettimeofday(&ntv);
-    ntv.tv_sec += LATENCY_INTERVAL;
-    m->time_restart(e, &ntv);
+    pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
 }
 
 /* Called from main context */
 }
 
 /* Called from main context */
@@ -861,6 +950,7 @@ static void update_description(struct userdata *u) {
 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
     pa_sample_spec ss;
 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
     pa_sample_spec ss;
+    pa_channel_map cm;
     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
     uint32_t cookie;
 
     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
     uint32_t cookie;
 
@@ -882,7 +972,9 @@ static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa
         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
         pa_tagstruct_gets(t, &default_source_name) < 0 ||
         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
         pa_tagstruct_gets(t, &default_source_name) < 0 ||
-        pa_tagstruct_getu32(t, &cookie) < 0) {
+        pa_tagstruct_getu32(t, &cookie) < 0 ||
+        (u->version >= 15 &&
+         pa_tagstruct_get_channel_map(t, &cm) < 0)) {
 
         pa_log("Parse failure");
         goto fail;
 
         pa_log("Parse failure");
         goto fail;
@@ -963,6 +1055,47 @@ static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_t
         }
     }
 
         }
     }
 
+    if (u->version >= 15) {
+        pa_volume_t base_volume;
+        uint32_t state, n_volume_steps, card;
+
+        if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
+            pa_tagstruct_getu32(t, &state) < 0 ||
+            pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
+            pa_tagstruct_getu32(t, &card) < 0) {
+
+            pa_log("Parse failure");
+            goto fail;
+        }
+    }
+
+    if (u->version >= 16) {
+        uint32_t n_ports;
+        const char *s;
+
+        if (pa_tagstruct_getu32(t, &n_ports)) {
+            pa_log("Parse failure");
+            goto fail;
+        }
+
+        for (uint32_t j = 0; j < n_ports; j++) {
+            uint32_t priority;
+
+            if (pa_tagstruct_gets(t, &s) < 0 || /* name */
+                pa_tagstruct_gets(t, &s) < 0 || /* description */
+                pa_tagstruct_getu32(t, &priority) < 0) {
+
+                pa_log("Parse failure");
+                goto fail;
+            }
+        }
+
+        if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
+            pa_log("Parse failure");
+            goto fail;
+        }
+    }
+
     if (!pa_tagstruct_eof(t)) {
         pa_log("Packet too long");
         goto fail;
     if (!pa_tagstruct_eof(t)) {
         pa_log("Packet too long");
         goto fail;
@@ -991,7 +1124,7 @@ static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag
     uint32_t idx, owner_module, client, sink;
     pa_usec_t buffer_usec, sink_usec;
     const char *name, *driver, *resample_method;
     uint32_t idx, owner_module, client, sink;
     pa_usec_t buffer_usec, sink_usec;
     const char *name, *driver, *resample_method;
-    pa_bool_t mute;
+    pa_bool_t mute = FALSE;
     pa_sample_spec sample_spec;
     pa_channel_map channel_map;
     pa_cvolume volume;
     pa_sample_spec sample_spec;
     pa_channel_map channel_map;
     pa_cvolume volume;
@@ -1056,15 +1189,14 @@ static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag
     pa_assert(u->sink);
 
     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
     pa_assert(u->sink);
 
     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
-        pa_cvolume_equal(&volume, &u->sink->virtual_volume))
+        pa_cvolume_equal(&volume, &u->sink->real_volume))
         return;
 
         return;
 
-    memcpy(&u->sink->virtual_volume, &volume, sizeof(pa_cvolume));
+    pa_sink_volume_changed(u->sink, &volume);
 
     if (u->version >= 11)
 
     if (u->version >= 11)
-        u->sink->muted = !!mute;
+        pa_sink_mute_changed(u->sink, mute);
 
 
-    pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
     return;
 
 fail:
     return;
 
 fail:
@@ -1126,6 +1258,47 @@ static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa
         }
     }
 
         }
     }
 
+    if (u->version >= 15) {
+        pa_volume_t base_volume;
+        uint32_t state, n_volume_steps, card;
+
+        if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
+            pa_tagstruct_getu32(t, &state) < 0 ||
+            pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
+            pa_tagstruct_getu32(t, &card) < 0) {
+
+            pa_log("Parse failure");
+            goto fail;
+        }
+    }
+
+    if (u->version >= 16) {
+        uint32_t n_ports;
+        const char *s;
+
+        if (pa_tagstruct_getu32(t, &n_ports)) {
+            pa_log("Parse failure");
+            goto fail;
+        }
+
+        for (uint32_t j = 0; j < n_ports; j++) {
+            uint32_t priority;
+
+            if (pa_tagstruct_gets(t, &s) < 0 || /* name */
+                pa_tagstruct_gets(t, &s) < 0 || /* description */
+                pa_tagstruct_getu32(t, &priority) < 0) {
+
+                pa_log("Parse failure");
+                goto fail;
+            }
+        }
+
+        if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
+            pa_log("Parse failure");
+            goto fail;
+        }
+    }
+
     if (!pa_tagstruct_eof(t)) {
         pa_log("Packet too long");
         goto fail;
     if (!pa_tagstruct_eof(t)) {
         pa_log("Packet too long");
         goto fail;
@@ -1226,12 +1399,11 @@ static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32
 /* Called from main context */
 static void start_subscribe(struct userdata *u) {
     pa_tagstruct *t;
 /* Called from main context */
 static void start_subscribe(struct userdata *u) {
     pa_tagstruct *t;
-    uint32_t tag;
     pa_assert(u);
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
     pa_assert(u);
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
-    pa_tagstruct_putu32(t, tag = u->ctag++);
+    pa_tagstruct_putu32(t, u->ctag++);
     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
 #ifdef TUNNEL_SINK
                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
 #ifdef TUNNEL_SINK
                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
@@ -1246,7 +1418,6 @@ static void start_subscribe(struct userdata *u) {
 /* Called from main context */
 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
 /* Called from main context */
 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
-    struct timeval ntv;
 #ifdef TUNNEL_SINK
     uint32_t bytes;
 #endif
 #ifdef TUNNEL_SINK
     uint32_t bytes;
 #endif
@@ -1314,11 +1485,11 @@ static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t
         if (pa_tagstruct_get_usec(t, &usec) < 0)
             goto parse_error;
 
         if (pa_tagstruct_get_usec(t, &usec) < 0)
             goto parse_error;
 
-#ifdef TUNNEL_SINK
-        pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0);
-#else
-        pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0);
-#endif
+/* #ifdef TUNNEL_SINK */
+/*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
+/* #else */
+/*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
+/* #endif */
     }
 
     if (!pa_tagstruct_eof(t))
     }
 
     if (!pa_tagstruct_eof(t))
@@ -1328,9 +1499,7 @@ static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t
     request_info(u);
 
     pa_assert(!u->time_event);
     request_info(u);
 
     pa_assert(!u->time_event);
-    pa_gettimeofday(&ntv);
-    ntv.tv_sec += LATENCY_INTERVAL;
-    u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
+    u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
 
     request_latency(u);
 
 
     request_latency(u);
 
@@ -1391,11 +1560,17 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
 
 #ifdef TUNNEL_SINK
     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
 
 #ifdef TUNNEL_SINK
+    pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
+    pa_sink_update_proplist(u->sink, 0, NULL);
+
     pa_snprintf(name, sizeof(name), "%s for %s@%s",
                 u->sink_name,
                 pa_get_user_name(un, sizeof(un)),
                 pa_get_host_name(hn, sizeof(hn)));
 #else
     pa_snprintf(name, sizeof(name), "%s for %s@%s",
                 u->sink_name,
                 pa_get_user_name(un, sizeof(un)),
                 pa_get_host_name(hn, sizeof(hn)));
 #else
+    pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
+    pa_source_update_proplist(u->source, 0, NULL);
+
     pa_snprintf(name, sizeof(name), "%s for %s@%s",
                 u->source_name,
                 pa_get_user_name(un, sizeof(un)),
     pa_snprintf(name, sizeof(name), "%s for %s@%s",
                 u->source_name,
                 pa_get_user_name(un, sizeof(un)),
@@ -1404,14 +1579,14 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
 
     reply = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
 
     reply = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
-    pa_tagstruct_putu32(reply, tag = u->ctag++);
+    pa_tagstruct_putu32(reply, u->ctag++);
 
     if (u->version >= 13) {
         pa_proplist *pl;
         pl = pa_proplist_new();
 
     if (u->version >= 13) {
         pa_proplist *pl;
         pl = pa_proplist_new();
-        pa_init_proplist(pl);
         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
+        pa_init_proplist(pl);
         pa_tagstruct_put_proplist(reply, pl);
         pa_proplist_free(pl);
     } else
         pa_tagstruct_put_proplist(reply, pl);
         pa_proplist_free(pl);
     } else
@@ -1503,6 +1678,14 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
     }
 
         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
     }
 
+    if (u->version >= 15) {
+#ifdef TUNNEL_SINK
+        pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
+#endif
+        pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
+        pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
+    }
+
     pa_pstream_send_tagstruct(u->pstream, reply);
     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
 
     pa_pstream_send_tagstruct(u->pstream, reply);
     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
 
@@ -1550,7 +1733,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
     pa_assert(u);
 
     if (channel != u->channel) {
     pa_assert(u);
 
     if (channel != u->channel) {
-        pa_log("Recieved memory block on bad channel.");
+        pa_log("Received memory block on bad channel.");
         pa_module_unload_request(u->module, TRUE);
         return;
     }
         pa_module_unload_request(u->module, TRUE);
         return;
     }
@@ -1559,7 +1742,6 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
 
     u->counter_delta += (int64_t) chunk->length;
 }
 
     u->counter_delta += (int64_t) chunk->length;
 }
-
 #endif
 
 /* Called from main context */
 #endif
 
 /* Called from main context */
@@ -1582,7 +1764,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
     }
 
     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
     }
 
     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
-    u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
+    u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
 
     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
 
     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
@@ -1624,7 +1806,6 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
 static void sink_set_volume(pa_sink *sink) {
     struct userdata *u;
     pa_tagstruct *t;
 static void sink_set_volume(pa_sink *sink) {
     struct userdata *u;
     pa_tagstruct *t;
-    uint32_t tag;
 
     pa_assert(sink);
     u = sink->userdata;
 
     pa_assert(sink);
     u = sink->userdata;
@@ -1632,9 +1813,9 @@ static void sink_set_volume(pa_sink *sink) {
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
-    pa_tagstruct_putu32(t, tag = u->ctag++);
+    pa_tagstruct_putu32(t, u->ctag++);
     pa_tagstruct_putu32(t, u->device_index);
     pa_tagstruct_putu32(t, u->device_index);
-    pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
+    pa_tagstruct_put_cvolume(t, &sink->real_volume);
     pa_pstream_send_tagstruct(u->pstream, t);
 }
 
     pa_pstream_send_tagstruct(u->pstream, t);
 }
 
@@ -1642,7 +1823,6 @@ static void sink_set_volume(pa_sink *sink) {
 static void sink_set_mute(pa_sink *sink) {
     struct userdata *u;
     pa_tagstruct *t;
 static void sink_set_mute(pa_sink *sink) {
     struct userdata *u;
     pa_tagstruct *t;
-    uint32_t tag;
 
     pa_assert(sink);
     u = sink->userdata;
 
     pa_assert(sink);
     u = sink->userdata;
@@ -1653,7 +1833,7 @@ static void sink_set_mute(pa_sink *sink) {
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
 
     t = pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
-    pa_tagstruct_putu32(t, tag = u->ctag++);
+    pa_tagstruct_putu32(t, u->ctag++);
     pa_tagstruct_putu32(t, u->device_index);
     pa_tagstruct_put_boolean(t, !!sink->muted);
     pa_pstream_send_tagstruct(u->pstream, t);
     pa_tagstruct_putu32(t, u->device_index);
     pa_tagstruct_put_boolean(t, !!sink->muted);
     pa_pstream_send_tagstruct(u->pstream, t);
@@ -1695,13 +1875,19 @@ int pa__init(pa_module*m) {
     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
     u->source = NULL;
 #endif
     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
     u->source = NULL;
 #endif
-    u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
+    u->smoother = pa_smoother_new(
+            PA_USEC_PER_SEC,
+            PA_USEC_PER_SEC*2,
+            TRUE,
+            TRUE,
+            10,
+            pa_rtclock_now(),
+            FALSE);
     u->ctag = 1;
     u->device_index = u->channel = PA_INVALID_INDEX;
     u->time_event = NULL;
     u->ignore_latency_before = 0;
     u->ctag = 1;
     u->device_index = u->channel = PA_INVALID_INDEX;
     u->time_event = NULL;
     u->ignore_latency_before = 0;
-    u->transport_usec = 0;
-    u->transport_usec_valid = FALSE;
+    u->transport_usec = u->thread_transport_usec = 0;
     u->remote_suspended = u->remote_corked = FALSE;
     u->counter = u->counter_delta = 0;
 
     u->remote_suspended = u->remote_corked = FALSE;
     u->counter = u->counter_delta = 0;
 
@@ -1717,12 +1903,13 @@ int pa__init(pa_module*m) {
     }
 
     ss = m->core->default_sample_spec;
     }
 
     ss = m->core->default_sample_spec;
+    map = m->core->default_channel_map;
     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
         pa_log("Invalid sample format specification");
         goto fail;
     }
 
     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
         pa_log("Invalid sample format specification");
         goto fail;
     }
 
-    if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
+    if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
         pa_log("Failed to connect to server '%s'", u->server_name);
         goto fail;
     }
         pa_log("Failed to connect to server '%s'", u->server_name);
         goto fail;
     }
@@ -1732,7 +1919,7 @@ int pa__init(pa_module*m) {
 #ifdef TUNNEL_SINK
 
     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
 #ifdef TUNNEL_SINK
 
     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
-        dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
+        dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
 
     pa_sink_new_data_init(&data);
     data.driver = __FILE__;
 
     pa_sink_new_data_init(&data);
     data.driver = __FILE__;
@@ -1746,7 +1933,13 @@ int pa__init(pa_module*m) {
     if (u->sink_name)
         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
 
     if (u->sink_name)
         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
 
-    u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
+    if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
+        pa_log("Invalid properties");
+        pa_sink_new_data_done(&data);
+        goto fail;
+    }
+
+    u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
     pa_sink_new_data_done(&data);
 
     if (!u->sink) {
     pa_sink_new_data_done(&data);
 
     if (!u->sink) {
@@ -1762,7 +1955,7 @@ int pa__init(pa_module*m) {
 
     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
 
 
     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
 
-    pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0);
+/*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
 
     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
     pa_sink_set_rtpoll(u->sink, u->rtpoll);
 
     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
     pa_sink_set_rtpoll(u->sink, u->rtpoll);
@@ -1770,7 +1963,7 @@ int pa__init(pa_module*m) {
 #else
 
     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
 #else
 
     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
-        dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
+        dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
 
     pa_source_new_data_init(&data);
     data.driver = __FILE__;
 
     pa_source_new_data_init(&data);
     data.driver = __FILE__;
@@ -1784,6 +1977,12 @@ int pa__init(pa_module*m) {
     if (u->source_name)
         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
 
     if (u->source_name)
         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
 
+    if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
+        pa_log("Invalid properties");
+        pa_source_new_data_done(&data);
+        goto fail;
+    }
+
     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
     pa_source_new_data_done(&data);
 
     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
     pa_source_new_data_done(&data);
 
@@ -1796,26 +1995,26 @@ int pa__init(pa_module*m) {
     u->source->set_state = source_set_state;
     u->source->userdata = u;
 
     u->source->set_state = source_set_state;
     u->source->userdata = u;
 
-    pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0);
+/*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
 
     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
     pa_source_set_rtpoll(u->source, u->rtpoll);
 
     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
     pa_source_set_rtpoll(u->source, u->rtpoll);
+
+    u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
 #endif
 
     pa_xfree(dn);
 
     u->time_event = NULL;
 
 #endif
 
     pa_xfree(dn);
 
     u->time_event = NULL;
 
-    u->maxlength = 0;
+    u->maxlength = (uint32_t) -1;
 #ifdef TUNNEL_SINK
 #ifdef TUNNEL_SINK
-    u->tlength = u->minreq = u->prebuf = 0;
+    u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
 #else
 #else
-    u->fragsize = 0;
+    u->fragsize = (uint32_t) -1;
 #endif
 
 #endif
 
-    pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
-
-    if (!(u->thread = pa_thread_new(thread_func, u))) {
+    if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
         pa_log("Failed to create thread.");
         goto fail;
     }
         pa_log("Failed to create thread.");
         goto fail;
     }
@@ -1895,6 +2094,11 @@ void pa__done(pa_module*m) {
     if (u->time_event)
         u->core->mainloop->time_free(u->time_event);
 
     if (u->time_event)
         u->core->mainloop->time_free(u->time_event);
 
+#ifndef TUNNEL_SINK
+    if (u->mcalign)
+        pa_mcalign_free(u->mcalign);
+#endif
+
 #ifdef TUNNEL_SINK
     pa_xfree(u->sink_name);
 #else
 #ifdef TUNNEL_SINK
     pa_xfree(u->sink_name);
 #else