size_t qlength;
struct sink_input *sink_input;
struct memblockq *memblockq;
+ size_t requested_bytes;
};
struct connection {
static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) {
struct playback_stream *s;
- assert(c && sink && s && name && qlen && maxlength && prebuf);
+ assert(c && sink && ss && name && qlen && maxlength && prebuf); /* check the ss argument; s is still unassigned here and is only set by the malloc() below */
s = malloc(sizeof(struct playback_stream));
assert (s);
s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf);
assert(s->memblockq);
+ s->requested_bytes = 0; /* a new stream starts with no bytes counted as already requested */
+
idxset_put(c->playback_streams, s, &s->index);
- request_bytes(s);
return s;
}
if (!(l = memblockq_missing_to(s->memblockq, s->qlength)))
return;
+ if (l <= s->requested_bytes)
+ return;
+
+ l -= s->requested_bytes;
+ s->requested_bytes += l;
+
t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, PA_COMMAND_REQUEST);
+ tagstruct_putu32(t, (uint32_t) -1); /* tag */
tagstruct_putu32(t, s->index);
tagstruct_putu32(t, l);
pstream_send_tagstruct(s->connection->pstream, t);
+
+/* fprintf(stderr, "Requesting %u bytes\n", l);*/
}
/*** sinkinput callbacks ***/
assert(s->sink_input);
tagstruct_putu32(reply, s->sink_input->index);
pstream_send_tagstruct(c->pstream, reply);
+ request_bytes(s);
return 0;
}
/*** pstream callbacks ***/
-
static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
struct connection *c = userdata;
assert(p && packet && packet->data && c);
return -1;
}
+ if (chunk->length >= stream->requested_bytes)
+ stream->requested_bytes = 0;
+ else
+ stream->requested_bytes -= chunk->length;
+
memblockq_push(stream->memblockq, chunk, delta);
assert(stream->sink_input);
sink_notify(stream->sink_input->sink);
+ /*fprintf(stderr, "Received %u bytes.\n", chunk->length);*/
+
return 0;
}
p->server = server;
p->core = core;
p->connections = idxset_new(NULL, NULL);
+ assert(p->connections);
socket_server_set_callback(p->server, on_connection, p);