+ pa_pdispatch_unref(c->pdispatch);
+ pa_pstream_unref(c->pstream);
+ pa_client_free(c->client);
+
+ pa_xfree(c);
+}
+
+/* Called from thread context.
+ *
+ * Pops the amount of data that newly went missing from the stream's
+ * queue, adds it to s->missing and, if prebuffering is active or this
+ * addition lifted the missing counter from below s->minreq to at least
+ * s->minreq, posts PLAYBACK_STREAM_MESSAGE_REQUEST_DATA on the outq of
+ * the current thread's message queue. */
+static void request_bytes(playback_stream *s) {
+    size_t newly_missing, missing_before;
+
+    playback_stream_assert_ref(s);
+
+    newly_missing = pa_memblockq_pop_missing(s->memblockq);
+
+    /* Nothing new went missing, so there is nothing to account for */
+    if (newly_missing == 0)
+        return;
+
+/*     pa_log("request_bytes(%lu)", (unsigned long) newly_missing); */
+
+    missing_before = pa_atomic_add(&s->missing, newly_missing);
+
+    if (!pa_memblockq_prebuf_active(s->memblockq)) {
+
+        /* Outside of prebuffering a request is only posted when this
+         * very addition crossed the minreq threshold */
+        if (missing_before >= s->minreq)
+            return;
+
+        if (missing_before + newly_missing < s->minreq)
+            return;
+    }
+
+    pa_assert(pa_thread_mq_get());
+    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
+}
+
+/* Walks the connection's record streams round-robin, starting at
+ * c->rrobin_index, and sends at most one fragment: the first stream that
+ * has data queued gets up to fragment_size bytes of it sent on the
+ * connection's pstream. Returns without sending anything when there are
+ * no record streams or when a full round found no stream with data. */
+static void send_memblock(connection *c) {
+    uint32_t first_index = PA_IDXSET_INVALID;
+    record_stream *stream;
+
+    while ((stream = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))) {
+        pa_memchunk queued, fragment;
+
+        /* Remember where the round started; seeing that index again
+         * means every stream has been visited once */
+        if (first_index == PA_IDXSET_INVALID)
+            first_index = c->rrobin_index;
+        else if (first_index == c->rrobin_index)
+            break;
+
+        /* Nothing queued on this stream, try the next one */
+        if (pa_memblockq_peek(stream->memblockq, &queued) < 0)
+            continue;
+
+        /* Never send more than one fragment at a time */
+        fragment = queued;
+        if (fragment.length > stream->fragment_size)
+            fragment.length = stream->fragment_size;
+
+        pa_pstream_send_memblock(c->pstream, stream->index, 0, PA_SEEK_RELATIVE, &fragment);
+
+        pa_memblockq_drop(stream->memblockq, fragment.length);
+        pa_memblock_unref(fragment.memblock);
+
+        break;
+    }
+}
+
+/* Builds a PA_COMMAND_PLAYBACK_STREAM_KILLED tagstruct carrying the index
+ * of playback stream p and sends it on the pstream of the connection the
+ * stream belongs to. */
+static void send_playback_stream_killed(playback_stream *p) {
+    pa_tagstruct *notice;
+
+    playback_stream_assert_ref(p);
+
+    notice = pa_tagstruct_new(NULL, 0);
+
+    pa_tagstruct_putu32(notice, PA_COMMAND_PLAYBACK_STREAM_KILLED);
+    pa_tagstruct_putu32(notice, (uint32_t) -1); /* tag */
+    pa_tagstruct_putu32(notice, p->index);
+
+    pa_pstream_send_tagstruct(p->connection->pstream, notice);
+}
+
+/* Builds a PA_COMMAND_RECORD_STREAM_KILLED tagstruct carrying the index
+ * of record stream r and sends it on the pstream of the connection the
+ * stream belongs to. */
+static void send_record_stream_killed(record_stream *r) {
+    pa_tagstruct *notice;
+
+    record_stream_assert_ref(r);
+
+    notice = pa_tagstruct_new(NULL, 0);
+
+    pa_tagstruct_putu32(notice, PA_COMMAND_RECORD_STREAM_KILLED);
+    pa_tagstruct_putu32(notice, (uint32_t) -1); /* tag */
+    pa_tagstruct_putu32(notice, r->index);
+
+    pa_pstream_send_tagstruct(r->connection->pstream, notice);
+}
+
+/*** sink input callbacks ***/
+
+/* Reacts to a change on the write side of the stream's queue. indexw is
+ * the write index the queue had before the change. Depending on the
+ * underrun state of the sink input a rewind may be requested from it;
+ * afterwards request_bytes() is always called. */
+static void handle_seek(playback_stream *s, int64_t indexw) {
+    playback_stream_assert_ref(s);
+
+/*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
+
+    if (!(s->sink_input->thread_info.underrun_for > 0)) {
+        int64_t read_pos = pa_memblockq_get_read_index(s->memblockq);
+
+        if (indexw < read_pos) {
+            /* OK, the sink already asked for this data, so
+             * let's have it ask us again */
+
+            pa_log_debug("Requesting rewind due to rewrite.");
+            pa_sink_input_request_rewind(s->sink_input, read_pos - indexw, TRUE, FALSE);
+        }
+
+    } else if (pa_memblockq_is_readable(s->memblockq)) {
+
+/*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
+
+        /* We were in an underrun and the queue is readable again,
+         * so ask the sink for a complete rewind rewrite. An
+         * underrun_for of (size_t) -1 is passed on as 0. */
+
+        pa_log_debug("Requesting rewind due to end of underrun.");
+        pa_sink_input_request_rewind(s->sink_input,
+                                     s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for,
+                                     FALSE, TRUE);
+    }
+
+    request_bytes(s);
+}
+
+/* Called from thread context.
+ *
+ * Message handler for the sink input that belongs to a playback stream.
+ * The stream specific SINK_INPUT_MESSAGE_* codes are handled here on the
+ * stream's memblockq and return 0. PA_SINK_INPUT_MESSAGE_SET_STATE and
+ * PA_SINK_INPUT_MESSAGE_GET_LATENCY do some stream specific work first and
+ * are then, like every code not listed in the switch, passed on to
+ * pa_sink_input_process_msg(), whose result is returned. */
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
+ pa_sink_input *i = PA_SINK_INPUT(o);
+ playback_stream *s;
+
+ pa_sink_input_assert_ref(i);
+ s = PLAYBACK_STREAM(i->userdata);
+ playback_stream_assert_ref(s);
+
+ switch (code) {
+
+ case SINK_INPUT_MESSAGE_SEEK: {
+ int64_t windex;
+ /* offset is the seek distance and userdata carries the seek mode.
+ * The write index is sampled before the seek because handle_seek()
+ * compares that old value against the read index. */
+ windex = pa_memblockq_get_write_index(s->memblockq);
+ pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
+
+ handle_seek(s, windex);
+ return 0;
+ }
+
+ case SINK_INPUT_MESSAGE_POST_DATA: {
+ int64_t windex;
+
+ pa_assert(chunk);
+
+ windex = pa_memblockq_get_write_index(s->memblockq);
+
+/* pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
+ /* If the chunk cannot be queued, a warning is logged,
+ * PLAYBACK_STREAM_MESSAGE_OVERFLOW is posted on the outq and the
+ * queue is seeked relatively by the chunk's length instead. */
+ if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+ pa_log_warn("Failed to push data into queue");
+ pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
+ pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_RELATIVE);
+ }
+
+ handle_seek(s, windex);
+
+/* pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
+
+ return 0;
+ }
+
+ case SINK_INPUT_MESSAGE_DRAIN:
+ case SINK_INPUT_MESSAGE_FLUSH:
+ case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+ case SINK_INPUT_MESSAGE_TRIGGER: {
+ /* These four codes share one implementation: pick the memblockq
+ * operation that belongs to the code, then apply it to this stream
+ * and to every other stream in its sync group. */
+ int64_t windex;
+ pa_sink_input *isync;
+ void (*func)(pa_memblockq *bq);
+
+ switch (code) {
+ case SINK_INPUT_MESSAGE_FLUSH:
+ func = pa_memblockq_flush;
+ break;
+
+ case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+ func = pa_memblockq_prebuf_force;
+ break;
+
+ case SINK_INPUT_MESSAGE_DRAIN:
+ case SINK_INPUT_MESSAGE_TRIGGER:
+ func = pa_memblockq_prebuf_disable;
+ break;
+
+ default:
+ pa_assert_not_reached();
+ }
+
+ windex = pa_memblockq_get_write_index(s->memblockq);
+ func(s->memblockq);
+ handle_seek(s, windex);
+
+ /* Do the same for all other members in the sync group: first walk
+ * the sync_prev links starting from i, then the sync_next links */
+ for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
+ playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
+ windex = pa_memblockq_get_write_index(ssync->memblockq);
+ func(ssync->memblockq);
+ handle_seek(ssync, windex);
+ }
+
+ for (isync = i->sync_next; isync; isync = isync->sync_next) {
+ playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
+ windex = pa_memblockq_get_write_index(ssync->memblockq);
+ func(ssync->memblockq);
+ handle_seek(ssync, windex);
+ }
+ /* A drain is acknowledged right away when the queue has nothing
+ * readable; otherwise the tag passed in userdata is stored and
+ * drain_request is set, which sink_input_pop_cb() checks later. */
+ if (code == SINK_INPUT_MESSAGE_DRAIN) {
+ if (!pa_memblockq_is_readable(s->memblockq))
+ pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
+ else {
+ s->drain_tag = PA_PTR_TO_UINT(userdata);
+ s->drain_request = TRUE;
+ }
+ }
+
+ return 0;
+ }
+
+ case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
+ /* Copy the queue's current read/write indexes and the length of
+ * the sink input's render queue into the stream object */
+ s->read_index = pa_memblockq_get_read_index(s->memblockq);
+ s->write_index = pa_memblockq_get_write_index(s->memblockq);
+ s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
+ return 0;
+
+ case PA_SINK_INPUT_MESSAGE_SET_STATE: {
+ int64_t windex;
+
+ windex = pa_memblockq_get_write_index(s->memblockq);
+
+ pa_memblockq_prebuf_force(s->memblockq);
+
+ handle_seek(s, windex);
+
+ /* Fall through to the default handler */
+ break;
+ }
+
+ case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
+ pa_usec_t *r = userdata;
+
+ *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
+
+ /* Fall through, the default handler will add in the extra
+ * latency added by the resampler */
+ break;
+ }
+ }
+
+ return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
+}
+
+/* Called from thread context */
+static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
+ playback_stream *s;
+
+ pa_sink_input_assert_ref(i);
+ s = PLAYBACK_STREAM(i->userdata);
+ playback_stream_assert_ref(s);
+ pa_assert(chunk);
+
+ if (pa_memblockq_peek(s->memblockq, chunk) < 0) {
+
+/* pa_log("UNDERRUN: %lu", pa_memblockq_get_length(s->memblockq)); */
+
+ if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
+ s->drain_request = FALSE;
+ pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
+ } else if (i->thread_info.playing_for > 0)
+ pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
+
+/* pa_log("adding %llu bytes, total is %llu", (unsigned long long) nbytes, (unsigned long long) i->thread_info.underrun_for); */
+
+ request_bytes(s);
+
+ return -1;
+ }
+
+/* pa_log("NOTUNDERRUN"); */