]> code.delx.au - pulseaudio/commitdiff
rework bluetooth IO loops
authorLennart Poettering <lennart@poettering.net>
Fri, 20 Mar 2009 17:04:23 +0000 (18:04 +0100)
committerLennart Poettering <lennart@poettering.net>
Fri, 20 Mar 2009 17:04:23 +0000 (18:04 +0100)
src/modules/bluetooth/module-bluetooth-device.c

index 3da69fc72040cb920894f23f72cdf4b5f06e83dc..fc8dc9caef140794237c8b3875f114da04c5977f 100644 (file)
@@ -96,7 +96,7 @@ struct a2dp_info {
     sbc_capabilities_t sbc_capabilities;
     sbc_t sbc;                           /* Codec data */
     pa_bool_t sbc_initialized;           /* Keep track if the encoder is initialized */
-    size_t codesize;                     /* SBC codesize */
+    size_t codesize, frame_length;       /* SBC Codesize, frame_length. We simply cache those values here */
 
     void* buffer;                        /* Codec transfer buffer */
     size_t buffer_size;                  /* Size of the buffer */
@@ -583,7 +583,8 @@ static void setup_sbc(struct a2dp_info *a2dp) {
     }
 
     a2dp->sbc.bitpool = active_capabilities->max_bitpool;
-    a2dp->codesize = (uint16_t) sbc_get_codesize(&a2dp->sbc);
+    a2dp->codesize = sbc_get_codesize(&a2dp->sbc);
+    a2dp->frame_length = sbc_get_frame_length(&a2dp->sbc);
 }
 
 static int set_conf(struct userdata *u) {
@@ -645,7 +646,12 @@ static int set_conf(struct userdata *u) {
     /* setup SBC encoder now we agree on parameters */
     if (u->profile == PROFILE_A2DP) {
         setup_sbc(&u->a2dp);
-        u->block_size = u->a2dp.codesize;
+
+        u->block_size =
+            ((u->link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload))
+            / u->a2dp.frame_length
+            * u->a2dp.codesize);
+
         pa_log_info("SBC parameters:\n\tallocation=%u\n\tsubbands=%u\n\tblocks=%u\n\tbitpool=%u\n",
                     u->a2dp.sbc.allocation, u->a2dp.sbc.subbands, u->a2dp.sbc.blocks, u->a2dp.sbc.bitpool);
     } else
@@ -853,48 +859,62 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
 
 static int hsp_process_render(struct userdata *u) {
     int ret = 0;
-    pa_memchunk memchunk;
 
     pa_assert(u);
     pa_assert(u->profile == PROFILE_HSP);
     pa_assert(u->sink);
 
-    pa_sink_render_full(u->sink, u->block_size, &memchunk);
+    /* First, render some data */
+    if (!u->write_memchunk.memblock)
+        pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk);
+
+    pa_assert(u->write_memchunk.length == u->block_size);
 
     for (;;) {
         ssize_t l;
         const void *p;
 
-        p = (const uint8_t*) pa_memblock_acquire(memchunk.memblock) + memchunk.index;
-        l = pa_write(u->stream_fd, p, memchunk.length, &u->stream_write_type);
-        pa_memblock_release(memchunk.memblock);
+        /* Now write that data to the socket. The socket is of type
+         * SEQPACKET, and we generated the data of the MTU size, so this
+         * should just work. */
 
-        pa_log_debug("Memblock written to socket: %lli bytes", (long long) l);
+        p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index;
+        l = pa_write(u->stream_fd, p, u->write_memchunk.length, &u->stream_write_type);
+        pa_memblock_release(u->write_memchunk.memblock);
 
         pa_assert(l != 0);
 
         if (l < 0) {
-            if (errno == EINTR || errno == EAGAIN) /*** FIXME: EAGAIN handling borked ***/
+
+            if (errno == EINTR)
+                /* Retry right away if we got interrupted */
                 continue;
-            else {
-                pa_log_error("Failed to write data to SCO socket: %s", pa_cstrerror(errno));
-                ret = -1;
+
+            else if (errno == EAGAIN)
+                /* Hmm, apparently the socket was not writable, give up for now */
                 break;
-            }
-        } else {
-            pa_assert((size_t) l <= memchunk.length);
 
-            memchunk.index += (size_t) l;
-            memchunk.length -= (size_t) l;
+            pa_log_error("Failed to write data to SCO socket: %s", pa_cstrerror(errno));
+            ret = -1;
+            break;
+        }
 
-            u->write_index += (uint64_t) l;
+        pa_assert((size_t) l <= u->write_memchunk.length);
 
-            if (memchunk.length <= 0)
-                break;
+        if ((size_t) l != u->write_memchunk.length) {
+            pa_log_error("Wrote memory block to socket only partially! %llu written, wanted to write %llu.",
+                        (unsigned long long) l,
+                        (unsigned long long) u->write_memchunk.length);
+            ret = -1;
+            break;
         }
-    }
 
-    pa_memblock_unref(memchunk.memblock);
+        u->write_index += (uint64_t) u->write_memchunk.length;
+        pa_memblock_unref(u->write_memchunk.memblock);
+        pa_memchunk_reset(&u->write_memchunk);
+
+        break;
+    }
 
     return ret;
 }
@@ -919,20 +939,27 @@ static int hsp_process_push(struct userdata *u) {
         pa_memblock_release(memchunk.memblock);
 
         if (l <= 0) {
-            if (l < 0 && (errno == EINTR || errno == EAGAIN)) /*** FIXME: EAGAIN handling borked ***/
+
+            if (l < 0 && errno == EINTR)
+                /* Retry right away if we got interrupted */
                 continue;
-            else {
-                pa_log_error("Failed to read data from SCO socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF");
-                ret = -1;
+
+            else if (l < 0 && errno == EAGAIN)
+                /* Hmm, apparently the socket was not readable, give up for now. */
                 break;
-            }
-        } else {
-            memchunk.length = (size_t) l;
-            u->read_index += (uint64_t) l;
 
-            pa_source_post(u->source, &memchunk);
+            pa_log_error("Failed to read data from SCO socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF");
+            ret = -1;
             break;
         }
+
+        pa_assert((size_t) l <= memchunk.length);
+
+        memchunk.length = (size_t) l;
+        u->read_index += (uint64_t) l;
+
+        pa_source_post(u->source, &memchunk);
+        break;
     }
 
     pa_memblock_unref(memchunk.memblock);
@@ -940,127 +967,146 @@ static int hsp_process_push(struct userdata *u) {
     return ret;
 }
 
+static void a2dp_prepare_buffer(struct userdata *u) {
+    pa_assert(u);
+
+    if (u->a2dp.buffer_size >= u->link_mtu)
+        return;
+
+    u->a2dp.buffer_size = 2 * u->link_mtu;
+    pa_xfree(u->a2dp.buffer);
+    u->a2dp.buffer = pa_xmalloc(u->a2dp.buffer_size);
+}
+
 static int a2dp_process_render(struct userdata *u) {
-    size_t frame_size;
     struct a2dp_info *a2dp;
     struct rtp_header *header;
     struct rtp_payload *payload;
-    size_t left;
+    size_t nbytes;
     void *d;
     const void *p;
+    size_t to_write, to_encode;
     unsigned frame_count;
-    size_t written;
-    uint64_t writing_at;
+    int ret = 0;
 
     pa_assert(u);
     pa_assert(u->profile == PROFILE_A2DP);
     pa_assert(u->sink);
 
-    a2dp = &u->a2dp;
+    /* First, render some data */
+    if (!u->write_memchunk.memblock)
+        pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk);
 
-    if (a2dp->buffer_size < u->link_mtu) {
-        a2dp->buffer_size = 2*u->link_mtu;
-        pa_xfree(a2dp->buffer);
-        a2dp->buffer = pa_xmalloc(a2dp->buffer_size);
-    }
+    pa_assert(u->write_memchunk.length == u->block_size);
+
+    a2dp_prepare_buffer(u);
 
-    header = (struct rtp_header*) a2dp->buffer;
+    a2dp = &u->a2dp;
+    header = a2dp->buffer;
     payload = (struct rtp_payload*) ((uint8_t*) a2dp->buffer + sizeof(*header));
-    d = (uint8_t*) a2dp->buffer + sizeof(*header) + sizeof(*payload);
-    left = a2dp->buffer_size - sizeof(*header) - sizeof(*payload);
 
-    frame_size = sbc_get_frame_length(&a2dp->sbc);
     frame_count = 0;
 
-    writing_at = u->write_index;
+    /* Try to create a packet of the full MTU */
 
-    do {
-        ssize_t encoded;
+    p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index;
+    to_encode = u->write_memchunk.length;
 
-        if (!u->write_memchunk.memblock)
-            pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk);
+    d = (uint8_t*) a2dp->buffer + sizeof(*header) + sizeof(*payload);
+    to_write = a2dp->buffer_size - sizeof(*header) - sizeof(*payload);
+
+    while (PA_LIKELY(to_encode > 0 && to_write > 0)) {
+        size_t written;
+        ssize_t encoded;
 
-        p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index;
         encoded = sbc_encode(&a2dp->sbc,
-                             p, u->write_memchunk.length,
-                             d, left,
+                             p, to_encode,
+                             d, to_write,
                              &written);
 
-        PA_ONCE_BEGIN {
-            pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&a2dp->sbc)));
-        } PA_ONCE_END;
-
-        pa_memblock_release(u->write_memchunk.memblock);
-
-        if (encoded <= 0) {
-            pa_log_error("SBC encoding error (%d)", encoded);
+        if (PA_UNLIKELY(encoded <= 0)) {
+            pa_log_error("SBC encoding error (%li)", (long) encoded);
+            pa_memblock_release(u->write_memchunk.memblock);
             return -1;
         }
 
-        pa_assert((size_t) encoded <= u->write_memchunk.length);
-        pa_assert((size_t) encoded == sbc_get_codesize(&a2dp->sbc));
+/*         pa_log_debug("SBC: encoded: %lu; written: %lu", (unsigned long) encoded, (unsigned long) written); */
+/*         pa_log_debug("SBC: codesize: %lu; frame_length: %lu", (unsigned long) a2dp->codesize, (unsigned long) a2dp->frame_length); */
 
-        pa_assert((size_t) written <= left);
-        pa_assert((size_t) written == sbc_get_frame_length(&a2dp->sbc));
+        pa_assert_fp((size_t) encoded <= to_encode);
+        pa_assert_fp((size_t) encoded == a2dp->codesize);
 
-/*         pa_log_debug("SBC: encoded: %d; written: %d", encoded, written); */
+        pa_assert_fp((size_t) written <= to_write);
+        pa_assert_fp((size_t) written == a2dp->frame_length);
 
-        u->write_memchunk.index += encoded;
-        u->write_memchunk.length -= encoded;
-
-        if (u->write_memchunk.length <= 0) {
-            pa_memblock_unref(u->write_memchunk.memblock);
-            pa_memchunk_reset(&u->write_memchunk);
-        }
-
-        u->write_index += encoded;
+        p = (const uint8_t*) p + encoded;
+        to_encode -= encoded;
 
         d = (uint8_t*) d + written;
-        left -= written;
+        to_write -= written;
 
         frame_count++;
+    }
+
+    pa_memblock_release(u->write_memchunk.memblock);
+
+    pa_assert(to_encode == 0);
 
-    } while (((uint8_t*) d - ((uint8_t*) a2dp->buffer + sbc_get_frame_length(&a2dp->sbc))) < (ptrdiff_t) u->link_mtu);
+    PA_ONCE_BEGIN {
+        pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&a2dp->sbc)));
+    } PA_ONCE_END;
 
     /* write it to the fifo */
     memset(a2dp->buffer, 0, sizeof(*header) + sizeof(*payload));
-    payload->frame_count = frame_count;
     header->v = 2;
     header->pt = 1;
     header->sequence_number = htons(a2dp->seq_num++);
-    header->timestamp = htonl(writing_at / frame_size);
+    header->timestamp = htonl(u->write_index / pa_frame_size(&u->sink->sample_spec));
     header->ssrc = htonl(1);
+    payload->frame_count = frame_count;
 
-    p = a2dp->buffer;
-    left = (uint8_t*) d - (uint8_t*) a2dp->buffer;
+    nbytes = (uint8_t*) d - (uint8_t*) a2dp->buffer;
 
     for (;;) {
         ssize_t l;
 
-        l = pa_write(u->stream_fd, p, left, &u->stream_write_type);
-/*         pa_log_debug("write: requested %lu bytes; written %li bytes; mtu=%li", (unsigned long) left, (long) l, (unsigned long) u->link_mtu); */
+        l = pa_write(u->stream_fd, a2dp->buffer, nbytes, &u->stream_write_type);
 
         pa_assert(l != 0);
 
         if (l < 0) {
-            if (errno == EINTR || errno == EAGAIN) /*** FIXME: EAGAIN handling borked ***/
-                continue;
-            else {
-                pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno));
-                return -1;
-            }
-        } else {
-            pa_assert((size_t) l <= left);
 
-            d = (uint8_t*) d + l;
-            left -= l;
+            if (errno == EINTR)
+                /* Retry right away if we got interrupted */
+                continue;
 
-            if (left <= 0)
+            else if (errno == EAGAIN)
+                /* Hmm, apparently the socket was not writable, give up for now */
                 break;
+
+            pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno));
+            ret  = -1;
+            break;
+        }
+
+        pa_assert((size_t) l <= nbytes);
+
+        if ((size_t) l != nbytes) {
+            pa_log_warn("Wrote memory block to socket only partially! %llu written, wanted to write %llu.",
+                        (unsigned long long) l,
+                        (unsigned long long) nbytes);
+            ret = -1;
+            break;
         }
+
+        u->write_index += (uint64_t) u->write_memchunk.length;
+        pa_memblock_unref(u->write_memchunk.memblock);
+        pa_memchunk_reset(&u->write_memchunk);
+
+        break;
     }
 
-    return 0;
+    return ret;
 }
 
 static void thread_func(void *userdata) {
@@ -1509,12 +1555,20 @@ static void shutdown_bt(struct userdata *u) {
     if (u->stream_fd >= 0) {
         pa_close(u->stream_fd);
         u->stream_fd = -1;
+
+        u->stream_write_type = 0;
+        u->stream_read_type = 0;
     }
 
     if (u->service_fd >= 0) {
         pa_close(u->service_fd);
         u->service_fd = -1;
     }
+
+    if (u->write_memchunk.memblock) {
+        pa_memblock_unref(u->write_memchunk.memblock);
+        pa_memchunk_reset(&u->write_memchunk);
+    }
 }
 
 static int init_bt(struct userdata *u) {
@@ -1686,11 +1740,6 @@ static int card_set_profile(pa_card *c, pa_card_profile *new_profile) {
     stop_thread(u);
     shutdown_bt(u);
 
-    if (u->write_memchunk.memblock) {
-        pa_memblock_unref(u->write_memchunk.memblock);
-        pa_memchunk_reset(&u->write_memchunk);
-    }
-
     u->profile = *d;
     u->sample_spec = u->requested_sample_spec;
 
@@ -2034,9 +2083,6 @@ void pa__done(pa_module *m) {
     if (u->device)
         pa_bluetooth_device_free(u->device);
 
-    if (u->write_memchunk.memblock)
-        pa_memblock_unref(u->write_memchunk.memblock);
-
     if (u->a2dp.buffer)
         pa_xfree(u->a2dp.buffer);