X-Git-Url: https://code.delx.au/pulseaudio/blobdiff_plain/c815441ba19e92f88395de8763df4a1e57950f7b..6e319e5182b2d9d8d034ee2c9c0ca5027787b0ce:/src/pulse/stream.c diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 4dea5670..aac18a31 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -32,15 +32,17 @@ #include #include #include +#include #include #include #include #include #include +#include -#include "fork-detect.h" #include "internal.h" +#include "stream.h" #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC) #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC) @@ -199,7 +201,7 @@ static void stream_unlink(pa_stream *s) { pa_pdispatch_unregister_reply(s->context->pdispatch, s); if (s->channel_valid) { - pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL); + pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel)); s->channel = 0; s->channel_valid = FALSE; } @@ -354,7 +356,7 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, goto finish; } - if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel))) + if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -474,7 +476,7 @@ void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, p goto finish; } - if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel))) + if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? 
c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -557,7 +559,7 @@ void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t goto finish; } - if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel))) + if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -609,7 +611,7 @@ void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t ta goto finish; } - if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel))) + if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -651,7 +653,7 @@ void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, goto finish; } - if (!(s = pa_dynarray_get(c->playback_streams, channel))) + if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -697,7 +699,7 @@ void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, p goto finish; } - if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel))) + if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? 
c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -733,7 +735,7 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag goto finish; } - if (!(s = pa_dynarray_get(c->playback_streams, channel))) + if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -741,6 +743,8 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag s->requested_bytes += bytes; + /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */ + if (s->requested_bytes > 0 && s->write_callback) s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata); @@ -767,7 +771,7 @@ void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32 goto finish; } - if (!(s = pa_dynarray_get(c->playback_streams, channel))) + if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel)))) goto finish; if (s->state != PA_STREAM_READY) @@ -786,7 +790,7 @@ void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32 s->underflow_callback(s, s->underflow_userdata); } - finish: +finish: pa_context_unref(c); } @@ -852,10 +856,28 @@ static void create_stream_complete(pa_stream *s) { check_smoother_status(s, TRUE, FALSE, FALSE); } -static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) { +static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) { + const char *e; + pa_assert(s); pa_assert(attr); - pa_assert(ss); + + if ((e = getenv("PULSE_LATENCY_MSEC"))) { + uint32_t ms; + + if (pa_atou(e, &ms) < 0 || ms <= 0) + pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e); + else { + attr->maxlength = (uint32_t) -1; + attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec); + attr->minreq = (uint32_t) -1; + attr->prebuf = (uint32_t) -1; + 
attr->fragsize = attr->tlength; + } + + if (flags) + *flags |= PA_STREAM_ADJUST_LATENCY; + } if (s->context->version >= 13) return; @@ -870,7 +892,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */ if (attr->tlength == (uint32_t) -1) - attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */ + attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */ if (attr->minreq == (uint32_t) -1) attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */ @@ -903,7 +925,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, if (pa_tagstruct_getu32(t, &s->channel) < 0 || s->channel == PA_INVALID_INDEX || - ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) || + ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) || ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) { pa_context_fail(s->context, PA_ERR_PROTOCOL); goto finish; @@ -997,7 +1019,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, } s->channel_valid = TRUE; - pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s); + pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? 
s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s); create_stream_complete(s); @@ -1043,7 +1065,9 @@ static int create_stream( PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND| PA_STREAM_START_UNMUTED| PA_STREAM_FAIL_ON_SUSPEND| - PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID); + PA_STREAM_RELATIVE_VOLUME| + PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED); PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED); @@ -1062,15 +1086,16 @@ static int create_stream( pa_stream_ref(s); s->direction = direction; - s->flags = flags; - s->corked = !!(flags & PA_STREAM_START_CORKED); if (sync_stream) s->syncid = sync_stream->syncid; if (attr) s->buffer_attr = *attr; - automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec); + patch_buffer_attr(s, &s->buffer_attr, &flags); + + s->flags = flags; + s->corked = !!(flags & PA_STREAM_START_CORKED); if (flags & PA_STREAM_INTERPOLATE_TIMING) { pa_usec_t x; @@ -1183,6 +1208,12 @@ static int create_stream( } + if (s->context->version >= 18) { + + if (s->direction == PA_STREAM_PLAYBACK) + pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH)); + } + pa_pstream_send_tagstruct(s->context->pstream, t); pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL); @@ -1362,6 +1393,8 @@ int pa_stream_write( * that's OK, the server side applies the same error */ s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? 
offset : 0) + (int64_t) length; + /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */ + if (s->direction == PA_STREAM_PLAYBACK) { /* Update latency request correction */ @@ -1700,8 +1733,8 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq); } - /* Update smoother */ - if (o->stream->smoother) { + /* Update smoother if we're not corked */ + if (o->stream->smoother && !o->stream->corked) { pa_usec_t u, x; u = x = pa_rtclock_now() - i->transport_usec; @@ -2129,6 +2162,11 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use * index, but the read index might jump. */ invalidate_indexes(s, TRUE, FALSE); + /* Note that we do not update requested_bytes here. This is + * because we cannot really know how data actually was dropped + * from the write index due to this. This 'error' will be applied + * by both client and server and hence we should be fine. */ + return o; } @@ -2415,6 +2453,7 @@ pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr pa_operation *o; pa_tagstruct *t; uint32_t tag; + pa_buffer_attr copy; pa_assert(s); pa_assert(PA_REFCNT_VALUE(s) >= 1); @@ -2438,6 +2477,10 @@ pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr &tag); pa_tagstruct_putu32(t, s->channel); + copy = *attr; + patch_buffer_attr(s, &copy, NULL); + attr = &copy; + pa_tagstruct_putu32(t, attr->maxlength); if (s->direction == PA_STREAM_PLAYBACK)