]> code.delx.au - pulseaudio/blobdiff - src/pulsecore/protocol-native.c
fix underrun detection for prebuf=0 streams
[pulseaudio] / src / pulsecore / protocol-native.c
index 7d8b939922c0be1f7e181bfdafca9acb561e9ba4..cd3056d92696b97f363482437d8b104be240f723 100644 (file)
@@ -1,5 +1,3 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
@@ -102,7 +100,8 @@ typedef struct playback_stream {
 
     pa_sink_input *sink_input;
     pa_memblockq *memblockq;
-    pa_bool_t drain_request;
+    pa_bool_t is_underrun:1;
+    pa_bool_t drain_request:1;
     uint32_t drain_tag;
     uint32_t syncid;
 
@@ -132,7 +131,8 @@ typedef struct upload_stream {
 struct connection {
     pa_msgobject parent;
 
-    pa_bool_t authorized;
+    pa_bool_t authorized:1;
+    pa_bool_t is_local:1;
     uint32_t version;
     pa_protocol_native *protocol;
     pa_client *client;
@@ -475,11 +475,15 @@ static void fix_record_buffer_attr_pre(record_stream *s, pa_bool_t adjust_latenc
     pa_assert(maxlength);
     pa_assert(fragsize);
 
-    if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
+    if (*maxlength == (uint32_t) -1 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
         *maxlength = MAX_MEMBLOCKQ_LENGTH;
+    if (*maxlength <= 0)
+        *maxlength = pa_frame_size(&s->source_output->sample_spec);
 
-    if (*fragsize <= 0)
+    if (*fragsize == (uint32_t) -1)
         *fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
+    if (*fragsize <= 0)
+        *fragsize = pa_frame_size(&s->source_output->sample_spec);
 
     if (adjust_latency) {
         pa_usec_t fragsize_usec;
@@ -730,16 +734,23 @@ static void fix_playback_buffer_attr_pre(playback_stream *s, pa_bool_t adjust_la
     pa_assert(prebuf);
     pa_assert(minreq);
 
-    if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
+    frame_size = pa_frame_size(&s->sink_input->sample_spec);
+
+    if (*maxlength == (uint32_t) -1 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
         *maxlength = MAX_MEMBLOCKQ_LENGTH;
-    if (*tlength <= 0)
+    if (*maxlength <= 0)
+        *maxlength = frame_size;
+
+    if (*tlength == (uint32_t) -1)
         *tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
-    if (*minreq <= 0)
-        *minreq = pa_usec_to_bytes(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
+    if (*tlength <= 0)
+        *tlength = frame_size;
 
-    frame_size = pa_frame_size(&s->sink_input->sample_spec);
+    if (*minreq == (uint32_t) -1)
+        *minreq = pa_usec_to_bytes(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
     if (*minreq <= 0)
         *minreq = frame_size;
+
     if (*tlength < *minreq+frame_size)
         *tlength = *minreq+frame_size;
 
@@ -811,7 +822,7 @@ static void fix_playback_buffer_attr_pre(playback_stream *s, pa_bool_t adjust_la
     if (*tlength <= *minreq)
         *tlength =  *minreq*2 + frame_size;
 
-    if (*prebuf <= 0)
+    if (*prebuf == (uint32_t) -1 || *prebuf > *tlength)
         *prebuf = *tlength;
 }
 
@@ -910,6 +921,9 @@ static playback_stream* playback_stream_new(
     s->connection = c;
     s->syncid = syncid;
     s->sink_input = sink_input;
+    s->is_underrun = TRUE;
+    s->drain_request = FALSE;
+    pa_atomic_store(&s->missing, 0);
 
     s->sink_input->parent.process_msg = sink_input_process_msg;
     s->sink_input->pop = sink_input_pop_cb;
@@ -943,9 +957,6 @@ static playback_stream* playback_stream_new(
     *ss = s->sink_input->sample_spec;
     *map = s->sink_input->channel_map;
 
-    pa_atomic_store(&s->missing, 0);
-    s->drain_request = FALSE;
-
     pa_idxset_put(c->output_streams, s, &s->index);
 
     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
@@ -1197,7 +1208,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
 
             switch  (code) {
                 case SINK_INPUT_MESSAGE_FLUSH:
-                    func = pa_memblockq_flush;
+                    func = pa_memblockq_flush_write;
                     break;
 
                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
@@ -1287,24 +1298,28 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     playback_stream_assert_ref(s);
     pa_assert(chunk);
 
-    if (pa_memblockq_peek(s->memblockq, chunk) < 0) {
+/*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
 
-/*         pa_log("UNDERRUN: %lu", pa_memblockq_get_length(s->memblockq)); */
+    if (pa_memblockq_is_readable(s->memblockq))
+        s->is_underrun = FALSE;
+    else {
+/*         pa_log("%s, UNDERRUN: %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
 
         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
             s->drain_request = FALSE;
             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
-        } else if (i->thread_info.playing_for > 0)
+        } else if (!s->is_underrun)
             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
 
-/*         pa_log("adding %llu bytes", (unsigned long long) nbytes); */
+        s->is_underrun = TRUE;
 
         request_bytes(s);
-
-        return -1;
     }
 
-/*     pa_log("NOTUNDERRUN %lu", (unsigned long) chunk->length); */
+    /* This call will not fail with prebuf=0, hence we check for
+       underrun explicitly above */
+    if (pa_memblockq_peek(s->memblockq, chunk) < 0)
+        return -1;
 
     chunk->length = PA_MIN(nbytes, chunk->length);
 
@@ -1936,7 +1951,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
     connection *c = CONNECTION(userdata);
     const void*cookie;
     pa_tagstruct *reply;
-    char tmp[16];
+    pa_bool_t shm_on_remote, do_shm;
 
     connection_assert_ref(c);
     pa_assert(t);
@@ -1954,8 +1969,17 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
         return;
     }
 
-    pa_snprintf(tmp, sizeof(tmp), "%u", c->version);
-    pa_proplist_sets(c->client->proplist, "native-protocol.version", tmp);
+    /* Starting with protocol version 13 the MSB of the version tag
+       reflects if shm is available for this connection or
+       not. */
+    if (c->version >= 13) {
+        shm_on_remote = !!(c->version & 0x80000000U);
+        c->version &= 0x7FFFFFFFU;
+    }
+
+    pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
+
+    pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
 
     if (!c->authorized) {
         pa_bool_t success = FALSE;
@@ -1986,16 +2010,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
                         (unsigned long) creds->uid,
                         (unsigned long) creds->gid,
-                        success);
-
-            if (c->version >= 10 &&
-                pa_mempool_is_shared(c->protocol->core->mempool) &&
-                creds->uid == getuid()) {
-
-                pa_pstream_enable_shm(c->pstream, TRUE);
-                pa_log_info("Enabled SHM for new connection");
-            }
-
+                        (int) success);
         }
 #endif
 
@@ -2015,8 +2030,32 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
         }
     }
 
+    /* Enable shared memory support if possible */
+    do_shm =
+        pa_mempool_is_shared(c->protocol->core->mempool) &&
+        c->is_local;
+
+    pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
+
+    if (do_shm)
+        if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
+            do_shm = FALSE;
+
+    if (do_shm) {
+        /* Only enable SHM if both sides are owned by the same
+         * user. This is a security measure because otherwise data
+         * private to the user might leak. */
+
+        const pa_creds *creds;
+        if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
+            do_shm = FALSE;
+    }
+
+    pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
+    pa_pstream_enable_shm(c->pstream, do_shm);
+
     reply = reply_new(tag);
-    pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION);
+    pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
 
 #ifdef HAVE_CREDS
 {
@@ -2512,6 +2551,7 @@ static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) {
 
 static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_input *s) {
     pa_sample_spec fixed_ss;
+    pa_usec_t sink_latency;
 
     pa_assert(t);
     pa_sink_input_assert_ref(s);
@@ -2526,8 +2566,8 @@ static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_in
     pa_tagstruct_put_sample_spec(t, &fixed_ss);
     pa_tagstruct_put_channel_map(t, &s->channel_map);
     pa_tagstruct_put_cvolume(t, &s->volume);
-    pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s));
-    pa_tagstruct_put_usec(t, pa_sink_get_latency(s->sink));
+    pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
+    pa_tagstruct_put_usec(t, sink_latency);
     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
     pa_tagstruct_puts(t, s->driver);
     if (c->version >= 11)
@@ -2538,6 +2578,7 @@ static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_in
 
 static void source_output_fill_tagstruct(connection *c, pa_tagstruct *t, pa_source_output *s) {
     pa_sample_spec fixed_ss;
+    pa_usec_t source_latency;
 
     pa_assert(t);
     pa_source_output_assert_ref(s);
@@ -2551,8 +2592,8 @@ static void source_output_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sour
     pa_tagstruct_putu32(t, s->source->index);
     pa_tagstruct_put_sample_spec(t, &fixed_ss);
     pa_tagstruct_put_channel_map(t, &s->channel_map);
-    pa_tagstruct_put_usec(t, pa_source_output_get_latency(s));
-    pa_tagstruct_put_usec(t, pa_source_get_latency(s->source));
+    pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
+    pa_tagstruct_put_usec(t, source_latency);
     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
     pa_tagstruct_puts(t, s->driver);
 
@@ -2574,7 +2615,7 @@ static void scache_fill_tagstruct(connection *c, pa_tagstruct *t, pa_scache_entr
     pa_tagstruct_putu32(t, e->index);
     pa_tagstruct_puts(t, e->name);
     pa_tagstruct_put_cvolume(t, &e->volume);
-    pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : NULL);
+    pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
     pa_tagstruct_put_sample_spec(t, &fixed_ss);
     pa_tagstruct_put_channel_map(t, &e->channel_map);
     pa_tagstruct_putu32(t, e->memchunk.length);
@@ -3047,7 +3088,7 @@ static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U
     s = pa_idxset_get_by_index(c->record_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
 
-    pa_memblockq_flush(s->memblockq);
+    pa_memblockq_flush_read(s->memblockq);
     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 
@@ -3940,6 +3981,7 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
     } else
         c->auth_timeout_event = NULL;
 
+    c->is_local = pa_iochannel_socket_is_local(io);
     c->version = 8;
     c->protocol = p;
     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
@@ -3972,7 +4014,6 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
 #ifdef HAVE_CREDS
     if (pa_iochannel_creds_supported(io))
         pa_iochannel_creds_enable(io);
-
 #endif
 }