code.delx.au - pulseaudio/commitdiff
properly account for seeks in the requested_bytes counter
author: Lennart Poettering <lennart@poettering.net>
Wed, 1 Apr 2009 21:05:09 +0000 (23:05 +0200)
committer: Lennart Poettering <lennart@poettering.net>
Wed, 1 Apr 2009 21:05:09 +0000 (23:05 +0200)
src/modules/module-ladspa-sink.c
src/modules/rtp/module-rtp-recv.c
src/pulse/context.c
src/pulse/internal.h
src/pulse/stream.c
src/pulsecore/memblockq.c
src/pulsecore/memblockq.h
src/pulsecore/protocol-native.c
src/pulsecore/sink-input.c
src/pulsecore/source-output.c
src/tests/memblockq-test.c

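diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c
index e619acd32abbab1cfc5479666af69530e740766a..44052c9c47f9f89c193beb5f9439c643477fb57a 100644
--- a/src/modules/module-ladspa-sink.c
+++ b/src/modules/module-ladspa-sink.c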
@@ -235,7 +235,7 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
         if (amount > 0) {
             unsigned c;
 
-            pa_memblockq_seek(u->memblockq, - (int64_t) amount, PA_SEEK_RELATIVE);
+            pa_memblockq_seek(u->memblockq, - (int64_t) amount, PA_SEEK_RELATIVE, TRUE);
 
             pa_log_debug("Resetting plugin");
 
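diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
index 33e23af20756c79b0bcff014d96cf9c7ee53a775..209770f94f09265ecbc8a6e955157ec431a7a4ec 100644
--- a/src/modules/rtp/module-rtp-recv.c
+++ b/src/modules/rtp/module-rtp-recv.c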
@@ -238,7 +238,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
     else
         delta = j;
 
-    pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE);
+    pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
 
     pa_rtclock_get(&now);
 
@@ -246,7 +246,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
 
     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
         pa_log_warn("Queue overrun");
-        pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE);
+        pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
     }
 
 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
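diff --git a/src/pulse/context.c b/src/pulse/context.c
index 28d17191bdebd1cbb3e736bcd6833afab87e458f..991a886f0ecff7da4752ce5b789bcd4f6a40121b 100644
--- a/src/pulse/context.c
+++ b/src/pulse/context.c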
@@ -364,10 +364,10 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
     if ((s = pa_dynarray_get(c->record_streams, channel))) {
 
         if (chunk->memblock) {
-            pa_memblockq_seek(s->record_memblockq, offset, seek);
+            pa_memblockq_seek(s->record_memblockq, offset, seek, TRUE);
             pa_memblockq_push_align(s->record_memblockq, chunk);
         } else
-            pa_memblockq_seek(s->record_memblockq, offset+chunk->length, seek);
+            pa_memblockq_seek(s->record_memblockq, offset+chunk->length, seek, TRUE);
 
         if (s->read_callback) {
             size_t l;
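diff --git a/src/pulse/internal.h b/src/pulse/internal.h
index cf362d99c32e181b9d5d20cce18b7915aae64f2f..43d1877ce78bbf6fb7aa0803c0dc08eac7cc181a 100644
--- a/src/pulse/internal.h
+++ b/src/pulse/internal.h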
@@ -140,7 +140,7 @@ struct pa_stream {
     uint32_t syncid;
     uint32_t stream_index;
 
-    uint32_t requested_bytes;
+    int64_t requested_bytes;
     pa_buffer_attr buffer_attr;
 
     uint32_t device_index;
diff --git a/src/pulse/stream.c b/src/pulse/stream.c
index 16342cad3f46f5be8d204fe68a2c109e4d6d5c83..c4a54af67f335c2631bf32b843f96df598c222c9 100644
--- a/src/pulse/stream.c
+++ b/src/pulse/stream.c
@@ -729,7 +729,7 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tag
     s->requested_bytes += bytes;
 
     if (s->requested_bytes > 0 && s->write_callback)
-        s->write_callback(s, s->requested_bytes, s->write_userdata);
+        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
 
 finish:
     pa_context_unref(c);
@@ -826,7 +826,7 @@ static void create_stream_complete(pa_stream *s) {
     pa_stream_set_state(s, PA_STREAM_READY);
 
     if (s->requested_bytes > 0 && s->write_callback)
-        s->write_callback(s, s->requested_bytes, s->write_userdata);
+        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
 
     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
         struct timeval tv;
@@ -874,6 +874,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s
 
 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     pa_stream *s = userdata;
+    uint32_t requested_bytes;
 
     pa_assert(pd);
     pa_assert(s);
@@ -893,11 +894,13 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag,
     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
         s->channel == PA_INVALID_INDEX ||
         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
-        ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
+        ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
         pa_context_fail(s->context, PA_ERR_PROTOCOL);
         goto finish;
     }
 
+    s->requested_bytes = (int64_t) requested_bytes;
+
     if (s->context->version >= 9) {
         if (s->direction == PA_STREAM_PLAYBACK) {
             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
@@ -1258,12 +1261,9 @@ int pa_stream_write(
     if (free_cb && pa_pstream_get_shm(s->context->pstream))
         free_cb((void*) data);
 
-    if (length < s->requested_bytes)
-        s->requested_bytes -= (uint32_t) length;
-    else
-        s->requested_bytes = 0;
-
-    /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
+    /* This is obviously wrong since we ignore the seeking index . But
+     * that's OK, the server side applies the same error */
+    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
 
     if (s->direction == PA_STREAM_PLAYBACK) {
 
@@ -1359,7 +1359,7 @@ size_t pa_stream_writable_size(pa_stream *s) {
     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
 
-    return s->requested_bytes;
+    return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
 }
 
 size_t pa_stream_readable_size(pa_stream *s) {
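diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c
index e6e7b7366a090a51e4c0d8c0d5df6f33f0dd942a..d12d13a8036f42419f54f5b1580d0a3c3f566ff6 100644
--- a/src/pulsecore/memblockq.c
+++ b/src/pulsecore/memblockq.c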
@@ -601,7 +601,7 @@ size_t pa_memblockq_missing(pa_memblockq *bq) {
     return l >= bq->minreq ? l : 0;
 }
 
-void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) {
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account) {
     int64_t old, delta;
     pa_assert(bq);
 
@@ -628,12 +628,14 @@ void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) {
 
     delta = bq->write_index - old;
 
-    if (delta >= (int64_t) bq->requested) {
-        delta -= (int64_t) bq->requested;
-        bq->requested = 0;
-    } else if (delta >= 0) {
-        bq->requested -= (size_t) delta;
-        delta = 0;
+    if (account) {
+        if (delta >= (int64_t) bq->requested) {
+            delta -= (int64_t) bq->requested;
+            bq->requested = 0;
+        } else if (delta >= 0) {
+            bq->requested -= (size_t) delta;
+            delta = 0;
+        }
     }
 
     bq->missing -= delta;
@@ -895,7 +897,7 @@ int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source) {
 
             pa_memblock_unref(chunk.memblock);
         } else
-            pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE);
+            pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
 
         pa_memblockq_drop(bq, chunk.length);
     }
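diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h
index e315b8318d007ac828fc8df7f162922329e26b6c..146d261b7f9386e029bd70c1e323a457868e509d 100644
--- a/src/pulsecore/memblockq.h
+++ b/src/pulsecore/memblockq.h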
@@ -85,7 +85,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk);
 int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk);
 
 /* Manipulate the write pointer */
-void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek);
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account);
 
 /* Return a copy of the next memory chunk in the queue. It is not
  * removed from the queue. There are two reasons this function might
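diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index e11d7a6cdfba7610735bee282bbc3ae87d184a74..30d68f3531aa9b469695e05a999e1ed47d49bc37 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c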
@@ -738,24 +738,21 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,
     switch (code) {
         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
             pa_tagstruct *t;
-            uint32_t l = 0;
+            int l = 0;
 
             for (;;) {
-                if ((l = (uint32_t) pa_atomic_load(&s->missing)) <= 0)
-                    break;
+                if ((l = pa_atomic_load(&s->missing)) <= 0)
+                    return 0;
 
-                if (pa_atomic_cmpxchg(&s->missing, (int) l, 0))
+                if (pa_atomic_cmpxchg(&s->missing, l, 0))
                     break;
             }
 
-            if (l <= 0)
-                break;
-
             t = pa_tagstruct_new(NULL, 0);
             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
             pa_tagstruct_putu32(t, s->index);
-            pa_tagstruct_putu32(t, l);
+            pa_tagstruct_putu32(t, (uint32_t) l);
             pa_pstream_send_tagstruct(s->connection->pstream, t);
 
 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
@@ -1097,7 +1094,8 @@ static playback_stream* playback_stream_new(
 
 /* Called from IO context */
 static void playback_stream_request_bytes(playback_stream *s) {
-    size_t m, previous_missing, minreq;
+    size_t m, minreq;
+    int previous_missing;
 
     playback_stream_assert_ref(s);
 
@@ -1108,11 +1106,11 @@ static void playback_stream_request_bytes(playback_stream *s) {
 
 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
 
-    previous_missing = (size_t) pa_atomic_add(&s->missing, (int) m);
+    previous_missing = pa_atomic_add(&s->missing, (int) m);
     minreq = pa_memblockq_get_minreq(s->memblockq);
 
     if (pa_memblockq_prebuf_active(s->memblockq) ||
-        (previous_missing < minreq && previous_missing+m >= minreq))
+        (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
 }
 
@@ -1297,7 +1295,12 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
             int64_t windex;
 
             windex = pa_memblockq_get_write_index(s->memblockq);
-            pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
+
+            /* The client side is incapable of accounting correctly
+             * for seeks of a type != PA_SEEK_RELATIVE. We need to be
+             * able to deal with that. */
+
+            pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
 
             handle_seek(s, windex);
             return 0;
@@ -1315,7 +1318,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
                 pa_log_warn("Failed to push data into queue");
                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
-                pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
+                pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
             }
 
             handle_seek(s, windex);
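diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 0ed16dd868307a03968428984fe9b91dbb2554ba..1fdb3fa6743843a68628ff9804b0aad85b514e30 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c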
@@ -631,7 +631,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
              * data, so let's just hand out silence */
             pa_atomic_store(&i->thread_info.drained, 1);
 
-            pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE);
+            pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE);
             i->thread_info.playing_for = 0;
             if (i->thread_info.underrun_for != (uint64_t) -1)
                 i->thread_info.underrun_for += ilength;
@@ -776,7 +776,7 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
 
             if (amount > 0)
                 /* Ok, now update the write pointer */
-                pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE);
+                pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE, TRUE);
 
             if (i->thread_info.rewrite_flush)
                 pa_memblockq_silence(i->thread_info.render_memblockq);
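diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 27f24cd17866fd13d3f8b61093cea05d5ed6ef66..1c37be932cb12af1f5a063cedb937d7530f93ac3 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c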
@@ -434,7 +434,7 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
 
     if (pa_memblockq_push(o->thread_info.delay_memblockq, chunk) < 0) {
         pa_log_debug("Delay queue overflow!");
-        pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
+        pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
     }
 
     limit = o->process_rewind ? 0 : o->source->thread_info.max_rewind;
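diff --git a/src/tests/memblockq-test.c b/src/tests/memblockq-test.c
index 127fb1971cc229c4e2f72dc14ad489123dba33f5..ec3f54265ced9b47e1ff7145e16bcc8522c046f1 100644
--- a/src/tests/memblockq-test.c
+++ b/src/tests/memblockq-test.c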
@@ -105,45 +105,45 @@ int main(int argc, char *argv[]) {
     ret = pa_memblockq_push(bq, &chunk4);
     assert(ret == 0);
 
-    pa_memblockq_seek(bq, -6, 0);
+    pa_memblockq_seek(bq, -6, 0, TRUE);
     ret = pa_memblockq_push(bq, &chunk3);
     assert(ret == 0);
 
-    pa_memblockq_seek(bq, -2, 0);
+    pa_memblockq_seek(bq, -2, 0, TRUE);
     ret = pa_memblockq_push(bq, &chunk1);
     assert(ret == 0);
 
-    pa_memblockq_seek(bq, -10, 0);
+    pa_memblockq_seek(bq, -10, 0, TRUE);
     ret = pa_memblockq_push(bq, &chunk4);
     assert(ret == 0);
 
-    pa_memblockq_seek(bq, 10, 0);
+    pa_memblockq_seek(bq, 10, 0, TRUE);
 
     ret = pa_memblockq_push(bq, &chunk1);
     assert(ret == 0);
 
-    pa_memblockq_seek(bq, -6, 0);
+    pa_memblockq_seek(bq, -6, 0, TRUE);
     ret = pa_memblockq_push(bq, &chunk2);
     assert(ret == 0);
 
     /* Test splitting */
-    pa_memblockq_seek(bq, -12, 0);
+    pa_memblockq_seek(bq, -12, 0, TRUE);
     ret = pa_memblockq_push(bq, &chunk1);
     assert(ret == 0);
 
-    pa_memblockq_seek(bq, 20, 0);
+    pa_memblockq_seek(bq, 20, 0, TRUE);
 
     /* Test merging */
     ret = pa_memblockq_push(bq, &chunk3);
     assert(ret == 0);
-    pa_memblockq_seek(bq, -2, 0);
+    pa_memblockq_seek(bq, -2, 0, TRUE);
 
     chunk3.index += 2;
     chunk3.length -= 2;
     ret = pa_memblockq_push(bq, &chunk3);
     assert(ret == 0);
 
-    pa_memblockq_seek(bq, 30, PA_SEEK_RELATIVE);
+    pa_memblockq_seek(bq, 30, PA_SEEK_RELATIVE, TRUE);
 
     dump(bq);