X-Git-Url: https://code.delx.au/pulseaudio/blobdiff_plain/ce534976cec4fe3fa4b861b0136062e817ed0994..05400321c0a87ccda505f04e376e1a8c910d6525:/src/pulsecore/protocol-native.c diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index 7d8b9399..cd3056d9 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -1,5 +1,3 @@ -/* $Id$ */ - /*** This file is part of PulseAudio. @@ -102,7 +100,8 @@ typedef struct playback_stream { pa_sink_input *sink_input; pa_memblockq *memblockq; - pa_bool_t drain_request; + pa_bool_t is_underrun:1; + pa_bool_t drain_request:1; uint32_t drain_tag; uint32_t syncid; @@ -132,7 +131,8 @@ typedef struct upload_stream { struct connection { pa_msgobject parent; - pa_bool_t authorized; + pa_bool_t authorized:1; + pa_bool_t is_local:1; uint32_t version; pa_protocol_native *protocol; pa_client *client; @@ -475,11 +475,15 @@ static void fix_record_buffer_attr_pre(record_stream *s, pa_bool_t adjust_latenc pa_assert(maxlength); pa_assert(fragsize); - if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH) + if (*maxlength == (uint32_t) -1 || *maxlength > MAX_MEMBLOCKQ_LENGTH) *maxlength = MAX_MEMBLOCKQ_LENGTH; + if (*maxlength <= 0) + *maxlength = pa_frame_size(&s->source_output->sample_spec); - if (*fragsize <= 0) + if (*fragsize == (uint32_t) -1) *fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec); + if (*fragsize <= 0) + *fragsize = pa_frame_size(&s->source_output->sample_spec); if (adjust_latency) { pa_usec_t fragsize_usec; @@ -730,16 +734,23 @@ static void fix_playback_buffer_attr_pre(playback_stream *s, pa_bool_t adjust_la pa_assert(prebuf); pa_assert(minreq); - if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH) + frame_size = pa_frame_size(&s->sink_input->sample_spec); + + if (*maxlength == (uint32_t) -1 || *maxlength > MAX_MEMBLOCKQ_LENGTH) *maxlength = MAX_MEMBLOCKQ_LENGTH; - if (*tlength <= 0) + if (*maxlength <= 0) + *maxlength = frame_size; + + if (*tlength == (uint32_t) -1) *tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec); - if (*minreq <= 0) - *minreq = pa_usec_to_bytes(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec); + if (*tlength <= 0) + *tlength = frame_size; - frame_size = pa_frame_size(&s->sink_input->sample_spec); + if (*minreq == (uint32_t) -1) + *minreq = pa_usec_to_bytes(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec); if (*minreq <= 0) *minreq = frame_size; + if (*tlength < *minreq+frame_size) *tlength = *minreq+frame_size; @@ -811,7 +822,7 @@ static void fix_playback_buffer_attr_pre(playback_stream *s, pa_bool_t adjust_la if (*tlength <= *minreq) *tlength = *minreq*2 + frame_size; - if (*prebuf <= 0) + if (*prebuf == (uint32_t) -1 || *prebuf > *tlength) *prebuf = *tlength; } @@ -910,6 +921,9 @@ static playback_stream* playback_stream_new( s->connection = c; s->syncid = syncid; s->sink_input = sink_input; + s->is_underrun = TRUE; + s->drain_request = FALSE; + pa_atomic_store(&s->missing, 0); s->sink_input->parent.process_msg = sink_input_process_msg; s->sink_input->pop = sink_input_pop_cb; @@ -943,9 +957,6 @@ static playback_stream* playback_stream_new( *ss = s->sink_input->sample_spec; *map = s->sink_input->channel_map; - pa_atomic_store(&s->missing, 0); - s->drain_request = FALSE; - pa_idxset_put(c->output_streams, s, &s->index); pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms", @@ -1197,7 +1208,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int switch (code) { case SINK_INPUT_MESSAGE_FLUSH: - func = pa_memblockq_flush; + func = pa_memblockq_flush_write; break; case SINK_INPUT_MESSAGE_PREBUF_FORCE: @@ -1287,24 +1298,28 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk playback_stream_assert_ref(s); pa_assert(chunk); - if (pa_memblockq_peek(s->memblockq, chunk) < 0) { +/* pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */ -/* pa_log("UNDERRUN: %lu", pa_memblockq_get_length(s->memblockq)); */ + if (pa_memblockq_is_readable(s->memblockq)) + s->is_underrun = FALSE; + else { +/* pa_log("%s, UNDERRUN: %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */ if (s->drain_request && pa_sink_input_safe_to_remove(i)) { s->drain_request = FALSE; pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); - } else if (i->thread_info.playing_for > 0) + } else if (!s->is_underrun) pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL); -/* pa_log("adding %llu bytes", (unsigned long long) nbytes); */ + s->is_underrun = TRUE; request_bytes(s); - - return -1; } -/* pa_log("NOTUNDERRUN %lu", (unsigned long) chunk->length); */ + /* This call will not fail with prebuf=0, hence we check for + underrun explicitly above */ + if (pa_memblockq_peek(s->memblockq, chunk) < 0) + return -1; chunk->length = PA_MIN(nbytes, chunk->length); @@ -1936,7 +1951,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t connection *c = CONNECTION(userdata); const void*cookie; pa_tagstruct *reply; - char tmp[16]; + pa_bool_t shm_on_remote, do_shm; connection_assert_ref(c); pa_assert(t); @@ -1954,8 +1969,17 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t return; } - pa_snprintf(tmp, sizeof(tmp), "%u", c->version); - pa_proplist_sets(c->client->proplist, "native-protocol.version", tmp); + /* Starting with protocol version 13 the MSB of the version tag + reflects if shm is available for this connection or + not. */ + if (c->version >= 13) { + shm_on_remote = !!(c->version & 0x80000000U); + c->version &= 0x7FFFFFFFU; + } + + pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION); + + pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version); if (!c->authorized) { pa_bool_t success = FALSE; @@ -1986,16 +2010,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t pa_log_info("Got credentials: uid=%lu gid=%lu success=%i", (unsigned long) creds->uid, (unsigned long) creds->gid, - success); - - if (c->version >= 10 && - pa_mempool_is_shared(c->protocol->core->mempool) && - creds->uid == getuid()) { - - pa_pstream_enable_shm(c->pstream, TRUE); - pa_log_info("Enabled SHM for new connection"); - } - + (int) success); } #endif @@ -2015,8 +2030,32 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t } } + /* Enable shared memory support if possible */ + do_shm = + pa_mempool_is_shared(c->protocol->core->mempool) && + c->is_local; + + pa_log_debug("SHM possible: %s", pa_yes_no(do_shm)); + + if (do_shm) + if (c->version < 10 || (c->version >= 13 && !shm_on_remote)) + do_shm = FALSE; + + if (do_shm) { + /* Only enable SHM if both sides are owned by the same + * user. This is a security measure because otherwise data + * private to the user might leak. */ + + const pa_creds *creds; + if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid) + do_shm = FALSE; + } + + pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm)); + pa_pstream_enable_shm(c->pstream, do_shm); + reply = reply_new(tag); - pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION); + pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0)); #ifdef HAVE_CREDS { @@ -2512,6 +2551,7 @@ static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) { static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_input *s) { pa_sample_spec fixed_ss; + pa_usec_t sink_latency; pa_assert(t); pa_sink_input_assert_ref(s); @@ -2526,8 +2566,8 @@ static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_in pa_tagstruct_put_sample_spec(t, &fixed_ss); pa_tagstruct_put_channel_map(t, &s->channel_map); pa_tagstruct_put_cvolume(t, &s->volume); - pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s)); - pa_tagstruct_put_usec(t, pa_sink_get_latency(s->sink)); + pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency)); + pa_tagstruct_put_usec(t, sink_latency); pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s))); pa_tagstruct_puts(t, s->driver); if (c->version >= 11) @@ -2538,6 +2578,7 @@ static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_in static void source_output_fill_tagstruct(connection *c, pa_tagstruct *t, pa_source_output *s) { pa_sample_spec fixed_ss; + pa_usec_t source_latency; pa_assert(t); pa_source_output_assert_ref(s); @@ -2551,8 +2592,8 @@ static void source_output_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sour pa_tagstruct_putu32(t, s->source->index); pa_tagstruct_put_sample_spec(t, &fixed_ss); pa_tagstruct_put_channel_map(t, &s->channel_map); - pa_tagstruct_put_usec(t, pa_source_output_get_latency(s)); - pa_tagstruct_put_usec(t, pa_source_get_latency(s->source)); + pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency)); + pa_tagstruct_put_usec(t, source_latency); pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s))); pa_tagstruct_puts(t, s->driver); @@ -2574,7 +2615,7 @@ static void scache_fill_tagstruct(connection *c, pa_tagstruct *t, pa_scache_entr pa_tagstruct_putu32(t, e->index); pa_tagstruct_puts(t, e->name); pa_tagstruct_put_cvolume(t, &e->volume); - pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : NULL); + pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0); pa_tagstruct_put_sample_spec(t, &fixed_ss); pa_tagstruct_put_channel_map(t, &e->channel_map); pa_tagstruct_putu32(t, e->memchunk.length); @@ -3047,7 +3088,7 @@ static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U s = pa_idxset_get_by_index(c->record_streams, idx); CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY); - pa_memblockq_flush(s->memblockq); + pa_memblockq_flush_read(s->memblockq); pa_pstream_send_simple_ack(c->pstream, tag); } @@ -3940,6 +3981,7 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo } else c->auth_timeout_event = NULL; + c->is_local = pa_iochannel_socket_is_local(io); c->version = 8; c->protocol = p; pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname)); @@ -3972,7 +4014,6 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo #ifdef HAVE_CREDS if (pa_iochannel_creds_supported(io)) pa_iochannel_creds_enable(io); - #endif }