]> code.delx.au - pulseaudio/commitdiff
new features:
authorLennart Poettering <lennart@poettering.net>
Sun, 22 Aug 2004 21:13:58 +0000 (21:13 +0000)
committerLennart Poettering <lennart@poettering.net>
Sun, 22 Aug 2004 21:13:58 +0000 (21:13 +0000)
  future cancellation
  corking
  flushing
for playback streams in native protocol

git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@152 fefdeb5f-60dc-0310-8127-8f9354f1896f

20 files changed:
polyp/memblockq.c
polyp/memblockq.h
polyp/native-common.h
polyp/pacat.c
polyp/pactl.c
polyp/play-memchunk.c
polyp/polyplib-context.c
polyp/polyplib-simple.c
polyp/polyplib-stream.c
polyp/polyplib-stream.h
polyp/protocol-esound.c
polyp/protocol-native.c
polyp/protocol-simple.c
polyp/pstream.c
polyp/pstream.h
polyp/sink-input.c
polyp/sink-input.h
polyp/sink.c
polyp/util.c
polyp/util.h

index 085c05108ff7298a6acb2c83dfb3727a0010fde3..b6dcca3f546617e956db77bb9beab55acaa6593e 100644 (file)
 #include <stdio.h>
 #include <assert.h>
 #include <stdlib.h>
+#include <string.h>
 
 #include "memblockq.h"
 #include "xmalloc.h"
 
 struct memblock_list {
-    struct memblock_list *next;
+    struct memblock_list *next, *prev;
     struct pa_memchunk chunk;
-    struct timeval stamp;
 };
 
 struct pa_memblockq {
     struct memblock_list *blocks, *blocks_tail;
     unsigned n_blocks;
     size_t current_length, maxlength, tlength, base, prebuf, minreq;
-    int measure_delay;
-    uint32_t delay;
     struct pa_mcalign *mcalign;
     struct pa_memblock_stat *memblock_stat;
 };
@@ -66,7 +64,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b
     assert(bq->maxlength >= base);
 
     bq->tlength = ((tlength+base-1)/base)*base;
-    if (bq->tlength == 0 || bq->tlength >= bq->maxlength)
+    if (!bq->tlength || bq->tlength >= bq->maxlength)
         bq->tlength = bq->maxlength;
     
     bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf;
@@ -80,29 +78,21 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b
 
     fprintf(stderr, "memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq);
     
-    bq->measure_delay = 0;
-    bq->delay = 0;
-
     bq->mcalign = NULL;
 
     bq->memblock_stat = s;
-    
+
     return bq;
 }
 
 void pa_memblockq_free(struct pa_memblockq* bq) {
-    struct memblock_list *l;
     assert(bq);
 
+    pa_memblockq_flush(bq);
+    
     if (bq->mcalign)
         pa_mcalign_free(bq->mcalign);
 
-    while ((l = bq->blocks)) {
-        bq->blocks = l->next;
-        pa_memblock_unref(l->chunk.memblock);
-        pa_xfree(l);
-    }
-    
     pa_xfree(bq);
 }
 
@@ -110,31 +100,25 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk,
     struct memblock_list *q;
     assert(bq && chunk && chunk->memblock && chunk->length && (chunk->length % bq->base) == 0);
 
+    pa_memblockq_seek(bq, delta);
+    
     if (bq->blocks_tail && bq->blocks_tail->chunk.memblock == chunk->memblock) {
         /* Try to merge memory chunks */
 
         if (bq->blocks_tail->chunk.index+bq->blocks_tail->chunk.length == chunk->index) {
             bq->blocks_tail->chunk.length += chunk->length;
             bq->current_length += chunk->length;
-
-    /*        fprintf(stderr, __FILE__": merge succeeded: %u\n", chunk->length);*/
             return;
         }
     }
     
     q = pa_xmalloc(sizeof(struct memblock_list));
 
-    if (bq->measure_delay)
-        gettimeofday(&q->stamp, NULL);
-    else
-        timerclear(&q->stamp);
-
     q->chunk = *chunk;
     pa_memblock_ref(q->chunk.memblock);
     assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length);
     q->next = NULL;
-    
-    if (bq->blocks_tail)
+    if ((q->prev = bq->blocks_tail))
         bq->blocks_tail->next = q;
     else
         bq->blocks = q;
@@ -158,57 +142,43 @@ int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) {
     *chunk = bq->blocks->chunk;
     pa_memblock_ref(chunk->memblock);
 
-/*     if (chunk->memblock->ref != 2) */
-/*         fprintf(stderr, "block %p with ref %u peeked.\n", chunk->memblock, chunk->memblock->ref); */
-    
     return 0;
 }
 
-/*
-int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) {
-    struct memblock_list *q;
-    
-    assert(bq && chunk);
-
-    if (!bq->blocks || bq->current_length < bq->prebuf)
-        return -1;
-
-    bq->prebuf = 0;
-
-    q = bq->blocks;
-    bq->blocks = bq->blocks->next;
-
-    *chunk = q->chunk;
+void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length) {
+    assert(bq && chunk && length);
 
-    bq->n_blocks--;
-    bq->current_length -= chunk->length;
+    if (!bq->blocks || memcmp(&bq->blocks->chunk, chunk, sizeof(struct pa_memchunk)))
+        return;
 
-    pa_xfree(q);
-    return 0;
+    assert(length <= bq->blocks->chunk.length);
+    pa_memblockq_skip(bq, length);
 }
-*/
 
-static uint32_t age(struct timeval *tv) {
-    struct timeval now;
-    uint32_t r;
-    assert(tv);
+static void remove_block(struct pa_memblockq *bq, struct memblock_list *q) {
+    assert(bq && q);
 
-    if (tv->tv_sec == 0)
-        return 0;
-
-    gettimeofday(&now, NULL);
+    if (q->prev)
+        q->prev->next = q->next;
+    else {
+        assert(bq->blocks == q);
+        bq->blocks = q->next;
+    }
     
-    r = (now.tv_sec-tv->tv_sec) * 1000000;
-
-    if (now.tv_usec >= tv->tv_usec)
-        r += now.tv_usec - tv->tv_usec;
-    else
-        r -= tv->tv_usec - now.tv_usec;
-
-    return r;
+    if (q->next)
+        q->next->prev = q->prev;
+    else {
+        assert(bq->blocks_tail == q);
+        bq->blocks_tail = q->prev;
+    }
+    
+    pa_memblock_unref(q->chunk.memblock);
+    pa_xfree(q);
+    
+    bq->n_blocks--;
 }
 
-void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
+void pa_memblockq_skip(struct pa_memblockq *bq, size_t length) {
     assert(bq && length && (length % bq->base) == 0);
 
     while (length > 0) {
@@ -218,25 +188,12 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
         if (l > bq->blocks->chunk.length)
             l = bq->blocks->chunk.length;
 
-        if (bq->measure_delay)
-            bq->delay = age(&bq->blocks->stamp);
-        
         bq->blocks->chunk.index += l;
         bq->blocks->chunk.length -= l;
         bq->current_length -= l;
         
-        if (bq->blocks->chunk.length == 0) {
-            struct memblock_list *q;
-            
-            q = bq->blocks;
-            bq->blocks = bq->blocks->next;
-            if (bq->blocks == NULL)
-                bq->blocks_tail = NULL;
-            pa_memblock_unref(q->chunk.memblock);
-            pa_xfree(q);
-            
-            bq->n_blocks--;
-        }
+        if (!bq->blocks->chunk.length)
+            remove_block(bq, bq->blocks);
 
         length -= l;
     }
@@ -255,7 +212,7 @@ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length) {
     l /= bq->base;
     l *= bq->base;
 
-    pa_memblockq_drop(bq, l);
+    pa_memblockq_skip(bq, l);
 }
 
 
@@ -276,11 +233,6 @@ int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) {
     return bq->current_length + length <= bq->tlength;
 }
 
-uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) {
-    assert(bq);
-    return bq->delay;
-}
-
 uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) {
     assert(bq);
     return bq->current_length;
@@ -331,3 +283,44 @@ void pa_memblockq_prebuf_disable(struct pa_memblockq *bq) {
     assert(bq);
     bq->prebuf = 0;
 }
+
+void pa_memblockq_seek(struct pa_memblockq *bq, size_t length) {
+    assert(bq);
+
+    if (!length)
+        return;
+
+    while (length >= bq->base) {
+        size_t l = length;
+        if (!bq->current_length)
+            return;
+
+        assert(bq->blocks_tail);
+        
+        if (l > bq->blocks_tail->chunk.length)
+            l = bq->blocks_tail->chunk.length;
+
+        bq->blocks_tail->chunk.length -= l;
+        bq->current_length -= l;
+        
+        if (bq->blocks_tail->chunk.length == 0)
+            remove_block(bq, bq->blocks);
+
+        length -= l;
+    }
+}
+
+void pa_memblockq_flush(struct pa_memblockq *bq) {
+    struct memblock_list *l;
+    assert(bq);
+    
+    while ((l = bq->blocks)) {
+        bq->blocks = l->next;
+        pa_memblock_unref(l->chunk.memblock);
+        pa_xfree(l);
+    }
+
+    bq->blocks_tail = NULL;
+    bq->n_blocks = 0;
+    bq->current_length = 0;
+}
index af8fa3746ee98d53f6ead700fff86e906561eabd..277beb55f7b5f980c56579c849815312d16646aa 100644 (file)
@@ -44,7 +44,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength,
                                       struct pa_memblock_stat *s);
 void pa_memblockq_free(struct pa_memblockq*bq);
 
-/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */
+/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. */
 void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta);
 
 /* Same as pa_memblockq_push(), however chunks are filtered through a mcalign object, and thus aligned to multiples of base */
@@ -53,8 +53,11 @@ void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *
 /* Return a copy of the next memory chunk in the queue. It is not removed from the queue */
 int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk);
 
+/* Drop the specified bytes from the queue, only valid aufter pa_memblockq_peek() */
+void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length);
+
 /* Drop the specified bytes from the queue */
-void pa_memblockq_drop(struct pa_memblockq *bq, size_t length);
+void pa_memblockq_skip(struct pa_memblockq *bq, size_t length);
 
 /* Shorten the pa_memblockq to the specified length by dropping data at the end of the queue */
 void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length);
@@ -68,9 +71,6 @@ int pa_memblockq_is_readable(struct pa_memblockq *bq);
 /* Test if the pa_memblockq is currently writable for the specified amount of bytes */
 int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length);
 
-/* The time memory chunks stay in the queue until they are removed completely in usecs */
-uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq);
-
 /* Return the length of the queue in bytes */
 uint32_t pa_memblockq_get_length(struct pa_memblockq *bq);
 
@@ -83,4 +83,10 @@ uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq);
 /* Force disabling of pre-buf even when the pre-buffer is not yet filled */
 void pa_memblockq_prebuf_disable(struct pa_memblockq *bq);
 
+/* Manipulate the write pointer */
+void pa_memblockq_seek(struct pa_memblockq *bq, size_t delta);
+
+/* Flush the queue */
+void pa_memblockq_flush(struct pa_memblockq *bq);
+
 #endif
index b921ccc2c396472fd578dc6819cebeea1819cecd..d826837ae91d2a15ff4614ea1e256583a78c1931 100644 (file)
@@ -75,6 +75,9 @@ enum {
 
     PA_COMMAND_SET_SINK_VOLUME,
     PA_COMMAND_SET_SINK_INPUT_VOLUME,
+
+    PA_COMMAND_CORK_PLAYBACK_STREAM,
+    PA_COMMAND_FLUSH_PLAYBACK_STREAM,
     
     PA_COMMAND_MAX
 };
index 2c7044b8c77b97417393cb3431202842f6f41357..198776d3582fb320808c5893688291160672ada8 100644 (file)
@@ -65,7 +65,7 @@ static void do_stream_write(size_t length) {
     if (l > buffer_length)
         l = buffer_length;
     
-    pa_stream_write(stream, buffer+buffer_index, l, NULL);
+    pa_stream_write(stream, buffer+buffer_index, l, NULL, 0);
     buffer_length -= l;
     buffer_index += l;
     
index f2556706ba78fb271809a866e571bf3eb7c267a4..dfa11b70d68b47d58438207a3fd5a68ce6fa8791 100644 (file)
@@ -150,7 +150,7 @@ static void stream_write_callback(struct pa_stream *s, size_t length, void *user
         quit(1);
     }
     
-    pa_stream_write(s, d, length, free);
+    pa_stream_write(s, d, length, free, 0);
 
     sample_length -= length;
 
index 5c423567073d396e43bb4b8e3094ca1b8c29d41b..b94a052420b78491bc7aa5d11b8907bee80ba9a8 100644 (file)
@@ -59,11 +59,12 @@ static void si_kill(struct pa_mainloop_api *m, void *i) {
     sink_input_kill(i);
 }
 
-static void sink_input_drop(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk*chunk, size_t length) {
     struct pa_memchunk *c;
     assert(i && length && i->userdata);
     c = i->userdata;
 
+    assert(chunk == c);
     assert(length <= c->length);
 
     c->length -= length;
index d048cda99371ee57857ec56cfa2789f65bb7859b..9acb2d70367e4bdd5db54f71884a988b7174fcf7 100644 (file)
@@ -193,7 +193,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack
     pa_context_unref(c);
 }
 
-static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
     struct pa_context *c = userdata;
     struct pa_stream *s;
     assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
index 66ee5995faae6c5674f701729521bf882d7942ed..c71d59a4a45e9994a8258629ef62ab30b1e723fb 100644 (file)
@@ -187,7 +187,7 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe
         if (l > length)
             l = length;
 
-        pa_stream_write(p->stream, data, l, NULL);
+        pa_stream_write(p->stream, data, l, NULL, 0);
         data += l;
         length -= l;
     }
index 451dd0461d3dedee9fa9f558fae2abe5b4f372f5..c0ec9e7ebca0ee01cdcf876740b219f9e816a750 100644 (file)
@@ -267,7 +267,7 @@ void pa_stream_connect_record(struct pa_stream *s, const char *dev, const struct
     create_stream(s, dev, attr);
 }
 
-void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p)) {
+void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) {
     struct pa_memchunk chunk;
     assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
 
@@ -282,7 +282,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void
     chunk.index = 0;
     chunk.length = length;
 
-    pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
+    pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk);
     pa_memblock_unref(chunk.memblock);
     
     if (length < s->requested_bytes)
@@ -452,3 +452,48 @@ finish:
     pa_operation_done(o);
     pa_operation_unref(o);
 }
+
+struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata) {
+    struct pa_operation *o;
+    struct pa_tagstruct *t;
+    uint32_t tag;
+    assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
+
+    o = pa_operation_new(s->context, s);
+    assert(o);
+    o->callback = cb;
+    o->userdata = userdata;
+
+    t = pa_tagstruct_new(NULL, 0);
+    assert(t);
+    pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
+    pa_tagstruct_putu32(t, tag = s->context->ctag++);
+    pa_tagstruct_putu32(t, s->channel);
+    pa_tagstruct_putu32(t, !!b);
+    pa_pstream_send_tagstruct(s->context->pstream, t);
+    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
+
+    return pa_operation_ref(o);
+}
+
+struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata) {
+    struct pa_operation *o;
+    struct pa_tagstruct *t;
+    uint32_t tag;
+    assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
+
+    o = pa_operation_new(s->context, s);
+    assert(o);
+    o->callback = cb;
+    o->userdata = userdata;
+
+    t = pa_tagstruct_new(NULL, 0);
+    assert(t);
+    pa_tagstruct_putu32(t, PA_COMMAND_FLUSH_PLAYBACK_STREAM);
+    pa_tagstruct_putu32(t, tag = s->context->ctag++);
+    pa_tagstruct_putu32(t, s->channel);
+    pa_pstream_send_tagstruct(s->context->pstream, t);
+    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
+
+    return pa_operation_ref(o);   
+}
index 1a9d58dd2b1cf20fd801e58f54946f7f388c0c2d..ff31332680b2330e42ee2c62a71c8467383e782c 100644 (file)
@@ -70,7 +70,30 @@ void pa_stream_disconnect(struct pa_stream *s);
  * and an internal reference to the specified data is kept, the data
  * is not copied. If NULL, the data is copied into an internal
  * buffer. */ 
-void pa_stream_write(struct pa_stream *p, const void *data, size_t length, void (*free_cb)(void *p));
+void pa_stream_write(struct pa_stream *p      /**< The stream to use */,
+                     const void *data         /**< The data to write */,
+                     size_t length            /**< The length of the data to write */,
+                     void (*free_cb)(void *p) /**< A cleanup routine for the data or NULL to request an internal copy */,
+                     size_t delta             /**< Drop this many
+                                                 bytes in the playback
+                                                 buffer before writing
+                                                 this data. Use
+                                                 (size_t) -1 for
+                                                 clearing the whole
+                                                 playback
+                                                 buffer. Normally you
+                                                 will specify 0 here,
+                                                 .i.e. append to the
+                                                 playback buffer. If
+                                                 the value given here
+                                                 is greater than the
+                                                 buffered data length
+                                                 the buffer is cleared
+                                                 and the data is
+                                                 written to the
+                                                 buffer's start. This
+                                                 value is ignored on
+                                                 upload streams. */);
 
 /** Return the amount of bytes that may be written using pa_stream_write() */
 size_t pa_stream_writable_size(struct pa_stream *p);
@@ -90,6 +113,16 @@ void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stre
 /** Set the callback function that is called when new data is available from the stream */
 void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata);
 
+/** Pause (or resume) playback of this stream temporarily
+ * \since 0.3 */
+struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata);
+
+/** Flush the playback buffer of this stream. Most of the time you're
+ * better off using the delta of pa_stream_write() instead of this
+ * function.
+ * \since 0.3*/
+struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata);
+
 PA_C_DECL_END
 
 #endif
index be2ef2b94e09fb18f4ca8fc41b6c509b08b02484..5102540b2ae8ac1c4bbafadf39f65139fe74de62 100644 (file)
@@ -102,7 +102,7 @@ typedef struct proto_handler {
     const char *description;
 } esd_proto_handler_info_t;
 
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
 static void sink_input_kill_cb(struct pa_sink_input *i);
 static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
@@ -835,7 +835,7 @@ static int do_write(struct connection *c) {
             return -1;
         }
     
-        pa_memblockq_drop(c->output_memblockq, r);
+        pa_memblockq_drop(c->output_memblockq, &chunk, r);
         pa_memblock_unref(chunk.memblock);
     }
     
@@ -894,11 +894,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
     return 0;
 }
 
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
     struct connection*c = i->userdata;
     assert(i && c && length);
 
-    pa_memblockq_drop(c->input_memblockq, length);
+    pa_memblockq_drop(c->input_memblockq, chunk, length);
 
     /* do something */
     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
index c8e5137e4ec283b354ec331a0b26ebcb64046d92..8b39482ca2acf10853b5c54983d3fadb4273dda5 100644 (file)
@@ -107,7 +107,7 @@ struct pa_protocol_native {
 };
 
 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
 static void sink_input_kill_cb(struct pa_sink_input *i);
 static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
 
@@ -135,6 +135,8 @@ static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uin
 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
 
 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
     [PA_COMMAND_ERROR] = { NULL },
@@ -176,6 +178,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
     [PA_COMMAND_SUBSCRIBE] = { command_subscribe },
     [PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume },
     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume },
+    [PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream },
+    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_playback_stream },
 };
 
 /* structure management */
@@ -376,7 +380,7 @@ static void send_memblock(struct connection *c) {
                 chunk.length = r->fragment_size;
 
             pa_pstream_send_memblock(c->pstream, r->index, 0, &chunk);
-            pa_memblockq_drop(r->memblockq, chunk.length);
+            pa_memblockq_drop(r->memblockq, &chunk, chunk.length);
             pa_memblock_unref(chunk.memblock);
             
             return;
@@ -422,12 +426,12 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
     return 0;
 }
 
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
     struct playback_stream *s;
     assert(i && i->userdata && length);
     s = i->userdata;
 
-    pa_memblockq_drop(s->memblockq, length);
+    pa_memblockq_drop(s->memblockq, chunk, length);
     request_bytes(s);
 
     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
@@ -1293,6 +1297,59 @@ static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32
     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 
+static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+    struct connection *c = userdata;
+    uint32_t index;
+    uint32_t b;
+    struct playback_stream *s;
+    assert(c && t);
+
+    if (pa_tagstruct_getu32(t, &index) < 0 ||
+        pa_tagstruct_getu32(t, &b) < 0 ||
+        !pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
+
+    if (!c->authorized) {
+        pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+        return;
+    }
+
+    if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
+        pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+        return;
+    }
+
+    pa_sink_input_cork(s->sink_input, b);
+    pa_pstream_send_simple_ack(c->pstream, tag);
+}
+
+static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+    struct connection *c = userdata;
+    uint32_t index;
+    struct playback_stream *s;
+    assert(c && t);
+
+    if (pa_tagstruct_getu32(t, &index) < 0 ||
+        !pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
+
+    if (!c->authorized) {
+        pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+        return;
+    }
+
+    if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
+        pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+        return;
+    }
+
+    pa_memblockq_flush(s->memblockq);
+    pa_pstream_send_simple_ack(c->pstream, tag);
+}
 
 /*** pstream callbacks ***/
 
@@ -1306,7 +1363,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack
     }
 }
 
-static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
     struct connection *c = userdata;
     struct output_stream *stream;
     assert(p && chunk && userdata);
@@ -1338,7 +1395,6 @@ static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, in
                 u->memchunk = *chunk;
                 pa_memblock_ref(u->memchunk.memblock);
                 u->length = 0;
-                fprintf(stderr, "COPY\n");
             } else {
                 u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
                 u->memchunk.index = u->memchunk.length = 0;
index 5834348605aae663bb8f82ea680a7036180ce831..b03c2e54ace9354adc26342519616f59d65fa991 100644 (file)
@@ -159,7 +159,7 @@ static int do_write(struct connection *c) {
         return -1;
     }
     
-    pa_memblockq_drop(c->output_memblockq, r);
+    pa_memblockq_drop(c->output_memblockq, &chunk, r);
     pa_memblock_unref(chunk.memblock);
     
     return 0;
@@ -202,11 +202,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
     return 0;
 }
 
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
     struct connection*c = i->userdata;
     assert(i && c && length);
 
-    pa_memblockq_drop(c->input_memblockq, length);
+    pa_memblockq_drop(c->input_memblockq, chunk, length);
 
     /* do something */
     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
index 5664e18a1507cbbebf689257fb60005462639610..ad3dd0e071608eaf90f9a56913637d2fb7314707 100644 (file)
@@ -50,7 +50,7 @@ struct item_info {
     /* memblock info */
     struct pa_memchunk chunk;
     uint32_t channel;
-    int32_t delta;
+    uint32_t delta;
 
     /* packet info */
     struct pa_packet *packet;
@@ -86,7 +86,7 @@ struct pa_pstream {
     void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
     void *recieve_packet_callback_userdata;
 
-    void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata);
+    void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata);
     void *recieve_memblock_callback_userdata;
 
     void (*drain_callback)(struct pa_pstream *p, void *userdata);
@@ -219,7 +219,7 @@ void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) {
     p->mainloop->defer_enable(p->defer_event, 1);
 }
 
-void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk) {
+void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk) {
     struct item_info *i;
     assert(p && channel != (uint32_t) -1 && chunk);
     
@@ -242,7 +242,7 @@ void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callbac
     p->recieve_packet_callback_userdata = userdata;
 }
 
-void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
+void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
     assert(p && callback);
 
     p->recieve_memblock_callback = callback;
@@ -378,7 +378,7 @@ static void do_read(struct pa_pstream *p) {
                     p->recieve_memblock_callback(
                         p,
                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
-                        (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
+                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
                         &chunk,
                         p->recieve_memblock_callback_userdata);
             }
index 9a2895073825e1e4276a5b8aacd9948bc6a0d0e2..dfd29983b3178b85ad2f7a57878d2a9e26458930 100644 (file)
@@ -37,10 +37,10 @@ void pa_pstream_unref(struct pa_pstream*p);
 struct pa_pstream* pa_pstream_ref(struct pa_pstream*p);
 
 void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet);
-void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk);
+void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk);
 
 void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata);
-void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata);
+void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata);
 void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata);
 
 void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata);
index c57dd8e058617ee4a6ee6552e106da6273d89c17..5009033f29e93bd64c69f6ae178ad3ddde0ef260 100644 (file)
@@ -59,6 +59,7 @@ struct pa_sink_input* pa_sink_input_new(struct pa_sink *s, const char *name, con
     i->get_latency = NULL;
     i->userdata = NULL;
 
+    i->corked = 0;
     i->volume = PA_VOLUME_NORM;
 
     i->resampled_chunk.memblock = NULL;
@@ -120,6 +121,9 @@ uint32_t pa_sink_input_get_latency(struct pa_sink_input *i) {
 int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
     assert(i && chunk && i->peek && i->drop);
 
+    if (i->corked == 0)
+        return -1;
+    
     if (!i->resampler)
         return i->peek(i, chunk);
 
@@ -134,11 +138,12 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
         assert(tchunk.length);
         
         l = pa_resampler_request(i->resampler, CONVERT_BUFFER_LENGTH);
+
+        i->drop(i, &tchunk, l);
+
         if (tchunk.length > l)
             tchunk.length = l;
 
-        i->drop(i, tchunk.length);
-
         pa_resampler_run(i->resampler, &tchunk, &i->resampled_chunk);
         pa_memblock_unref(tchunk.memblock);
     }
@@ -149,11 +154,11 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
     return 0;
 }
 
-void pa_sink_input_drop(struct pa_sink_input *i, size_t length) {
+void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
     assert(i && length);
 
     if (!i->resampler) {
-        i->drop(i, length);
+        i->drop(i, chunk, length);
         return;
     }
     
@@ -177,3 +182,13 @@ void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume) {
         pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
     }
 }
+
+void pa_sink_input_cork(struct pa_sink_input *i, int b) {
+    int n;
+    assert(i);
+    n = i->corked && !b;
+    i->corked = b;
+
+    if (n)
+        pa_sink_notify(i->sink);
+}
index 8d7788d8e7795ed364d3d0492f35300d3070a3a4..b0644540949fe9dd92a0b9389af8f7076118bd05 100644 (file)
@@ -34,6 +34,8 @@
 struct pa_sink_input {
     uint32_t index;
 
+    int corked;
+    
     char *name;
     struct pa_module *owner;
     struct pa_client *client;
@@ -42,7 +44,7 @@ struct pa_sink_input {
     uint32_t volume;
     
     int (*peek) (struct pa_sink_input *i, struct pa_memchunk *chunk);
-    void (*drop) (struct pa_sink_input *i, size_t length);
+    void (*drop) (struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
     void (*kill) (struct pa_sink_input *i);
     uint32_t (*get_latency) (struct pa_sink_input *i);
 
@@ -62,8 +64,10 @@ void pa_sink_input_kill(struct pa_sink_input *i);
 uint32_t pa_sink_input_get_latency(struct pa_sink_input *i);
 
 int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk);
-void pa_sink_input_drop(struct pa_sink_input *i, size_t length);
+void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
 
 void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume);
 
+void pa_sink_input_cork(struct pa_sink_input *i, int b);
+
 #endif
index 62b9a7af2333d1b3ea4f538e7231da362bfa17ff..43fd351cf67fa5a7ad39956b4a99db1422b7754f 100644 (file)
@@ -147,8 +147,8 @@ static void inputs_drop(struct pa_sink *s, struct pa_mix_info *info, unsigned ma
         struct pa_sink_input *i = info->userdata;
         assert(i && info->chunk.memblock);
         
+        pa_sink_input_drop(i, &info->chunk, length);
         pa_memblock_unref(info->chunk.memblock);
-        pa_sink_input_drop(i, length);
     }
 }
         
index 2878c546b7032313bc57bb007ebe9b266779ba20..6c8febb62494cbf634d365d1c39b2576398f4c1e 100644 (file)
@@ -36,6 +36,7 @@
 #include <pwd.h>
 #include <signal.h>
 #include <pthread.h>
+#include <sys/time.h>
 
 #include "util.h"
 #include "xmalloc.h"
@@ -192,3 +193,23 @@ char *pa_get_host_name(char *s, size_t l) {
     s[l-1] = 0;
     return s;
 }
+
+uint32_t pa_age(struct timeval *tv) {
+    struct timeval now;
+    uint32_t r;
+    assert(tv);
+
+    if (tv->tv_sec == 0)
+        return 0;
+
+    gettimeofday(&now, NULL);
+    
+    r = (now.tv_sec-tv->tv_sec) * 1000000;
+
+    if (now.tv_usec >= tv->tv_usec)
+        r += now.tv_usec - tv->tv_usec;
+    else
+        r -= tv->tv_usec - now.tv_usec;
+
+    return r;
+}
index 7dd7b7de469a4982722f09c467444a9877a82b82..9dab45d24fbbae772a97cc2998321afc2187e17a 100644 (file)
@@ -23,6 +23,7 @@
 ***/
 
 #include <sys/types.h>
+#include <inttypes.h>
 
 void pa_make_nonblock_fd(int fd);
 
@@ -38,4 +39,6 @@ char *pa_sprintf_malloc(const char *format, ...) __attribute__ ((format (printf,
 char *pa_get_user_name(char *s, size_t l);
 char *pa_get_host_name(char *s, size_t l);
 
+uint32_t pa_age(struct timeval *tv);
+
 #endif