]> code.delx.au - pulseaudio/blobdiff - src/modules/module-pipe-sink.c
bluetooth: Check adapter address to set device_info_valid
[pulseaudio] / src / modules / module-pipe-sink.c
index 1b6d081375c854ebebeadf3707f4ad5723285b51..4d82cc37b22e297accf995f03df838e5d8c7b567 100644 (file)
@@ -1,5 +1,3 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
 /***
   This file is part of PulseAudio.
 
@@ -7,7 +5,7 @@
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
-  by the Free Software Foundation; either version 2 of the License,
+  by the Free Software Foundation; either version 2.1 of the License,
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
   or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
 #include <sys/stat.h>
 #include <stdio.h>
 #include <errno.h>
 #include <sys/stat.h>
 #include <stdio.h>
 #include <errno.h>
-#include <string.h>
 #include <fcntl.h>
 #include <unistd.h>
 #include <fcntl.h>
 #include <unistd.h>
-#include <limits.h>
 #include <sys/ioctl.h>
 #include <sys/ioctl.h>
-#include <sys/poll.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
 
 #include <pulse/xmalloc.h>
 
 
 #include <pulse/xmalloc.h>
 
 #include <pulsecore/modargs.h>
 #include <pulsecore/log.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/modargs.h>
 #include <pulsecore/log.h>
 #include <pulsecore/thread.h>
+#include <pulsecore/thread-mq.h>
+#include <pulsecore/rtpoll.h>
+#include <pulsecore/poll.h>
 
 #include "module-pipe-sink-symdef.h"
 
 
 #include "module-pipe-sink-symdef.h"
 
-PA_MODULE_AUTHOR("Lennart Poettering")
-PA_MODULE_DESCRIPTION("UNIX pipe sink")
-PA_MODULE_VERSION(PACKAGE_VERSION)
+PA_MODULE_AUTHOR("Lennart Poettering");
+PA_MODULE_DESCRIPTION("UNIX pipe sink");
+PA_MODULE_VERSION(PACKAGE_VERSION);
+PA_MODULE_LOAD_ONCE(false);
 PA_MODULE_USAGE(
         "sink_name=<name for the sink> "
 PA_MODULE_USAGE(
         "sink_name=<name for the sink> "
+        "sink_properties=<properties for the sink> "
         "file=<path of the FIFO> "
         "format=<sample format> "
         "file=<path of the FIFO> "
         "format=<sample format> "
+        "rate=<sample rate> "
         "channels=<number of channels> "
         "channels=<number of channels> "
-        "rate=<sample rate>"
-        "channel_map=<channel map>")
+        "channel_map=<channel map>");
 
 
-#define DEFAULT_FILE_NAME "/tmp/music.output"
+#define DEFAULT_FILE_NAME "fifo_output"
 #define DEFAULT_SINK_NAME "fifo_output"
 
 struct userdata {
     pa_core *core;
     pa_module *module;
     pa_sink *sink;
 #define DEFAULT_SINK_NAME "fifo_output"
 
 struct userdata {
     pa_core *core;
     pa_module *module;
     pa_sink *sink;
+
     pa_thread *thread;
     pa_thread *thread;
-    pa_asyncmsgq *asyncmsgq;
+    pa_thread_mq thread_mq;
+    pa_rtpoll *rtpoll;
+
     char *filename;
     int fd;
 
     pa_memchunk memchunk;
     char *filename;
     int fd;
 
     pa_memchunk memchunk;
+
+    pa_rtpoll_item *rtpoll_item;
+
+    int write_type;
 };
 
 static const char* const valid_modargs[] = {
 };
 
 static const char* const valid_modargs[] = {
+    "sink_name",
+    "sink_properties",
     "file",
     "file",
-    "rate",
     "format",
     "format",
+    "rate",
     "channels",
     "channels",
-    "sink_name",
     "channel_map",
     NULL
 };
     "channel_map",
     NULL
 };
@@ -88,157 +100,134 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
-            
+
         case PA_SINK_MESSAGE_GET_LATENCY: {
             size_t n = 0;
         case PA_SINK_MESSAGE_GET_LATENCY: {
             size_t n = 0;
+
+#ifdef FIONREAD
             int l;
             int l;
-            
-            if (ioctl(u->fd, TIOCINQ, &l) >= 0 && l > 0)
+
+            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
                 n = (size_t) l;
                 n = (size_t) l;
-            
+#endif
+
             n += u->memchunk.length;
             n += u->memchunk.length;
-            
+
             *((pa_usec_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
             *((pa_usec_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
-            break;
+            return 0;
         }
     }
         }
     }
-    
+
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
-static void thread_func(void *userdata) {
-    enum {
-        POLLFD_ASYNCQ,
-        POLLFD_FIFO,
-        POLLFD_MAX,
-    };
-    
-    struct userdata *u = userdata;
-    struct pollfd pollfd[POLLFD_MAX];
-    int write_type = 0;
-
+static int process_render(struct userdata *u) {
     pa_assert(u);
 
     pa_assert(u);
 
-    pa_log_debug("Thread starting up");
+    if (u->memchunk.length <= 0)
+        pa_sink_render(u->sink, pa_pipe_buf(u->fd), &u->memchunk);
 
 
-    memset(&pollfd, 0, sizeof(pollfd));
-    
-    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->asyncmsgq);
-    pollfd[POLLFD_ASYNCQ].events = POLLIN;
-    pollfd[POLLFD_FIFO].fd = u->fd;
+    pa_assert(u->memchunk.length > 0);
 
     for (;;) {
 
     for (;;) {
-        pa_msgobject *object;
-        int code;
-        void *data;
-        pa_memchunk chunk;
-        int r;
-        int64_t offset;
-
-        /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset, &chunk, 0) == 0) {
-            int ret;
-
-            if (!object && code == PA_MESSAGE_SHUTDOWN) {
-                pa_asyncmsgq_done(u->asyncmsgq, 0);
-                goto finish;
+        ssize_t l;
+        void *p;
+
+        p = pa_memblock_acquire(u->memchunk.memblock);
+        l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &u->write_type);
+        pa_memblock_release(u->memchunk.memblock);
+
+        pa_assert(l != 0);
+
+        if (l < 0) {
+
+            if (errno == EINTR)
+                continue;
+            else if (errno == EAGAIN)
+                return 0;
+            else {
+                pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
+                return -1;
             }
 
             }
 
-            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
-            pa_asyncmsgq_done(u->asyncmsgq, ret);
-            continue;
-        }
+        } else {
 
 
-        /* Render some data and write it to the fifo */
+            u->memchunk.index += (size_t) l;
+            u->memchunk.length -= (size_t) l;
 
 
-        if (u->sink->thread_info.state == PA_SINK_RUNNING && pollfd[POLLFD_FIFO].revents) {
-            ssize_t l;
-            void *p;
+            if (u->memchunk.length <= 0) {
+                pa_memblock_unref(u->memchunk.memblock);
+                pa_memchunk_reset(&u->memchunk);
+            }
+        }
 
 
-            if (u->memchunk.length <= 0)
-                pa_sink_render(u->sink, PIPE_BUF, &u->memchunk);
+        return 0;
+    }
+}
 
 
-            pa_assert(u->memchunk.length > 0);
+static void thread_func(void *userdata) {
+    struct userdata *u = userdata;
 
 
-            p = pa_memblock_acquire(u->memchunk.memblock);
-            l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
-            pa_memblock_release(u->memchunk.memblock);
+    pa_assert(u);
 
 
-            pa_assert(l != 0);
+    pa_log_debug("Thread starting up");
 
 
-            if (l < 0) {
+    pa_thread_mq_install(&u->thread_mq);
 
 
-                if (errno == EINTR)
-                    continue;
-                else if (errno != EAGAIN) {
-                    pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
-                    goto fail;
-                }
+    for (;;) {
+        struct pollfd *pollfd;
+        int ret;
 
 
-            } else {
+        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
 
 
-                u->memchunk.index += l;
-                u->memchunk.length -= l;
+        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+            pa_sink_process_rewind(u->sink, 0);
 
 
-                if (u->memchunk.length <= 0) {
-                    pa_memblock_unref(u->memchunk.memblock);
-                    pa_memchunk_reset(&u->memchunk);
-                }
+        /* Render some data and write it to the fifo */
+        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
+            if (pollfd->revents) {
+                if (process_render(u) < 0)
+                    goto fail;
 
 
-                pollfd[POLLFD_FIFO].revents = 0;
-                continue;
+                pollfd->revents = 0;
             }
         }
 
             }
         }
 
-        pollfd[POLLFD_FIFO].events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0;
-
         /* Hmm, nothing to do. Let's sleep */
         /* Hmm, nothing to do. Let's sleep */
+        pollfd->events = (short) (u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0);
 
 
-        if (pa_asyncmsgq_before_poll(u->asyncmsgq) < 0)
-            continue;
-
-/*         pa_log("polling for %u", pollfd[POLLFD_FIFO].events);  */
-        r = poll(pollfd, POLLFD_MAX, -1);
-/*         pa_log("polling got %u", r > 0 ? pollfd[POLLFD_FIFO].revents : 0);  */
+        if ((ret = pa_rtpoll_run(u->rtpoll, true)) < 0)
+            goto fail;
 
 
-        pa_asyncmsgq_after_poll(u->asyncmsgq);
+        if (ret == 0)
+            goto finish;
 
 
-        if (r < 0) {
-            if (errno == EINTR) {
-                pollfd[POLLFD_ASYNCQ].revents = 0;
-                pollfd[POLLFD_FIFO].revents = 0;
-                continue;
-            }
+        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
 
 
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
-            goto fail;
-        }
-
-        if (pollfd[POLLFD_FIFO].revents & ~POLLOUT) {
+        if (pollfd->revents & ~POLLOUT) {
             pa_log("FIFO shutdown.");
             goto fail;
         }
             pa_log("FIFO shutdown.");
             goto fail;
         }
-
-        pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);
     }
 
 fail:
     }
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
-    pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
+    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
+    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 
 finish:
     pa_log_debug("Thread shutting down");
 }
 
 
 finish:
     pa_log_debug("Thread shutting down");
 }
 
-int pa__init(pa_module*m) {
+int pa__init(pa_module *m) {
     struct userdata *u;
     struct stat st;
     pa_sample_spec ss;
     pa_channel_map map;
     pa_modargs *ma;
     struct userdata *u;
     struct stat st;
     pa_sample_spec ss;
     pa_channel_map map;
     pa_modargs *ma;
-    char *t;
+    struct pollfd *pollfd;
+    pa_sink_new_data data;
 
     pa_assert(m);
 
 
     pa_assert(m);
 
@@ -248,6 +237,7 @@ int pa__init(pa_module*m) {
     }
 
     ss = m->core->default_sample_spec;
     }
 
     ss = m->core->default_sample_spec;
+    map = m->core->default_channel_map;
     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
         pa_log("Invalid sample format specification or channel map");
         goto fail;
     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
         pa_log("Invalid sample format specification or channel map");
         goto fail;
@@ -258,19 +248,22 @@ int pa__init(pa_module*m) {
     u->module = m;
     m->userdata = u;
     pa_memchunk_reset(&u->memchunk);
     u->module = m;
     m->userdata = u;
     pa_memchunk_reset(&u->memchunk);
+    u->rtpoll = pa_rtpoll_new();
+    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
+    u->write_type = 0;
 
 
-    pa_assert_se(u->asyncmsgq = pa_asyncmsgq_new(0));
-    
-    u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
+    u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
 
 
-    mkfifo(u->filename, 0666);
-    if ((u->fd = open(u->filename, O_RDWR|O_NOCTTY)) < 0) {
+    if (mkfifo(u->filename, 0666) < 0) {
+        pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno));
+        goto fail;
+    }
+    if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
         pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
         goto fail;
     }
 
         pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
         goto fail;
     }
 
-    pa_fd_set_cloexec(u->fd, 1);
-    pa_make_nonblock_fd(u->fd);
+    pa_make_fd_nonblock(u->fd);
 
     if (fstat(u->fd, &st) < 0) {
         pa_log("fstat('%s'): %s", u->filename, pa_cstrerror(errno));
 
     if (fstat(u->fd, &st) < 0) {
         pa_log("fstat('%s'): %s", u->filename, pa_cstrerror(errno));
@@ -282,24 +275,49 @@ int pa__init(pa_module*m) {
         goto fail;
     }
 
         goto fail;
     }
 
-    if (!(u->sink = pa_sink_new(m->core, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
+    pa_sink_new_data_init(&data);
+    data.driver = __FILE__;
+    data.module = m;
+    pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
+    pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->filename);
+    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Unix FIFO sink %s", u->filename);
+    pa_sink_new_data_set_sample_spec(&data, &ss);
+    pa_sink_new_data_set_channel_map(&data, &map);
+
+    if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
+        pa_log("Invalid properties");
+        pa_sink_new_data_done(&data);
+        goto fail;
+    }
+
+    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
+    pa_sink_new_data_done(&data);
+
+    if (!u->sink) {
         pa_log("Failed to create sink.");
         goto fail;
     }
 
     u->sink->parent.process_msg = sink_process_msg;
     u->sink->userdata = u;
         pa_log("Failed to create sink.");
         goto fail;
     }
 
     u->sink->parent.process_msg = sink_process_msg;
     u->sink->userdata = u;
-    
-    pa_sink_set_module(u->sink, m);
-    pa_sink_set_asyncmsgq(u->sink, u->asyncmsgq);
-    pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", u->filename));
-    pa_xfree(t);
 
 
-    if (!(u->thread = pa_thread_new(thread_func, u))) {
+    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
+    pa_sink_set_rtpoll(u->sink, u->rtpoll);
+    pa_sink_set_max_request(u->sink, pa_pipe_buf(u->fd));
+    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(pa_pipe_buf(u->fd), &u->sink->sample_spec));
+
+    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
+    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+    pollfd->fd = u->fd;
+    pollfd->events = pollfd->revents = 0;
+
+    if (!(u->thread = pa_thread_new("pipe-sink", thread_func, u))) {
         pa_log("Failed to create thread.");
         goto fail;
     }
 
         pa_log("Failed to create thread.");
         goto fail;
     }
 
+    pa_sink_put(u->sink);
+
     pa_modargs_free(ma);
 
     return 0;
     pa_modargs_free(ma);
 
     return 0;
@@ -313,30 +331,44 @@ fail:
     return -1;
 }
 
     return -1;
 }
 
-void pa__done(pa_module*m) {
+int pa__get_n_used(pa_module *m) {
     struct userdata *u;
     struct userdata *u;
-    
+
+    pa_assert(m);
+    pa_assert_se(u = m->userdata);
+
+    return pa_sink_linked_by(u->sink);
+}
+
+void pa__done(pa_module *m) {
+    struct userdata *u;
+
     pa_assert(m);
 
     if (!(u = m->userdata))
         return;
 
     if (u->sink)
     pa_assert(m);
 
     if (!(u = m->userdata))
         return;
 
     if (u->sink)
-        pa_sink_disconnect(u->sink);
+        pa_sink_unlink(u->sink);
 
     if (u->thread) {
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
+        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
         pa_thread_free(u->thread);
     }
 
         pa_thread_free(u->thread);
     }
 
-    if (u->asyncmsgq)
-        pa_asyncmsgq_free(u->asyncmsgq);
-    
+    pa_thread_mq_done(&u->thread_mq);
+
     if (u->sink)
         pa_sink_unref(u->sink);
 
     if (u->memchunk.memblock)
     if (u->sink)
         pa_sink_unref(u->sink);
 
     if (u->memchunk.memblock)
-       pa_memblock_unref(u->memchunk.memblock);
+        pa_memblock_unref(u->memchunk.memblock);
+
+    if (u->rtpoll_item)
+        pa_rtpoll_item_free(u->rtpoll_item);
+
+    if (u->rtpoll)
+        pa_rtpoll_free(u->rtpoll);
 
     if (u->filename) {
         unlink(u->filename);
 
     if (u->filename) {
         unlink(u->filename);
@@ -344,7 +376,7 @@ void pa__done(pa_module*m) {
     }
 
     if (u->fd >= 0)
     }
 
     if (u->fd >= 0)
-        close(u->fd);
+        pa_assert_se(pa_close(u->fd) == 0);
 
     pa_xfree(u);
 }
 
     pa_xfree(u);
 }