]> code.delx.au - pulseaudio/blobdiff - polyp/ioline.c
* some iochannel fixes
[pulseaudio] / polyp / ioline.c
index f3d17b5864ba50934f252bfe1843df7e0a29a5a6..059591b85095aa12477a067fd41e32b9a410f7f4 100644 (file)
 
 #include "ioline.h"
 #include "xmalloc.h"
+#include "log.h"
 
 #define BUFFER_LIMIT (64*1024)
 #define READ_SIZE (1024)
 
 struct pa_ioline {
     struct pa_iochannel *io;
+    struct pa_defer_event *defer_event;
+    struct pa_mainloop_api *mainloop;
+    int ref;
     int dead;
 
     char *wbuf;
@@ -50,7 +54,7 @@ struct pa_ioline {
 };
 
 static void io_callback(struct pa_iochannel*io, void *userdata);
-static int do_write(struct pa_ioline *l);
+static void defer_callback(struct pa_mainloop_api*m, struct pa_defer_event*e, void *userdata);
 
 struct pa_ioline* pa_ioline_new(struct pa_iochannel *io) {
     struct pa_ioline *l;
@@ -68,68 +72,112 @@ struct pa_ioline* pa_ioline_new(struct pa_iochannel *io) {
 
     l->callback = NULL;
     l->userdata = NULL;
+    l->ref = 1;
 
+    l->mainloop = pa_iochannel_get_mainloop_api(io);
+
+    l->defer_event = l->mainloop->defer_new(l->mainloop, defer_callback, l);
+    l->mainloop->defer_enable(l->defer_event, 0);
+    
     pa_iochannel_set_callback(io, io_callback, l);
     
     return l;
 }
 
-void pa_ioline_free(struct pa_ioline *l) {
+static void ioline_free(struct pa_ioline *l) {
     assert(l);
-    pa_iochannel_free(l->io);
+
+    if (l->io)
+        pa_iochannel_free(l->io);
     pa_xfree(l->wbuf);
     pa_xfree(l->rbuf);
     pa_xfree(l);
 }
 
+void pa_ioline_unref(struct pa_ioline *l) {
+    assert(l && l->ref >= 1);
+
+    if ((--l->ref) <= 0)
+        ioline_free(l);
+}
+
+struct pa_ioline* pa_ioline_ref(struct pa_ioline *l) {
+    assert(l && l->ref >= 1);
+
+    l->ref++;
+    return l;
+}
+
+void pa_ioline_close(struct pa_ioline *l) {
+    assert(l && l->ref >= 1);
+
+    l->dead = 1;
+    if (l->io) {
+        pa_iochannel_free(l->io);
+        l->io = NULL;
+    }
+}
+
 void pa_ioline_puts(struct pa_ioline *l, const char *c) {
     size_t len;
-    assert(l && c);
+    assert(l && c && l->ref >= 1 && !l->dead);
+
+    pa_ioline_ref(l);
     
     len = strlen(c);
     if (len > BUFFER_LIMIT - l->wbuf_valid_length)
         len = BUFFER_LIMIT - l->wbuf_valid_length;
 
-    if (!len)
-        return;
-
-    assert(l->wbuf_length >= l->wbuf_valid_length);
-
-    /* In case the allocated buffer is too small, enlarge it. */
-    if (l->wbuf_valid_length + len > l->wbuf_length) {
-        size_t n = l->wbuf_valid_length+len;
-        char *new = pa_xmalloc(n);
-        if (l->wbuf) {
-            memcpy(new, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
-            pa_xfree(l->wbuf);
+    if (len) {
+        assert(l->wbuf_length >= l->wbuf_valid_length);
+        
+        /* In case the allocated buffer is too small, enlarge it. */
+        if (l->wbuf_valid_length + len > l->wbuf_length) {
+            size_t n = l->wbuf_valid_length+len;
+            char *new = pa_xmalloc(n);
+            if (l->wbuf) {
+                memcpy(new, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
+                pa_xfree(l->wbuf);
+            }
+            l->wbuf = new;
+            l->wbuf_length = n;
+            l->wbuf_index = 0;
+        } else if (l->wbuf_index + l->wbuf_valid_length + len > l->wbuf_length) {
+            
+            /* In case the allocated buffer fits, but the current index is too far from the start, move it to the front. */
+            memmove(l->wbuf, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
+            l->wbuf_index = 0;
         }
-        l->wbuf = new;
-        l->wbuf_length = n;
-        l->wbuf_index = 0;
-    } else if (l->wbuf_index + l->wbuf_valid_length + len > l->wbuf_length) {
-
-        /* In case the allocated buffer fits, but the current index is too far from the start, move it to the front. */
-        memmove(l->wbuf, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
-        l->wbuf_index = 0;
-    }
-
-    assert(l->wbuf_index + l->wbuf_valid_length + len <= l->wbuf_length);
+        
+        assert(l->wbuf_index + l->wbuf_valid_length + len <= l->wbuf_length);
+        
+        /* Append the new string */
+        memcpy(l->wbuf + l->wbuf_index + l->wbuf_valid_length, c, len);
+        l->wbuf_valid_length += len;
 
-    /* Append the new string */
-    memcpy(l->wbuf + l->wbuf_index + l->wbuf_valid_length, c, len);
-    l->wbuf_valid_length += len;
+        l->mainloop->defer_enable(l->defer_event, 1);
+    }
 
-    do_write(l);
+    pa_ioline_unref(l);
 }
 
 void pa_ioline_set_callback(struct pa_ioline*l, void (*callback)(struct pa_ioline*io, const char *s, void *userdata), void *userdata) {
-    assert(l);
+    assert(l && l->ref >= 1);
     l->callback = callback;
     l->userdata = userdata;
 }
 
+static void failure(struct pa_ioline *l) {
+    assert(l && l->ref >= 1 && !l->dead);
+
+    pa_ioline_close(l);
+
+    if (l->callback)
+        l->callback(l, NULL, l->userdata);
+}
+
 static void scan_for_lines(struct pa_ioline *l, size_t skip) {
-    assert(l && skip < l->rbuf_valid_length);
+    assert(l && l->ref >= 1 && skip < l->rbuf_valid_length);
 
     while (!l->dead && l->rbuf_valid_length > skip) {
         char *e, *p;
@@ -162,86 +210,109 @@ static void scan_for_lines(struct pa_ioline *l, size_t skip) {
 }
 
 static int do_read(struct pa_ioline *l) {
-    ssize_t r;
-    size_t len;
-    assert(l);
-
-    if (!pa_iochannel_is_readable(l->io))
-        return 0;
+    assert(l && l->ref >= 1);
 
-    len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
+    while (!l->dead && pa_iochannel_is_readable(l->io)) {
+        ssize_t r;
+        size_t len;
 
-    /* Check if we have to enlarge the read buffer */
-    if (len < READ_SIZE) {
-        size_t n = l->rbuf_valid_length+READ_SIZE;
-
-        if (n >= BUFFER_LIMIT)
-            n = BUFFER_LIMIT;
+        len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
         
-        if (l->rbuf_length >= n) {
-            /* The current buffer is large enough, let's just move the data to the front */
-            if (l->rbuf_valid_length)
-                memmove(l->rbuf, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
-        } else {
-            /* Enlarge the buffer */
-            char *new = pa_xmalloc(n);
-            if (l->rbuf_valid_length)
-                memcpy(new, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
-            pa_xfree(l->rbuf);
-            l->rbuf = new;
-            l->rbuf_length = n;
+        /* Check if we have to enlarge the read buffer */
+        if (len < READ_SIZE) {
+            size_t n = l->rbuf_valid_length+READ_SIZE;
+            
+            if (n >= BUFFER_LIMIT)
+                n = BUFFER_LIMIT;
+            
+            if (l->rbuf_length >= n) {
+                /* The current buffer is large enough, let's just move the data to the front */
+                if (l->rbuf_valid_length)
+                    memmove(l->rbuf, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
+            } else {
+                /* Enlarge the buffer */
+                char *new = pa_xmalloc(n);
+                if (l->rbuf_valid_length)
+                    memcpy(new, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
+                pa_xfree(l->rbuf);
+                l->rbuf = new;
+                l->rbuf_length = n;
+            }
+            
+            l->rbuf_index = 0;
+        }
+        
+        len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
+        
+        assert(len >= READ_SIZE);
+        
+        /* Read some data */
+        if ((r = pa_iochannel_read(l->io, l->rbuf+l->rbuf_index+l->rbuf_valid_length, len)) <= 0) {
+            pa_log(__FILE__": read() failed: %s\n", r < 0 ? strerror(errno) : "EOF");
+            failure(l);
+            return -1;
         }
         
-        l->rbuf_index = 0;
+        l->rbuf_valid_length += r;
+        
+        /* Look if a line has been terminated in the newly read data */
+        scan_for_lines(l, l->rbuf_valid_length - r);
     }
-
-    len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
-
-    assert(len >= READ_SIZE);
-
-    /* Read some data */
-    if ((r = pa_iochannel_read(l->io, l->rbuf+l->rbuf_index+l->rbuf_valid_length, len)) <= 0)
-        return -1;
-
-    l->rbuf_valid_length += r;
-
-    /* Look if a line has been terminated in the newly read data */
-    scan_for_lines(l, l->rbuf_valid_length - r);
-
+        
     return 0;
 }
 
 /* Try to flush the buffer */
 static int do_write(struct pa_ioline *l) {
     ssize_t r;
-    assert(l);
+    assert(l && l->ref >= 1);
 
-    if (!l->wbuf_valid_length || !pa_iochannel_is_writable(l->io))
-        return 0;
-    
-    if ((r = pa_iochannel_write(l->io, l->wbuf+l->wbuf_index, l->wbuf_valid_length)) < 0)
-        return -1;
+    while (!l->dead && pa_iochannel_is_writable(l->io) && l->wbuf_valid_length) {
+        
+        if ((r = pa_iochannel_write(l->io, l->wbuf+l->wbuf_index, l->wbuf_valid_length)) < 0) {
+            pa_log(__FILE__": write() failed: %s\n", r < 0 ? strerror(errno) : "EOF");
+            failure(l);
+            return -1;
+        }
+        
+        l->wbuf_index += r;
+        l->wbuf_valid_length -= r;
+        
+        /* A shortcut for the next time */
+        if (l->wbuf_valid_length == 0)
+            l->wbuf_index = 0;
+    }
+        
+    return 0;
+}
 
-    l->wbuf_index += r;
-    l->wbuf_valid_length -= r;
+/* Try to flush read/write data */
+static void do_work(struct pa_ioline *l) {
+    assert(l && l->ref >= 1);
 
-    /* A shortcut for the next time */
-    if (l->wbuf_valid_length == 0)
-        l->wbuf_index = 0;
+    pa_ioline_ref(l);
 
-    return 0;
+    l->mainloop->defer_enable(l->defer_event, 0);
+    
+    if (!l->dead)
+        do_write(l);
+
+    if (!l->dead)
+        do_read(l);
+
+    pa_ioline_unref(l);
 }
 
-/* Try to flush read/write data */
 static void io_callback(struct pa_iochannel*io, void *userdata) {
     struct pa_ioline *l = userdata;
-    assert(io && l);
-    
-    if ((!l->dead && do_write(l) < 0) ||
-        (!l->dead && do_read(l) < 0)) {
-        
-        l->dead = 1;
-        if (l->callback)
-            l->callback(l, NULL, l->userdata);
-    }
+    assert(io && l && l->ref >= 1);
+
+    do_work(l);
+}
+
+static void defer_callback(struct pa_mainloop_api*m, struct pa_defer_event*e, void *userdata) {
+    struct pa_ioline *l = userdata;
+    assert(l && l->ref >= 1 && l->mainloop == m && l->defer_event == e);
+
+    do_work(l);
 }