]> code.delx.au - pulseaudio/blobdiff - src/pulse/stream.c
Merge branch 'master' of git://0pointer.de/pulseaudio into dbus-work
[pulseaudio] / src / pulse / stream.c
index 339a89e5f893faa7ca9f2f9e48c03a931eecc3e6..2bc2b1e4ae0fd1dc189ca02ce73ae15147d2550e 100644 (file)
 
 #include <pulse/def.h>
 #include <pulse/timeval.h>
+#include <pulse/rtclock.h>
 #include <pulse/xmalloc.h>
 
 #include <pulsecore/pstream-util.h>
 #include <pulsecore/log.h>
 #include <pulsecore/hashmap.h>
 #include <pulsecore/macro.h>
-#include <pulsecore/rtclock.h>
+#include <pulsecore/core-rtclock.h>
 
 #include "fork-detect.h"
 #include "internal.h"
@@ -143,12 +144,13 @@ pa_stream *pa_stream_new_with_proplist(
     s->suspended = FALSE;
     s->corked = FALSE;
 
+    s->write_memblock = NULL;
+    s->write_data = NULL;
+
     pa_memchunk_reset(&s->peek_memchunk);
     s->peek_data = NULL;
-
     s->record_memblockq = NULL;
 
-
     memset(&s->timing_info, 0, sizeof(s->timing_info));
     s->timing_info_valid = FALSE;
 
@@ -220,6 +222,11 @@ static void stream_free(pa_stream *s) {
 
     stream_unlink(s);
 
+    if (s->write_memblock) {
+        pa_memblock_release(s->write_memblock);
+        pa_memblock_unref(s->write_data);
+    }
+
     if (s->peek_memchunk.memblock) {
         if (s->peek_data)
             pa_memblock_release(s->peek_memchunk.memblock);
@@ -319,14 +326,10 @@ static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
     }
 
     if (s->auto_timing_update_event) {
-        struct timeval next;
-
         if (force)
             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
 
-        pa_gettimeofday(&next);
-        pa_timeval_add(&next, s->auto_timing_interval_usec);
-        s->mainloop->time_restart(s->auto_timing_update_event, &next);
+        pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
 
         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
     }
@@ -373,7 +376,7 @@ static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t
     if (!s->smoother)
         return;
 
-    x = pa_rtclock_usec();
+    x = pa_rtclock_now();
 
     if (s->timing_info_valid) {
         if (aposteriori)
@@ -800,7 +803,7 @@ static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
     request_auto_timing_update(s, TRUE);
 }
 
-static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
+static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
     pa_stream *s = userdata;
 
     pa_assert(s);
@@ -822,12 +825,9 @@ static void create_stream_complete(pa_stream *s) {
         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
 
     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
-        struct timeval tv;
-        pa_gettimeofday(&tv);
         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
-        pa_timeval_add(&tv, s->auto_timing_interval_usec);
         pa_assert(!s->auto_timing_update_event);
-        s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
+        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
 
         request_auto_timing_update(s, TRUE);
     }
@@ -867,7 +867,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s
 
 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     pa_stream *s = userdata;
-    uint32_t requested_bytes;
+    uint32_t requested_bytes = 0;
 
     pa_assert(pd);
     pa_assert(s);
@@ -1057,7 +1057,7 @@ static int create_stream(
     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
         pa_usec_t x;
 
-        x = pa_rtclock_usec();
+        x = pa_rtclock_now();
 
         pa_assert(!s->smoother);
         s->smoother = pa_smoother_new(
@@ -1172,7 +1172,7 @@ int pa_stream_connect_playback(
         const char *dev,
         const pa_buffer_attr *attr,
         pa_stream_flags_t flags,
-        pa_cvolume *volume,
+        const pa_cvolume *volume,
         pa_stream *sync_stream) {
 
     pa_assert(s);
@@ -1193,20 +1193,71 @@ int pa_stream_connect_record(
     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
 }
 
+int pa_stream_begin_write(
+        pa_stream *s,
+        void **data,
+        size_t *nbytes) {
+
+    pa_assert(s);
+    pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
+    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
+    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
+
+    if (*nbytes != (size_t) -1) {
+        size_t m, fs;
+
+        m = pa_mempool_block_size_max(s->context->mempool);
+        fs = pa_frame_size(&s->sample_spec);
+
+        m = (m / fs) * fs;
+        if (*nbytes > m)
+            *nbytes = m;
+    }
+
+    if (!s->write_memblock) {
+        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
+        s->write_data = pa_memblock_acquire(s->write_memblock);
+    }
+
+    *data = s->write_data;
+    *nbytes = pa_memblock_get_length(s->write_memblock);
+
+    return 0;
+}
+
+int pa_stream_cancel_write(
+        pa_stream *s) {
+
+    pa_assert(s);
+    pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
+    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
+
+    pa_assert(s->write_data);
+
+    pa_memblock_release(s->write_memblock);
+    pa_memblock_unref(s->write_memblock);
+    s->write_memblock = NULL;
+    s->write_data = NULL;
+
+    return 0;
+}
+
 int pa_stream_write(
         pa_stream *s,
         const void *data,
         size_t length,
-        void (*free_cb)(void *p),
+        pa_free_cb_t free_cb,
         int64_t offset,
         pa_seek_mode_t seek) {
 
-    pa_memchunk chunk;
-    pa_seek_mode_t t_seek;
-    int64_t t_offset;
-    size_t t_length;
-    const void *t_data;
-
     pa_assert(s);
     pa_assert(PA_REFCNT_VALUE(s) >= 1);
     pa_assert(data);
@@ -1216,46 +1267,71 @@ int pa_stream_write(
     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
+    PA_CHECK_VALIDITY(s->context,
+                      !s->write_memblock ||
+                      ((data >= s->write_data) &&
+                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
+                      PA_ERR_INVALID);
+    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
 
-    if (length <= 0)
-        return 0;
+    if (s->write_memblock) {
+        pa_memchunk chunk;
 
-    t_seek = seek;
-    t_offset = offset;
-    t_length = length;
-    t_data = data;
+        /* pa_stream_write_begin() was called before */
 
-    while (t_length > 0) {
+        pa_memblock_release(s->write_memblock);
 
-        chunk.index = 0;
+        chunk.memblock = s->write_memblock;
+        chunk.index = (const char *) data - (const char *) s->write_data;
+        chunk.length = length;
 
-        if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
-            chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
-            chunk.length = t_length;
-        } else {
-            void *d;
+        s->write_memblock = NULL;
+        s->write_data = NULL;
 
-            chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
-            chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
+        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
+        pa_memblock_unref(chunk.memblock);
 
-            d = pa_memblock_acquire(chunk.memblock);
-            memcpy(d, t_data, chunk.length);
-            pa_memblock_release(chunk.memblock);
-        }
+    } else {
+        pa_seek_mode_t t_seek = seek;
+        int64_t t_offset = offset;
+        size_t t_length = length;
+        const void *t_data = data;
 
-        pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
+        /* pa_stream_write_begin() was not called before */
 
-        t_offset = 0;
-        t_seek = PA_SEEK_RELATIVE;
+        while (t_length > 0) {
+            pa_memchunk chunk;
 
-        t_data = (const uint8_t*) t_data + chunk.length;
-        t_length -= chunk.length;
+            chunk.index = 0;
 
-        pa_memblock_unref(chunk.memblock);
-    }
+            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
+                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
+                chunk.length = t_length;
+            } else {
+                void *d;
+
+                chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
+                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
+
+                d = pa_memblock_acquire(chunk.memblock);
+                memcpy(d, t_data, chunk.length);
+                pa_memblock_release(chunk.memblock);
+            }
 
-    if (free_cb && pa_pstream_get_shm(s->context->pstream))
-        free_cb((void*) data);
+            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
+
+            t_offset = 0;
+            t_seek = PA_SEEK_RELATIVE;
+
+            t_data = (const uint8_t*) t_data + chunk.length;
+            t_length -= chunk.length;
+
+            pa_memblock_unref(chunk.memblock);
+        }
+
+        if (free_cb && pa_pstream_get_shm(s->context->pstream))
+            free_cb((void*) data);
+    }
 
     /* This is obviously wrong since we ignore the seeking index . But
      * that's OK, the server side applies the same error */
@@ -1594,7 +1670,7 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
         if (o->stream->smoother) {
             pa_usec_t u, x;
 
-            u = x = pa_rtclock_usec() - i->transport_usec;
+            u = x = pa_rtclock_now() - i->transport_usec;
 
             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
                 pa_usec_t su;
@@ -2103,7 +2179,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
 
     if (s->smoother)
-        usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
+        usec = pa_smoother_get(s->smoother, pa_rtclock_now());
     else
         usec = calc_time(s, FALSE);