+ int read_type = 0;
+
+ pa_assert(u);
+
+ pa_log_debug("Thread starting up");
+
+ pa_thread_mq_install(&u->thread_mq);
+
+ for (;;) {
+ int ret;
+ struct pollfd *pollfd;
+
+ pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+
+ /* Try to read some data and pass it on to the source driver */
+ if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) {
+ ssize_t l;
+ void *p;
+
+ if (!u->memchunk.memblock) {
+ u->memchunk.memblock = pa_memblock_new(u->core->mempool, pa_pipe_buf(u->fd));
+ u->memchunk.index = u->memchunk.length = 0;
+ }
+
+ pa_assert(pa_memblock_get_length(u->memchunk.memblock) > u->memchunk.index);
+
+ p = pa_memblock_acquire(u->memchunk.memblock);
+ l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type);
+ pa_memblock_release(u->memchunk.memblock);
+
+ pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */
+
+ if (l < 0) {
+
+ if (errno == EINTR)
+ continue;
+ else if (errno != EAGAIN) {
+ pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ } else {
+
+ u->memchunk.length = (size_t) l;
+ pa_source_post(u->source, &u->memchunk);
+ u->memchunk.index += (size_t) l;
+
+ if (u->memchunk.index >= pa_memblock_get_length(u->memchunk.memblock)) {
+ pa_memblock_unref(u->memchunk.memblock);
+ pa_memchunk_reset(&u->memchunk);
+ }
+
+ pollfd->revents = 0;
+ }
+ }
+
+ /* Hmm, nothing to do. Let's sleep */
+ pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0);
+
+ if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
+ goto fail;
+
+ if (ret == 0)
+ goto finish;
+
+ pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+
+ if (pollfd->revents & ~POLLIN) {
+ pa_log("FIFO shutdown.");
+ goto fail;
+ }
+ }
+
+fail:
+ /* If this was no regular exit from the loop we have to continue
+ * processing messages until we received PA_MESSAGE_SHUTDOWN */
+ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
+ pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
+
+finish:
+ pa_log_debug("Thread shutting down");