]> code.delx.au - pulseaudio/blobdiff - src/modules/module-tunnel.c
dbusiface-module: Implement the Module D-Bus interface.
[pulseaudio] / src / modules / module-tunnel.c
index 6f525da3c8bf8566d518607d6319c7fc41aab496..5ccb81d0659c2c3cac989d781a222e9641375e94 100644 (file)
@@ -31,6 +31,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 
+#include <pulse/rtclock.h>
 #include <pulse/timeval.h>
 #include <pulse/util.h>
 #include <pulse/version.h>
 #include <pulsecore/time-smoother.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/thread-mq.h>
-#include <pulsecore/rtclock.h>
+#include <pulsecore/core-rtclock.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/proplist-util.h>
 #include <pulsecore/auth-cookie.h>
+#include <pulsecore/mcalign.h>
 
 #ifdef TUNNEL_SINK
 #include "module-tunnel-sink-symdef.h"
@@ -112,7 +114,7 @@ static const char* const valid_modargs[] = {
 
 #define DEFAULT_TIMEOUT 5
 
-#define LATENCY_INTERVAL 10
+#define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
 
 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
 
@@ -193,6 +195,7 @@ struct userdata {
 #else
     char *source_name;
     pa_source *source;
+    pa_mcalign *mcalign;
 #endif
 
     pa_auth_cookie *auth_cookie;
@@ -395,7 +398,7 @@ static void check_smoother_status(struct userdata *u, pa_bool_t past)  {
 
     pa_assert(u);
 
-    x = pa_rtclock_usec();
+    x = pa_rtclock_now();
 
     /* Correct by the time the requested issued needs to travel to the
      * other side.  This is a valid thread-safe access, because the
@@ -500,7 +503,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             pa_usec_t yl, yr, *usec = data;
 
             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
-            yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
+            yr = pa_smoother_get(u->smoother, pa_rtclock_now());
 
             *usec = yl > yr ? yl - yr : 0;
             return 0;
@@ -533,7 +536,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             else
                 y = 0;
 
-            pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
+            pa_smoother_put(u->smoother, pa_rtclock_now(), y);
 
             /* We can access this freely here, since the main thread is waiting for us */
             u->thread_transport_usec = u->transport_usec;
@@ -607,20 +610,29 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
             pa_usec_t yr, yl, *usec = data;
 
             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
-            yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
+            yr = pa_smoother_get(u->smoother, pa_rtclock_now());
 
             *usec = yr > yl ? yr - yl : 0;
             return 0;
         }
 
-        case SOURCE_MESSAGE_POST:
+        case SOURCE_MESSAGE_POST: {
+            pa_memchunk c;
 
-            if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
-                pa_source_post(u->source, chunk);
+            pa_mcalign_push(u->mcalign, chunk);
 
-            u->counter += (int64_t) chunk->length;
+            while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
+
+                if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+                    pa_source_post(u->source, &c);
+
+                pa_memblock_unref(c.memblock);
+
+                u->counter += (int64_t) c.length;
+            }
 
             return 0;
+        }
 
         case SOURCE_MESSAGE_REMOTE_SUSPEND:
 
@@ -633,7 +645,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
             y += (pa_usec_t) offset;
 
-            pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
+            pa_smoother_put(u->smoother, pa_rtclock_now(), y);
 
             /* We can access this freely here, since the main thread is waiting for us */
             u->thread_transport_usec = u->transport_usec;
@@ -683,7 +695,6 @@ static void thread_func(void *userdata) {
     pa_log_debug("Thread starting up");
 
     pa_thread_mq_install(&u->thread_mq);
-    pa_rtpoll_install(u->rtpoll);
 
     for (;;) {
         int ret;
@@ -730,7 +741,7 @@ static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, p
     }
 
     if (channel != u->channel) {
-        pa_log("Recieved data for invalid channel");
+        pa_log("Received data for invalid channel");
         goto fail;
     }
 
@@ -878,9 +889,8 @@ static void request_latency(struct userdata *u) {
 }
 
 /* Called from main context */
-static void timeout_callback(pa_mainloop_api *m, pa_time_event*e,  const struct timeval *tv, void *userdata) {
+static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
     struct userdata *u = userdata;
-    struct timeval ntv;
 
     pa_assert(m);
     pa_assert(e);
@@ -888,9 +898,7 @@ static void timeout_callback(pa_mainloop_api *m, pa_time_event*e,  const struct
 
     request_latency(u);
 
-    pa_gettimeofday(&ntv);
-    ntv.tv_sec += LATENCY_INTERVAL;
-    m->time_restart(e, &ntv);
+    pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
 }
 
 /* Called from main context */
@@ -1154,7 +1162,7 @@ static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag
     pa_assert(u->sink);
 
     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
-        pa_cvolume_equal(&volume, &u->sink->virtual_volume))
+        pa_cvolume_equal(&volume, &u->sink->real_volume))
         return;
 
     pa_sink_volume_changed(u->sink, &volume);
@@ -1357,7 +1365,6 @@ static void start_subscribe(struct userdata *u) {
 /* Called from main context */
 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct userdata *u = userdata;
-    struct timeval ntv;
 #ifdef TUNNEL_SINK
     uint32_t bytes;
 #endif
@@ -1439,9 +1446,7 @@ static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t
     request_info(u);
 
     pa_assert(!u->time_event);
-    pa_gettimeofday(&ntv);
-    ntv.tv_sec += LATENCY_INTERVAL;
-    u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
+    u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
 
     request_latency(u);
 
@@ -1675,7 +1680,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
     pa_assert(u);
 
     if (channel != u->channel) {
-        pa_log("Recieved memory block on bad channel.");
+        pa_log("Received memory block on bad channel.");
         pa_module_unload_request(u->module, TRUE);
         return;
     }
@@ -1706,7 +1711,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
     }
 
     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
-    u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
+    u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
 
     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
@@ -1758,7 +1763,7 @@ static void sink_set_volume(pa_sink *sink) {
     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
     pa_tagstruct_putu32(t, tag = u->ctag++);
     pa_tagstruct_putu32(t, u->device_index);
-    pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
+    pa_tagstruct_put_cvolume(t, &sink->real_volume);
     pa_pstream_send_tagstruct(u->pstream, t);
 }
 
@@ -1825,7 +1830,7 @@ int pa__init(pa_module*m) {
             TRUE,
             TRUE,
             10,
-            pa_rtclock_usec(),
+            pa_rtclock_now(),
             FALSE);
     u->ctag = 1;
     u->device_index = u->channel = PA_INVALID_INDEX;
@@ -1853,7 +1858,7 @@ int pa__init(pa_module*m) {
         goto fail;
     }
 
-    if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
+    if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
         pa_log("Failed to connect to server '%s'", u->server_name);
         goto fail;
     }
@@ -1943,6 +1948,8 @@ int pa__init(pa_module*m) {
 
     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
     pa_source_set_rtpoll(u->source, u->rtpoll);
+
+    u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
 #endif
 
     pa_xfree(dn);
@@ -2036,6 +2043,11 @@ void pa__done(pa_module*m) {
     if (u->time_event)
         u->core->mainloop->time_free(u->time_event);
 
+#ifndef TUNNEL_SINK
+    if (u->mcalign)
+        pa_mcalign_free(u->mcalign);
+#endif
+
 #ifdef TUNNEL_SINK
     pa_xfree(u->sink_name);
 #else