+ return pa_sink_process_msg(o, code, data, offset, chunk);
+}
+
+static void thread_func(void *userdata) {
+ struct userdata *u = userdata;
+ int write_type = 0;
+
+ pa_assert(u);
+
+ pa_log_debug("Thread starting up");
+
+ pa_thread_mq_install(&u->thread_mq);
+
+ pa_smoother_set_time_offset(u->smoother, pa_rtclock_now());
+
+ for (;;) {
+ int ret;
+
+ if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+ pa_sink_process_rewind(u->sink, 0);
+
+ if (u->rtpoll_item) {
+ struct pollfd *pollfd;
+ pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+
+ /* Render some data and write it to the fifo */
+ if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && pollfd->revents) {
+ pa_usec_t usec;
+ int64_t n;
+
+ for (;;) {
+ ssize_t l;
+ void *p;
+
+ if (u->memchunk.length <= 0)
+ pa_sink_render(u->sink, u->block_size, &u->memchunk);
+
+ pa_assert(u->memchunk.length > 0);
+
+ p = pa_memblock_acquire(u->memchunk.memblock);
+ l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
+ pa_memblock_release(u->memchunk.memblock);
+
+ pa_assert(l != 0);
+
+ if (l < 0) {
+
+ if (errno == EINTR)
+ continue;
+ else if (errno == EAGAIN) {
+
+ /* OK, we filled all socket buffers up
+ * now. */
+ goto filled_up;
+
+ } else {
+ pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ } else {
+ u->offset += l;
+
+ u->memchunk.index += (size_t) l;
+ u->memchunk.length -= (size_t) l;
+
+ if (u->memchunk.length <= 0) {
+ pa_memblock_unref(u->memchunk.memblock);
+ pa_memchunk_reset(&u->memchunk);
+ }
+
+ pollfd->revents = 0;
+
+ if (u->memchunk.length > 0)
+
+ /* OK, we wrote less than we asked for,
+ * hence we can assume that the socket
+ * buffers are full now */
+ goto filled_up;
+ }
+ }
+
+ filled_up:
+
+ /* At this spot we know that the socket buffers are
+ * fully filled up. This is the best time to estimate
+ * the playback position of the server */
+
+ n = u->offset;
+
+#ifdef SIOCOUTQ
+ {
+ int l;
+ if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
+ n -= l;
+ }
+#endif
+
+ usec = pa_bytes_to_usec((uint64_t) n, &u->sink->sample_spec);
+
+ if (usec > u->latency)
+ usec -= u->latency;
+ else
+ usec = 0;
+
+ pa_smoother_put(u->smoother, pa_rtclock_now(), usec);
+ }
+
+ /* Hmm, nothing to do. Let's sleep */
+ pollfd->events = (short) (PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0);
+ }
+
+ if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
+ goto fail;
+
+ if (ret == 0)
+ goto finish;
+
+ if (u->rtpoll_item) {
+ struct pollfd* pollfd;
+
+ pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+
+ if (pollfd->revents & ~POLLOUT) {
+ pa_log("FIFO shutdown.");
+ goto fail;
+ }
+ }