]> code.delx.au - pulseaudio/blobdiff - src/pulsecore/asyncmsgq.c
Merge branch 'master' of ssh://rootserver/home/lennart/git/public/pulseaudio
[pulseaudio] / src / pulsecore / asyncmsgq.c
index b36544605fe3e76fb614b3ef36015523d2d27386..e191b05fb03ff6d63c067ee09a82f0d3d7c9c025 100644 (file)
@@ -1,5 +1,3 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
@@ -40,6 +38,7 @@
 #include "asyncmsgq.h"
 
 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
+PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
 
 struct asyncmsgq_item {
     int code;
@@ -67,7 +66,7 @@ pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
 
     PA_REFCNT_INIT(a);
     pa_assert_se(a->asyncq = pa_asyncq_new(size));
-    pa_assert_se(a->mutex = pa_mutex_new(0));
+    pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
     a->current = NULL;
 
     return a;
@@ -133,9 +132,9 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
         pa_memchunk_reset(&i->memchunk);
     i->semaphore = NULL;
 
-    /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
+    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
     pa_mutex_lock(a->mutex);
-    pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
+    pa_asyncq_post(a->asyncq, i);
     pa_mutex_unlock(a->mutex);
 }
 
@@ -154,20 +153,26 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
         i.memchunk = *chunk;
     } else
         pa_memchunk_reset(&i.memchunk);
-    pa_assert_se(i.semaphore = pa_semaphore_new(0));
 
-    /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
+    if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
+        i.semaphore = pa_semaphore_new(0);
+
+    pa_assert_se(i.semaphore);
+
+    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
     pa_mutex_lock(a->mutex);
-    pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
+    pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
     pa_mutex_unlock(a->mutex);
 
     pa_semaphore_wait(i.semaphore);
-    pa_semaphore_free(i.semaphore);
+
+    if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
+        pa_semaphore_free(i.semaphore);
 
     return i.ret;
 }
 
-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait) {
     pa_assert(PA_REFCNT_VALUE(a) > 0);
     pa_assert(!a->current);
 
@@ -191,8 +196,14 @@ int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **u
     if (chunk)
         *chunk = a->current->memchunk;
 
-/*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", (void*) a, (void*) a->current->object, a->current->object ? a->current->object->parent.type_name : NULL, a->current->code, (void*) a->current->userdata, (unsigned long) a->current->memchunk.length); */
-    
+/*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
+/*                  (void*) a, */
+/*                  (void*) a->current->object, */
+/*                  a->current->object ? a->current->object->parent.type_name : NULL, */
+/*                  a->current->code, */
+/*                  (void*) a->current->userdata, */
+/*                  (unsigned long) a->current->memchunk.length); */
+
     return 0;
 }
 
@@ -244,7 +255,7 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
     } while (c != code);
 
     pa_asyncmsgq_unref(a);
-    
+
     return 0;
 }
 
@@ -257,34 +268,52 @@ int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
     int ret;
 
     pa_assert(PA_REFCNT_VALUE(a) > 0);
-    
+
     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
         return 0;
-    
+
     pa_asyncmsgq_ref(a);
     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
     pa_asyncmsgq_done(a, ret);
     pa_asyncmsgq_unref(a);
-    
+
     return 1;
 }
 
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    return pa_asyncq_read_fd(a->asyncq);
+}
+
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    return pa_asyncq_read_before_poll(a->asyncq);
+}
+
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    pa_asyncq_read_after_poll(a->asyncq);
+}
+
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
     pa_assert(PA_REFCNT_VALUE(a) > 0);
 
-    return pa_asyncq_get_fd(a->asyncq);
+    return pa_asyncq_write_fd(a->asyncq);
 }
 
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
     pa_assert(PA_REFCNT_VALUE(a) > 0);
 
-    return pa_asyncq_before_poll(a->asyncq);
+    pa_asyncq_write_before_poll(a->asyncq);
 }
 
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
     pa_assert(PA_REFCNT_VALUE(a) > 0);
 
-    pa_asyncq_after_poll(a->asyncq);
+    pa_asyncq_write_after_poll(a->asyncq);
 }
 
 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {