if (amount > 0) {
unsigned c;
- pa_memblockq_seek(u->memblockq, - (int64_t) amount, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(u->memblockq, - (int64_t) amount, PA_SEEK_RELATIVE, TRUE);
pa_log_debug("Resetting plugin");
else
delta = j;
- pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
pa_rtclock_get(&now);
if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
pa_log_warn("Queue overrun");
- pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
}
/* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
if ((s = pa_dynarray_get(c->record_streams, channel))) {
if (chunk->memblock) {
- pa_memblockq_seek(s->record_memblockq, offset, seek);
+ pa_memblockq_seek(s->record_memblockq, offset, seek, TRUE);
pa_memblockq_push_align(s->record_memblockq, chunk);
} else
- pa_memblockq_seek(s->record_memblockq, offset+chunk->length, seek);
+ pa_memblockq_seek(s->record_memblockq, offset+chunk->length, seek, TRUE);
if (s->read_callback) {
size_t l;
uint32_t syncid;
uint32_t stream_index;
- uint32_t requested_bytes;
+ int64_t requested_bytes;
pa_buffer_attr buffer_attr;
uint32_t device_index;
s->requested_bytes += bytes;
if (s->requested_bytes > 0 && s->write_callback)
- s->write_callback(s, s->requested_bytes, s->write_userdata);
+ s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
finish:
pa_context_unref(c);
pa_stream_set_state(s, PA_STREAM_READY);
if (s->requested_bytes > 0 && s->write_callback)
- s->write_callback(s, s->requested_bytes, s->write_userdata);
+ s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
struct timeval tv;
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_stream *s = userdata;
+ uint32_t requested_bytes;
pa_assert(pd);
pa_assert(s);
if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
s->channel == PA_INVALID_INDEX ||
((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
- ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
+ ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
+ s->requested_bytes = (int64_t) requested_bytes;
+
if (s->context->version >= 9) {
if (s->direction == PA_STREAM_PLAYBACK) {
if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
if (free_cb && pa_pstream_get_shm(s->context->pstream))
free_cb((void*) data);
- if (length < s->requested_bytes)
- s->requested_bytes -= (uint32_t) length;
- else
- s->requested_bytes = 0;
-
- /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
+ /* This is obviously wrong since we ignore the seeking index . But
+ * that's OK, the server side applies the same error */
+ s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
if (s->direction == PA_STREAM_PLAYBACK) {
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
- return s->requested_bytes;
+ return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
}
size_t pa_stream_readable_size(pa_stream *s) {
return l >= bq->minreq ? l : 0;
}
-void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) {
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account) {
int64_t old, delta;
pa_assert(bq);
delta = bq->write_index - old;
- if (delta >= (int64_t) bq->requested) {
- delta -= (int64_t) bq->requested;
- bq->requested = 0;
- } else if (delta >= 0) {
- bq->requested -= (size_t) delta;
- delta = 0;
+ if (account) {
+ if (delta >= (int64_t) bq->requested) {
+ delta -= (int64_t) bq->requested;
+ bq->requested = 0;
+ } else if (delta >= 0) {
+ bq->requested -= (size_t) delta;
+ delta = 0;
+ }
}
bq->missing -= delta;
pa_memblock_unref(chunk.memblock);
} else
- pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
pa_memblockq_drop(bq, chunk.length);
}
int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk);
/* Manipulate the write pointer */
-void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek);
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, pa_bool_t account);
/* Return a copy of the next memory chunk in the queue. It is not
* removed from the queue. There are two reasons this function might
switch (code) {
case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
pa_tagstruct *t;
- uint32_t l = 0;
+ int l = 0;
for (;;) {
- if ((l = (uint32_t) pa_atomic_load(&s->missing)) <= 0)
- break;
+ if ((l = pa_atomic_load(&s->missing)) <= 0)
+ return 0;
- if (pa_atomic_cmpxchg(&s->missing, (int) l, 0))
+ if (pa_atomic_cmpxchg(&s->missing, l, 0))
break;
}
- if (l <= 0)
- break;
-
t = pa_tagstruct_new(NULL, 0);
pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
pa_tagstruct_putu32(t, s->index);
- pa_tagstruct_putu32(t, l);
+ pa_tagstruct_putu32(t, (uint32_t) l);
pa_pstream_send_tagstruct(s->connection->pstream, t);
/* pa_log("Requesting %lu bytes", (unsigned long) l); */
/* Called from IO context */
static void playback_stream_request_bytes(playback_stream *s) {
- size_t m, previous_missing, minreq;
+ size_t m, minreq;
+ int previous_missing;
playback_stream_assert_ref(s);
/* pa_log("request_bytes(%lu)", (unsigned long) m); */
- previous_missing = (size_t) pa_atomic_add(&s->missing, (int) m);
+ previous_missing = pa_atomic_add(&s->missing, (int) m);
minreq = pa_memblockq_get_minreq(s->memblockq);
if (pa_memblockq_prebuf_active(s->memblockq) ||
- (previous_missing < minreq && previous_missing+m >= minreq))
+ (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
}
int64_t windex;
windex = pa_memblockq_get_write_index(s->memblockq);
- pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
+
+ /* The client side is incapable of accounting correctly
+ * for seeks of a type != PA_SEEK_RELATIVE. We need to be
+ * able to deal with that. */
+
+ pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
handle_seek(s, windex);
return 0;
if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
pa_log_warn("Failed to push data into queue");
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
- pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
}
handle_seek(s, windex);
* data, so let's just hand out silence */
pa_atomic_store(&i->thread_info.drained, 1);
- pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE);
i->thread_info.playing_for = 0;
if (i->thread_info.underrun_for != (uint64_t) -1)
i->thread_info.underrun_for += ilength;
if (amount > 0)
/* Ok, now update the write pointer */
- pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE);
+ pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE, TRUE);
if (i->thread_info.rewrite_flush)
pa_memblockq_silence(i->thread_info.render_memblockq);
if (pa_memblockq_push(o->thread_info.delay_memblockq, chunk) < 0) {
pa_log_debug("Delay queue overflow!");
- pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(o->thread_info.delay_memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
}
limit = o->process_rewind ? 0 : o->source->thread_info.max_rewind;
ret = pa_memblockq_push(bq, &chunk4);
assert(ret == 0);
- pa_memblockq_seek(bq, -6, 0);
+ pa_memblockq_seek(bq, -6, 0, TRUE);
ret = pa_memblockq_push(bq, &chunk3);
assert(ret == 0);
- pa_memblockq_seek(bq, -2, 0);
+ pa_memblockq_seek(bq, -2, 0, TRUE);
ret = pa_memblockq_push(bq, &chunk1);
assert(ret == 0);
- pa_memblockq_seek(bq, -10, 0);
+ pa_memblockq_seek(bq, -10, 0, TRUE);
ret = pa_memblockq_push(bq, &chunk4);
assert(ret == 0);
- pa_memblockq_seek(bq, 10, 0);
+ pa_memblockq_seek(bq, 10, 0, TRUE);
ret = pa_memblockq_push(bq, &chunk1);
assert(ret == 0);
- pa_memblockq_seek(bq, -6, 0);
+ pa_memblockq_seek(bq, -6, 0, TRUE);
ret = pa_memblockq_push(bq, &chunk2);
assert(ret == 0);
/* Test splitting */
- pa_memblockq_seek(bq, -12, 0);
+ pa_memblockq_seek(bq, -12, 0, TRUE);
ret = pa_memblockq_push(bq, &chunk1);
assert(ret == 0);
- pa_memblockq_seek(bq, 20, 0);
+ pa_memblockq_seek(bq, 20, 0, TRUE);
/* Test merging */
ret = pa_memblockq_push(bq, &chunk3);
assert(ret == 0);
- pa_memblockq_seek(bq, -2, 0);
+ pa_memblockq_seek(bq, -2, 0, TRUE);
chunk3.index += 2;
chunk3.length -= 2;
ret = pa_memblockq_push(bq, &chunk3);
assert(ret == 0);
- pa_memblockq_seek(bq, 30, PA_SEEK_RELATIVE);
+ pa_memblockq_seek(bq, 30, PA_SEEK_RELATIVE, TRUE);
dump(bq);