]> code.delx.au - pulseaudio/blobdiff - src/pulsecore/sink.c
use single array for storing pa_core hook lists, add sink state changed hook, drop...
[pulseaudio] / src / pulsecore / sink.c
index 11effe2f4a8749c611fb64fb4dc7ad75ef23bb23..929542ccfb014c253fa3005040b4b2d6e57c0a3b 100644 (file)
@@ -46,8 +46,9 @@
 #include "sink.h"
 
 #define MAX_MIX_CHANNELS 32
+#define SILENCE_BUFFER_LENGTH (64*1024)
 
-static PA_DEFINE_CHECK_TYPE(pa_sink, sink_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_sink, pa_msgobject);
 
 static void sink_free(pa_object *s);
 
@@ -79,7 +80,7 @@ pa_sink* pa_sink_new(
     pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
     pa_return_null_if_fail(name && pa_utf8_valid(name) && *name);
 
-    s = pa_msgobject_new(pa_sink, sink_check_type);
+    s = pa_msgobject_new(pa_sink);
 
     if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fail))) {
         pa_xfree(s);
@@ -145,6 +146,8 @@ pa_sink* pa_sink_new(
 
     pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index);
 
+    pa_hook_fire(&core->hooks[PA_CORE_HOOK_SINK_NEW_POST], s);
+
     return s;
 }
 
@@ -160,10 +163,12 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
         if ((ret = s->set_state(s, state)) < 0)
             return -1;
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 
     s->state = state;
+    
+    pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], s);
     return 0;
 }
 
@@ -173,11 +178,11 @@ void pa_sink_disconnect(pa_sink* s) {
     pa_assert(s);
     pa_return_if_fail(s->state != PA_SINK_DISCONNECTED);
 
+    pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_DISCONNECT], s);
+    
     pa_namereg_unregister(s->core, s->name);
     pa_idxset_remove_by_data(s->core->sinks, s, NULL);
 
-    pa_hook_fire(&s->core->hook_sink_disconnect, s);
-
     while ((i = pa_idxset_first(s->inputs, NULL))) {
         pa_assert(i != j);
         pa_sink_input_kill(i);
@@ -197,6 +202,8 @@ void pa_sink_disconnect(pa_sink* s) {
     s->set_state = NULL;
 
     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
+
+    pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_DISCONNECT_POST], s);
 }
 
 static void sink_free(pa_object *o) {
@@ -263,7 +270,7 @@ int pa_sink_suspend(pa_sink *s, int suspend) {
 void pa_sink_ping(pa_sink *s) {
     pa_sink_assert_ref(s);
 
-    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, NULL, NULL);
+    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, 0, NULL, NULL);
 }
 
 static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) {
@@ -319,12 +326,13 @@ static void inputs_drop(pa_sink *s, pa_mix_info *info, unsigned n, size_t length
                 break;
             }
 
-            if (++p > n)
+            p++;
+            if (p >= n)
                 p = 0;
         }
 
         /* Drop read data */
-        pa_sink_input_drop(i, m ? &m->chunk : NULL, length);
+        pa_sink_input_drop(i, length);
 
         if (m) {
             pa_sink_input_unref(m->userdata);
@@ -360,10 +368,13 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
 
     pa_sink_ref(s);
 
-    n = fill_mix_info(s, info, MAX_MIX_CHANNELS);
+    n = s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, info, MAX_MIX_CHANNELS) : 0;
 
     if (n == 0) {
 
+        if (length > SILENCE_BUFFER_LENGTH)
+            length = SILENCE_BUFFER_LENGTH;
+
         if (!s->silence || pa_memblock_get_length(s->silence) < length) {
             if (s->silence)
                 pa_memblock_unref(s->silence);
@@ -403,7 +414,8 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
         result->index = 0;
     }
 
-    inputs_drop(s, info, n, result->length);
+    if (s->thread_info.state == PA_SINK_RUNNING)
+        inputs_drop(s, info, n, result->length);
 
     if (s->monitor_source)
         pa_source_post(s->monitor_source, result);
@@ -422,7 +434,7 @@ void pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
 
     pa_sink_ref(s);
 
-    n = fill_mix_info(s, info, MAX_MIX_CHANNELS);
+    n = s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, info, MAX_MIX_CHANNELS) : 0;
 
     if (n == 0) {
         pa_silence_memchunk(target, &s->sample_spec);
@@ -467,7 +479,8 @@ void pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
         pa_memblock_release(target->memblock);
     }
 
-    inputs_drop(s, info, n, target->length);
+    if (s->thread_info.state == PA_SINK_RUNNING)
+        inputs_drop(s, info, n, target->length);
 
     if (s->monitor_source)
         pa_source_post(s->monitor_source, target);
@@ -523,7 +536,7 @@ pa_usec_t pa_sink_get_latency(pa_sink *s) {
     if (s->get_latency)
         return s->get_latency(s);
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
         return 0;
 
     return usec;
@@ -542,7 +555,7 @@ void pa_sink_set_volume(pa_sink *s, const pa_cvolume *volume) {
         s->set_volume = NULL;
 
     if (!s->set_volume)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -559,7 +572,7 @@ const pa_cvolume *pa_sink_get_volume(pa_sink *s) {
         s->get_volume = NULL;
 
     if (!s->get_volume && s->refresh_volume)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_VOLUME, &s->volume, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_VOLUME, &s->volume, 0, NULL);
 
     if (!pa_cvolume_equal(&old_volume, &s->volume))
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -578,7 +591,7 @@ void pa_sink_set_mute(pa_sink *s, int mute) {
         s->set_mute = NULL;
 
     if (!s->set_mute)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -595,7 +608,7 @@ int pa_sink_get_mute(pa_sink *s) {
         s->get_mute = NULL;
 
     if (!s->get_mute && s->refresh_mute)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_MUTE, &s->muted, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_MUTE, &s->muted, 0, NULL);
 
     if (old_muted != s->muted)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -653,20 +666,58 @@ unsigned pa_sink_used_by(pa_sink *s) {
     return ret;
 }
 
-int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
     pa_sink *s = PA_SINK(o);
     pa_sink_assert_ref(s);
 
     switch ((pa_sink_message_t) code) {
+        
         case PA_SINK_MESSAGE_ADD_INPUT: {
-            pa_sink_input *i = userdata;
+            pa_sink_input *i = PA_SINK_INPUT(userdata);
             pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i));
+
+            /* Since the caller sleeps in pa_sink_input_put(), we can
+             * safely access data outside of thread_info even though
+             * it is mutable */
+
+            if ((i->thread_info.sync_prev = i->sync_prev)) {
+                pa_assert(i->sink == i->thread_info.sync_prev->sink);
+                pa_assert(i->sync_prev->sync_next == i);
+                i->thread_info.sync_prev->thread_info.sync_next = i;
+            }
+
+            if ((i->thread_info.sync_next = i->sync_next)) {
+                pa_assert(i->sink == i->thread_info.sync_next->sink);
+                pa_assert(i->sync_next->sync_prev == i);
+                i->thread_info.sync_next->thread_info.sync_prev = i;
+            }
+
             return 0;
         }
 
         case PA_SINK_MESSAGE_REMOVE_INPUT: {
-            pa_sink_input *i = userdata;
-            pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index));
+            pa_sink_input *i = PA_SINK_INPUT(userdata);
+
+            /* Since the caller sleeps in pa_sink_input_disconnect(),
+             * we can safely access data outside of thread_info even
+             * though it is mutable */
+
+            pa_assert(!i->thread_info.sync_prev);
+            pa_assert(!i->thread_info.sync_next);
+            
+            if (i->thread_info.sync_prev) {
+                i->thread_info.sync_prev->thread_info.sync_next = i->thread_info.sync_prev->sync_next;
+                i->thread_info.sync_prev = NULL;
+            }
+
+            if (i->thread_info.sync_next) {
+                i->thread_info.sync_next->thread_info.sync_prev = i->thread_info.sync_next->sync_prev;
+                i->thread_info.sync_next = NULL;
+            }
+            
+            if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index)))
+                pa_sink_input_unref(i);
+            
             return 0;
         }
 
@@ -690,6 +741,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *
             return 0;
 
         case PA_SINK_MESSAGE_SET_STATE:
+            
             s->thread_info.state = PA_PTR_TO_UINT(userdata);
             return 0;