]> code.delx.au - pulseaudio/blobdiff - src/pulse/stream.c
Make connect-stress test compile for win32
[pulseaudio] / src / pulse / stream.c
index 4dea56707227e75205b19e98d66be3c3820b6ff8..aac18a314130618b02679fbd1cb5b6b35c6f34c0 100644 (file)
 #include <pulse/timeval.h>
 #include <pulse/rtclock.h>
 #include <pulse/xmalloc.h>
+#include <pulse/fork-detect.h>
 
 #include <pulsecore/pstream-util.h>
 #include <pulsecore/log.h>
 #include <pulsecore/hashmap.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/core-rtclock.h>
+#include <pulsecore/core-util.h>
 
-#include "fork-detect.h"
 #include "internal.h"
+#include "stream.h"
 
 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
@@ -199,7 +201,7 @@ static void stream_unlink(pa_stream *s) {
         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
 
     if (s->channel_valid) {
-        pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
+        pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
         s->channel = 0;
         s->channel_valid = FALSE;
     }
@@ -354,7 +356,7 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag,
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -474,7 +476,7 @@ void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, p
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -557,7 +559,7 @@ void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -609,7 +611,7 @@ void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t ta
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -651,7 +653,7 @@ void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag,
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -697,7 +699,7 @@ void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, p
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
+    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -733,7 +735,7 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -741,6 +743,8 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag
 
     s->requested_bytes += bytes;
 
+    /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
+
     if (s->requested_bytes > 0 && s->write_callback)
         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
 
@@ -767,7 +771,7 @@ void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32
         goto finish;
     }
 
-    if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
         goto finish;
 
     if (s->state != PA_STREAM_READY)
@@ -786,7 +790,7 @@ void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32
             s->underflow_callback(s, s->underflow_userdata);
     }
 
- finish:
+finish:
     pa_context_unref(c);
 }
 
@@ -852,10 +856,28 @@ static void create_stream_complete(pa_stream *s) {
     check_smoother_status(s, TRUE, FALSE, FALSE);
 }
 
-static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
+static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
+    const char *e;
+
     pa_assert(s);
     pa_assert(attr);
-    pa_assert(ss);
+
+    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
+        uint32_t ms;
+
+        if (pa_atou(e, &ms) < 0 || ms <= 0)
+            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
+        else {
+            attr->maxlength = (uint32_t) -1;
+            attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
+            attr->minreq = (uint32_t) -1;
+            attr->prebuf = (uint32_t) -1;
+            attr->fragsize = attr->tlength;
+        }
+
+        if (flags)
+            *flags |= PA_STREAM_ADJUST_LATENCY;
+    }
 
     if (s->context->version >= 13)
         return;
@@ -870,7 +892,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s
         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
 
     if (attr->tlength == (uint32_t) -1)
-        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
+        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
 
     if (attr->minreq == (uint32_t) -1)
         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
@@ -903,7 +925,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag,
 
     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
         s->channel == PA_INVALID_INDEX ||
-        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
+        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
         pa_context_fail(s->context, PA_ERR_PROTOCOL);
         goto finish;
@@ -997,7 +1019,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag,
     }
 
     s->channel_valid = TRUE;
-    pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
+    pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
 
     create_stream_complete(s);
 
@@ -1043,7 +1065,9 @@ static int create_stream(
                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                               PA_STREAM_START_UNMUTED|
                                               PA_STREAM_FAIL_ON_SUSPEND|
-                                              PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
+                                              PA_STREAM_RELATIVE_VOLUME|
+                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
+
 
     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
@@ -1062,15 +1086,16 @@ static int create_stream(
     pa_stream_ref(s);
 
     s->direction = direction;
-    s->flags = flags;
-    s->corked = !!(flags & PA_STREAM_START_CORKED);
 
     if (sync_stream)
         s->syncid = sync_stream->syncid;
 
     if (attr)
         s->buffer_attr = *attr;
-    automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
+    patch_buffer_attr(s, &s->buffer_attr, &flags);
+
+    s->flags = flags;
+    s->corked = !!(flags & PA_STREAM_START_CORKED);
 
     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
         pa_usec_t x;
@@ -1183,6 +1208,12 @@ static int create_stream(
 
     }
 
+    if (s->context->version >= 18) {
+
+        if (s->direction == PA_STREAM_PLAYBACK)
+            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
+    }
+
     pa_pstream_send_tagstruct(s->context->pstream, t);
     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
 
@@ -1362,6 +1393,8 @@ int pa_stream_write(
      * that's OK, the server side applies the same error */
     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
 
+    /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
+
     if (s->direction == PA_STREAM_PLAYBACK) {
 
         /* Update latency request correction */
@@ -1700,8 +1733,8 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
         }
 
-        /* Update smoother */
-        if (o->stream->smoother) {
+        /* Update smoother if we're not corked */
+        if (o->stream->smoother && !o->stream->corked) {
             pa_usec_t u, x;
 
             u = x = pa_rtclock_now() - i->transport_usec;
@@ -2129,6 +2162,11 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
          * index, but the read index might jump. */
         invalidate_indexes(s, TRUE, FALSE);
 
+    /* Note that we do not update requested_bytes here. This is
+     * because we cannot really know how data actually was dropped
+     * from the write index due to this. This 'error' will be applied
+     * by both client and server and hence we should be fine. */
+
     return o;
 }
 
@@ -2415,6 +2453,7 @@ pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr
     pa_operation *o;
     pa_tagstruct *t;
     uint32_t tag;
+    pa_buffer_attr copy;
 
     pa_assert(s);
     pa_assert(PA_REFCNT_VALUE(s) >= 1);
@@ -2438,6 +2477,10 @@ pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr
             &tag);
     pa_tagstruct_putu32(t, s->channel);
 
+    copy = *attr;
+    patch_buffer_attr(s, &copy, NULL);
+    attr = &copy;
+
     pa_tagstruct_putu32(t, attr->maxlength);
 
     if (s->direction == PA_STREAM_PLAYBACK)