]> code.delx.au - pulseaudio/blobdiff - src/pulse/stream.c
Make connect-stress test compile for win32
[pulseaudio] / src / pulse / stream.c
index 72d49e11e91514e4bdc1b13117cdda8b67543c8f..aac18a314130618b02679fbd1cb5b6b35c6f34c0 100644 (file)
 #include <pulse/timeval.h>
 #include <pulse/rtclock.h>
 #include <pulse/xmalloc.h>
+#include <pulse/fork-detect.h>
 
 #include <pulsecore/pstream-util.h>
 #include <pulsecore/log.h>
 #include <pulsecore/hashmap.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/core-rtclock.h>
+#include <pulsecore/core-util.h>
 
-#include "fork-detect.h"
 #include "internal.h"
+#include "stream.h"
 
 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
@@ -199,7 +201,7 @@ static void stream_unlink(pa_stream *s) {
         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
 
     if (s->channel_valid) {
-        pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
+        pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
         s->channel = 0;
         s->channel_valid = FALSE;
     }
@@ -354,7 +356,7 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag,
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -387,9 +389,26 @@ static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t
 
     if (s->suspended || s->corked || force_stop)
         pa_smoother_pause(s->smoother, x);
-    else if (force_start || s->buffer_attr.prebuf == 0)
-        pa_smoother_resume(s->smoother, x, TRUE);
+    else if (force_start || s->buffer_attr.prebuf == 0) {
+
+        if (!s->timing_info_valid &&
+            !aposteriori &&
+            !force_start &&
+            !force_stop &&
+            s->context->version >= 13) {
+
+            /* If the server supports STARTED events we take them as
+             * indications when audio really starts/stops playing, if
+             * we don't have any timing info yet -- instead of trying
+             * to be smart and guessing the server time. Otherwise the
+             * unknown transport delay add too much noise to our time
+             * calculations. */
+
+            return;
+        }
 
+        pa_smoother_resume(s->smoother, x, TRUE);
+    }
 
     /* Please note that we have no idea if playback actually started
      * if prebuf is non-zero! */
@@ -457,7 +476,7 @@ void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, p
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -540,7 +559,7 @@ void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -592,7 +611,7 @@ void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t ta
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -634,7 +653,7 @@ void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag,
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -680,7 +699,7 @@ void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, p
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -716,7 +735,7 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -724,6 +743,8 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag
 
     s->requested_bytes += bytes;
 
+    /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
+
     if (s->requested_bytes > 0 && s->write_callback)
         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
 
@@ -750,7 +771,7 @@ void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -769,7 +790,7 @@ void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32
             s->underflow_callback(s, s->underflow_userdata);
     }
 
- finish:
+finish:
     pa_context_unref(c);
 }
 
@@ -835,10 +856,28 @@ static void create_stream_complete(pa_stream *s) {
     check_smoother_status(s, TRUE, FALSE, FALSE);
 }
 
-static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
+static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
+    const char *e;
+
     pa_assert(s);
     pa_assert(attr);
-    pa_assert(ss);
+
+    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
+        uint32_t ms;
+
+        if (pa_atou(e, &ms) < 0 || ms <= 0)
+            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
+        else {
+            attr->maxlength = (uint32_t) -1;
+            attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
+            attr->minreq = (uint32_t) -1;
+            attr->prebuf = (uint32_t) -1;
+            attr->fragsize = attr->tlength;
+        }
+
+        if (flags)
+            *flags |= PA_STREAM_ADJUST_LATENCY;
+    }
 
     if (s->context->version >= 13)
         return;
@@ -853,7 +892,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s
         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
 
     if (attr->tlength == (uint32_t) -1)
-        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
+        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
 
     if (attr->minreq == (uint32_t) -1)
         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
@@ -867,7 +906,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s
 
 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     pa_stream *s = userdata;
-    uint32_t requested_bytes;
+    uint32_t requested_bytes = 0;
 
     pa_assert(pd);
     pa_assert(s);
@@ -886,7 +925,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag,
 
     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
         s->channel == PA_INVALID_INDEX ||
-        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
+        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
         pa_context_fail(s->context, PA_ERR_PROTOCOL);
         goto finish;
@@ -980,7 +1019,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag,
     }
 
     s->channel_valid = TRUE;
-    pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
+    pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
 
     create_stream_complete(s);
 
@@ -1025,7 +1064,10 @@ static int create_stream(
                                               PA_STREAM_EARLY_REQUESTS|
                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                               PA_STREAM_START_UNMUTED|
-                                              PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
+                                              PA_STREAM_FAIL_ON_SUSPEND|
+                                              PA_STREAM_RELATIVE_VOLUME|
+                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
+
 
     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
@@ -1044,15 +1086,16 @@ static int create_stream(
     pa_stream_ref(s);
 
     s->direction = direction;
-    s->flags = flags;
-    s->corked = !!(flags & PA_STREAM_START_CORKED);
 
     if (sync_stream)
         s->syncid = sync_stream->syncid;
 
     if (attr)
         s->buffer_attr = *attr;
-    automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
+    patch_buffer_attr(s, &s->buffer_attr, &flags);
+
+    s->flags = flags;
+    s->corked = !!(flags & PA_STREAM_START_CORKED);
 
     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
         pa_usec_t x;
@@ -1158,6 +1201,19 @@ static int create_stream(
         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
     }
 
+    if (s->context->version >= 17) {
+
+        if (s->direction == PA_STREAM_PLAYBACK)
+            pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
+
+    }
+
+    if (s->context->version >= 18) {
+
+        if (s->direction == PA_STREAM_PLAYBACK)
+            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
+    }
+
     pa_pstream_send_tagstruct(s->context->pstream, t);
     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
 
@@ -1337,6 +1393,8 @@ int pa_stream_write(
      * that's OK, the server side applies the same error */
     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
 
+    /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
+
     if (s->direction == PA_STREAM_PLAYBACK) {
 
         /* Update latency request correction */
@@ -1457,6 +1515,11 @@ pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *us
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
 
+    /* Ask for a timing update before we cork/uncork to get the best
+     * accuracy for the transport latency suitable for the
+     * check_smoother_status() call in the started callback */
+    request_auto_timing_update(s, TRUE);
+
     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
 
     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
@@ -1464,6 +1527,10 @@ pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *us
     pa_pstream_send_tagstruct(s->context->pstream, t);
     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
 
+    /* This might cause the read index to conitnue again, hence
+     * let's request a timing update */
+    request_auto_timing_update(s, TRUE);
+
     return o;
 }
 
@@ -1666,8 +1733,8 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
         }
 
-        /* Update smoother */
-        if (o->stream->smoother) {
+        /* Update smoother if we're not corked */
+        if (o->stream->smoother && !o->stream->corked) {
             pa_usec_t u, x;
 
             u = x = pa_rtclock_now() - i->transport_usec;
@@ -2012,6 +2079,11 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
 
+    /* Ask for a timing update before we cork/uncork to get the best
+     * accuracy for the transport latency suitable for the
+     * check_smoother_status() call in the started callback */
+    request_auto_timing_update(s, TRUE);
+
     s->corked = b;
 
     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
@@ -2027,8 +2099,8 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi
 
     check_smoother_status(s, FALSE, FALSE, FALSE);
 
-    /* This might cause the indexes to hang/start again, hence
-     * let's request a timing update */
+    /* This might cause the indexes to hang/start again, hence let's
+     * request a timing update, after the cork/uncork, too */
     request_auto_timing_update(s, TRUE);
 
     return o;
@@ -2065,6 +2137,11 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
 
+    /* Ask for a timing update *before* the flush, so that the
+     * transport usec is as up to date as possible when we get the
+     * underflow message and update the smoother status*/
+    request_auto_timing_update(s, TRUE);
+
     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
         return NULL;
 
@@ -2085,6 +2162,11 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
          * index, but the read index might jump. */
         invalidate_indexes(s, TRUE, FALSE);
 
+    /* Note that we do not update requested_bytes here. This is
+     * because we cannot really know how data actually was dropped
+     * from the write index due to this. This 'error' will be applied
+     * by both client and server and hence we should be fine. */
+
     return o;
 }
 
@@ -2099,6 +2181,11 @@ pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *us
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
 
+    /* Ask for a timing update before we cork/uncork to get the best
+     * accuracy for the transport latency suitable for the
+     * check_smoother_status() call in the started callback */
+    request_auto_timing_update(s, TRUE);
+
     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
         return NULL;
 
@@ -2120,6 +2207,11 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
 
+    /* Ask for a timing update before we cork/uncork to get the best
+     * accuracy for the transport latency suitable for the
+     * check_smoother_status() call in the started callback */
+    request_auto_timing_update(s, TRUE);
+
     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
         return NULL;
 
@@ -2361,6 +2453,7 @@ pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr
     pa_operation *o;
     pa_tagstruct *t;
     uint32_t tag;
+    pa_buffer_attr copy;
 
     pa_assert(s);
     pa_assert(PA_REFCNT_VALUE(s) >= 1);
@@ -2371,6 +2464,11 @@ pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
 
+    /* Ask for a timing update before we cork/uncork to get the best
+     * accuracy for the transport latency suitable for the
+     * check_smoother_status() call in the started callback */
+    request_auto_timing_update(s, TRUE);
+
     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
 
     t = pa_tagstruct_command(
@@ -2379,6 +2477,10 @@ pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr
             &tag);
     pa_tagstruct_putu32(t, s->channel);
 
+    copy = *attr;
+    patch_buffer_attr(s, &copy, NULL);
+    attr = &copy;
+
     pa_tagstruct_putu32(t, attr->maxlength);
 
     if (s->direction == PA_STREAM_PLAYBACK)