]> code.delx.au - pulseaudio/blobdiff - src/pulsecore/asyncq.c
Merge branch 'master' of git://0pointer.de/pulseaudio into dbus-work
[pulseaudio] / src / pulsecore / asyncq.c
index da1f16fb75bd1f9c0438051548981101418f1a36..072ef02cc3a135e405200cc0c54d8bfa8b8f5a4a 100644 (file)
@@ -1,9 +1,7 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
-  Copyright 2006 Lennart Poettering
+  Copyright 2006-2008 Lennart Poettering
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as
 #include <pulsecore/thread.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/core-util.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/flist.h>
 #include <pulse/xmalloc.h>
 
 #include "asyncq.h"
+#include "fdsem.h"
 
-#define ASYNCQ_SIZE 128
+#define ASYNCQ_SIZE 256
 
-/* For debugging purposes we can define _Y to put and extra thread
+/* For debugging purposes we can define _Y to put an extra thread
  * yield between each operation. */
 
+/* #define PROFILE */
+
 #ifdef PROFILE
 #define _Y pa_thread_yield()
 #else
 #define _Y do { } while(0)
 #endif
 
+struct localq {
+    void *data;
+    PA_LLIST_FIELDS(struct localq);
+};
+
 struct pa_asyncq {
     unsigned size;
     unsigned read_idx;
     unsigned write_idx;
-    pa_atomic_t read_waiting, n_read;
-    pa_atomic_t write_waiting, n_written;
-    int read_fds[2], write_fds[2];
+    pa_fdsem *read_fdsem, *write_fdsem;
+
+    PA_LLIST_HEAD(struct localq, localq);
+    struct localq *last_localq;
+    pa_bool_t waiting_for_post;
 };
 
-#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
+PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
 
-static int is_power_of_two(unsigned size) {
-    return !(size & (size - 1));
-}
+#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
 
-static int reduce(pa_asyncq *l, int value) {
+static unsigned reduce(pa_asyncq *l, unsigned value) {
     return value & (unsigned) (l->size - 1);
 }
 
@@ -73,35 +81,32 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
     if (!size)
         size = ASYNCQ_SIZE;
 
-    pa_assert(is_power_of_two(size));
+    pa_assert(pa_is_power_of_two(size));
 
     l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
 
     l->size = size;
-    pa_atomic_store(&l->read_waiting, 0);
-    pa_atomic_store(&l->write_waiting, 0);
-    pa_atomic_store(&l->n_written, 0);
-    pa_atomic_store(&l->n_read, 0);
 
-    if (pipe(l->read_fds) < 0) {
+    PA_LLIST_HEAD_INIT(struct localq, l->localq);
+    l->last_localq = NULL;
+    l->waiting_for_post = FALSE;
+
+    if (!(l->read_fdsem = pa_fdsem_new())) {
         pa_xfree(l);
         return NULL;
     }
 
-    if (pipe(l->write_fds) < 0) {
-        pa_close(l->read_fds[0]);
-        pa_close(l->read_fds[1]);
+    if (!(l->write_fdsem = pa_fdsem_new())) {
+        pa_fdsem_free(l->read_fdsem);
         pa_xfree(l);
         return NULL;
     }
 
-    pa_make_nonblock_fd(l->read_fds[1]);
-    pa_make_nonblock_fd(l->write_fds[1]);
-
     return l;
 }
 
 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
+    struct localq *q;
     pa_assert(l);
 
     if (free_cb) {
@@ -111,16 +116,23 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
             free_cb(p);
     }
 
-    pa_close(l->read_fds[0]);
-    pa_close(l->read_fds[1]);
-    pa_close(l->write_fds[0]);
-    pa_close(l->write_fds[1]);
+    while ((q = l->localq)) {
+        if (free_cb)
+            free_cb(q->data);
+
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
 
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
+    }
+
+    pa_fdsem_free(l->read_fdsem);
+    pa_fdsem_free(l->write_fdsem);
     pa_xfree(l);
 }
 
-int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
-    int idx;
+static int push(pa_asyncq*l, void *p, pa_bool_t wait_op) {
+    unsigned idx;
     pa_atomic_ptr_t *cells;
 
     pa_assert(l);
@@ -133,157 +145,127 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
 
     if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
 
-        if (!wait) {
-            /* Let's empty the FIFO from old notifications, before we return */
-            
-            while (pa_atomic_load(&l->n_read) > 0) {
-                ssize_t r;
-                int x[20];
-                
-                errno = 0;
-                if ((r = read(l->write_fds[0], x, sizeof(x))) <= 0 && errno != EINTR)
-                    return -1;
-                
-                if (r > 0)
-                    if (pa_atomic_sub(&l->n_read, r) <= r)
-                        break;
-            }
-            
+        if (!wait_op)
             return -1;
-        }
 
-        /* First try failed. Let's wait for changes. */
+/*         pa_log("sleeping on push"); */
 
-        _Y;
+        do {
+            pa_fdsem_wait(l->read_fdsem);
+        } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
+    }
 
-        pa_atomic_inc(&l->write_waiting);
+    _Y;
+    l->write_idx++;
 
-        for (;;) {
-            char x[20];
-            ssize_t r;
+    pa_fdsem_post(l->write_fdsem);
 
-            _Y;
+    return 0;
+}
 
-            if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p))
-                break;
+static pa_bool_t flush_postq(pa_asyncq *l, pa_bool_t wait_op) {
+    struct localq *q;
 
-            _Y;
+    pa_assert(l);
 
-            if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) {
-                pa_atomic_dec(&l->write_waiting);
-                return -1;
-            }
+    while ((q = l->last_localq)) {
 
-            if (r > 0)
-                pa_atomic_sub(&l->n_read, r);
-        }
+        if (push(l, q->data, wait_op) < 0)
+            return FALSE;
 
-        _Y;
+        l->last_localq = q->prev;
 
-        pa_atomic_dec(&l->write_waiting);
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
     }
 
-    _Y;
-    l->write_idx++;
+    return TRUE;
+}
 
-    if (pa_atomic_load(&l->read_waiting)) {
-        char x = 'x';
-        _Y;
-        if (write(l->read_fds[1], &x, sizeof(x)) > 0)
-            pa_atomic_inc(&l->n_written);
-    }
+int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait_op) {
+    pa_assert(l);
 
-    return 0;
+    if (!flush_postq(l, wait_op))
+        return -1;
+
+    return push(l, p, wait_op);
 }
 
-void* pa_asyncq_pop(pa_asyncq*l, int wait) {
-    int idx;
-    void *ret;
-    pa_atomic_ptr_t *cells;
+void pa_asyncq_post(pa_asyncq*l, void *p) {
+    struct localq *q;
 
     pa_assert(l);
+    pa_assert(p);
 
-    cells = PA_ASYNCQ_CELLS(l);
+    if (flush_postq(l, FALSE))
+        if (pa_asyncq_push(l, p, FALSE) >= 0)
+            return;
 
-    _Y;
-    idx = reduce(l, l->read_idx);
+    /* OK, we couldn't push anything in the queue. So let's queue it
+     * locally and push it later */
 
-    if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
+    if (pa_log_ratelimit())
+        pa_log_warn("q overrun, queuing locally");
 
-        /* First try failed. Let's wait for changes. */
-
-        if (!wait) {
-            /* Let's empty the FIFO from old notifications, before we return */
-            
-            while (pa_atomic_load(&l->n_written) > 0) {
-                ssize_t r;
-                int x[20];
-                
-                errno = 0;
-                if ((r = read(l->read_fds[0], x, sizeof(x))) <= 0 && errno != EINTR)
-                    return NULL;
-                
-                if (r > 0)
-                    if (pa_atomic_sub(&l->n_written, r) <= r)
-                        break;
-            }
-            
-            return NULL;
-        }
+    if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
+        q = pa_xnew(struct localq, 1);
 
-        _Y;
+    q->data = p;
+    PA_LLIST_PREPEND(struct localq, l->localq, q);
 
-        pa_atomic_inc(&l->read_waiting);
+    if (!l->last_localq)
+        l->last_localq = q;
 
-        for (;;) {
-            char x[20];
-            ssize_t r;
+    return;
+}
 
-            _Y;
+void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait_op) {
+    unsigned idx;
+    void *ret;
+    pa_atomic_ptr_t *cells;
+
+    pa_assert(l);
 
-            if ((ret = pa_atomic_ptr_load(&cells[idx])))
-                break;
+    cells = PA_ASYNCQ_CELLS(l);
 
-            _Y;
+    _Y;
+    idx = reduce(l, l->read_idx);
 
-            if ((r = read(l->read_fds[0], x, sizeof(x)) < 0) && errno != EINTR) {
-                pa_atomic_dec(&l->read_waiting);
-                return NULL;
-            }
+    if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
 
-            if (r > 0)
-                pa_atomic_sub(&l->n_written, r);
-        }
+        if (!wait_op)
+            return NULL;
 
-        _Y;
+/*         pa_log("sleeping on pop"); */
 
-        pa_atomic_dec(&l->read_waiting);
+        do {
+            pa_fdsem_wait(l->write_fdsem);
+        } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
     }
 
-    /* Guaranteed if we only have a single reader */
+    pa_assert(ret);
+
+    /* Guaranteed to succeed if we only have a single reader */
     pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
 
     _Y;
     l->read_idx++;
 
-    if (pa_atomic_load(&l->write_waiting)) {
-        char x = 'x';
-        _Y;
-        if (write(l->write_fds[1], &x, sizeof(x)) >= 0)
-            pa_atomic_inc(&l->n_read);
-    }
+    pa_fdsem_post(l->read_fdsem);
 
     return ret;
 }
 
-int pa_asyncq_get_fd(pa_asyncq *q) {
+int pa_asyncq_read_fd(pa_asyncq *q) {
     pa_assert(q);
 
-    return q->read_fds[0];
+    return pa_fdsem_get(q->write_fdsem);
 }
 
-int pa_asyncq_before_poll(pa_asyncq *l) {
-    int idx;
+int pa_asyncq_read_before_poll(pa_asyncq *l) {
+    unsigned idx;
     pa_atomic_ptr_t *cells;
 
     pa_assert(l);
@@ -293,26 +275,47 @@ int pa_asyncq_before_poll(pa_asyncq *l) {
     _Y;
     idx = reduce(l, l->read_idx);
 
-    if (pa_atomic_ptr_load(&cells[idx]))
-        return -1;
-
-    pa_atomic_inc(&l->read_waiting);
+    for (;;) {
+        if (pa_atomic_ptr_load(&cells[idx]))
+            return -1;
 
-    if (pa_atomic_ptr_load(&cells[idx])) {
-        pa_atomic_dec(&l->read_waiting);
-        return -1;
+        if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
+            return 0;
     }
+}
 
-    return 0;
+void pa_asyncq_read_after_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    pa_fdsem_after_poll(l->write_fdsem);
 }
 
-void pa_asyncq_after_poll(pa_asyncq *l) {
+int pa_asyncq_write_fd(pa_asyncq *q) {
+    pa_assert(q);
+
+    return pa_fdsem_get(q->read_fdsem);
+}
+
+void pa_asyncq_write_before_poll(pa_asyncq *l) {
     pa_assert(l);
 
-    pa_assert(pa_atomic_load(&l->read_waiting) > 0);
+    for (;;) {
 
-    pa_atomic_dec(&l->read_waiting);
+        if (flush_postq(l, FALSE))
+            break;
+
+        if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
+            l->waiting_for_post = TRUE;
+            break;
+        }
+    }
+}
 
+void pa_asyncq_write_after_poll(pa_asyncq *l) {
+    pa_assert(l);
 
-    
+    if (l->waiting_for_post) {
+        pa_fdsem_after_poll(l->read_fdsem);
+        l->waiting_for_post = FALSE;
+    }
 }