X-Git-Url: https://code.delx.au/pulseaudio/blobdiff_plain/1250b5d735129c3e04c45484f80f99cdb12f39a1..eca082a93f2619cfa10733947a81fa779cb49573:/src/pulsecore/pstream.c diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 3e0bfa3b..2c1444ff 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -28,23 +28,15 @@ #include #include -#ifdef HAVE_SYS_SOCKET_H -#include -#endif -#ifdef HAVE_SYS_UN_H -#include -#endif #ifdef HAVE_NETINET_IN_H #include #endif - #include -#include +#include #include #include -#include #include #include #include @@ -81,7 +73,13 @@ enum { typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX]; #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t)) -#define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */ + +#define MINIBUF_SIZE (256) + +/* To allow uploading a single sample in one frame, this value should be the + * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h. + */ +#define FRAME_SIZE_MAX_ALLOW (1024*1024*16) PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree); @@ -96,7 +94,7 @@ struct item_info { /* packet info */ pa_packet *packet; #ifdef HAVE_CREDS - pa_bool_t with_creds; + bool with_creds; pa_creds creds; #endif @@ -119,14 +117,17 @@ struct pa_pstream { pa_queue *send_queue; - pa_bool_t dead; + bool dead; struct { - pa_pstream_descriptor descriptor; + union { + uint8_t minibuf[MINIBUF_SIZE]; + pa_pstream_descriptor descriptor; + }; struct item_info* current; - uint32_t shm_info[PA_PSTREAM_SHM_MAX]; void *data; size_t index; + int minibuf_validsize; pa_memchunk memchunk; } write; @@ -139,15 +140,15 @@ struct pa_pstream { size_t index; } read; - pa_bool_t use_shm; + bool use_shm; pa_memimport *import; pa_memexport *export; - pa_pstream_packet_cb_t recieve_packet_callback; - void *recieve_packet_callback_userdata; + pa_pstream_packet_cb_t receive_packet_callback; + void *receive_packet_callback_userdata; - pa_pstream_memblock_cb_t recieve_memblock_callback; - void *recieve_memblock_callback_userdata; + pa_pstream_memblock_cb_t receive_memblock_callback; + void *receive_memblock_callback_userdata; pa_pstream_notify_cb_t drain_callback; void *drain_callback_userdata; @@ -165,14 +166,14 @@ struct pa_pstream { #ifdef HAVE_CREDS pa_creds read_creds, write_creds; - pa_bool_t read_creds_valid, send_creds_now; + bool read_creds_valid, send_creds_now; #endif }; static int do_write(pa_pstream *p); static int do_read(pa_pstream *p); -static void do_something(pa_pstream *p) { +static void do_pstream_read_write(pa_pstream *p) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); @@ -186,9 +187,12 @@ static void do_something(pa_pstream *p) { } else if (!p->dead && pa_iochannel_is_hungup(p->io)) goto fail; - if (!p->dead && pa_iochannel_is_writable(p->io)) { - if (do_write(p) < 0) + while (!p->dead && pa_iochannel_is_writable(p->io)) { + int r = do_write(p); + if (r < 0) goto fail; + if (r == 0) + break; } pa_pstream_unref(p); @@ -210,7 +214,7 @@ static void io_callback(pa_iochannel*io, void *userdata) { pa_assert(PA_REFCNT_VALUE(p) > 0); pa_assert(p->io == io); - do_something(p); + do_pstream_read_write(p); } static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) { @@ -221,7 +225,7 @@ static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) pa_assert(p->defer_event == e); pa_assert(p->mainloop == m); - do_something(p); + do_pstream_read_write(p); } static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata); @@ -233,11 +237,10 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo pa_assert(io); pa_assert(pool); - p = pa_xnew(pa_pstream, 1); + p = pa_xnew0(pa_pstream, 1); PA_REFCNT_INIT(p); p->io = io; pa_iochannel_set_callback(io, io_callback, p); - p->dead = FALSE; p->mainloop = m; p->defer_event = m->defer_new(m, defer_callback, p); @@ -245,45 +248,18 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo p->send_queue = pa_queue_new(); - p->write.current = NULL; - p->write.index = 0; - pa_memchunk_reset(&p->write.memchunk); - p->read.memblock = NULL; - p->read.packet = NULL; - p->read.index = 0; - - p->recieve_packet_callback = NULL; - p->recieve_packet_callback_userdata = NULL; - p->recieve_memblock_callback = NULL; - p->recieve_memblock_callback_userdata = NULL; - p->drain_callback = NULL; - p->drain_callback_userdata = NULL; - p->die_callback = NULL; - p->die_callback_userdata = NULL; - p->revoke_callback = NULL; - p->revoke_callback_userdata = NULL; - p->release_callback = NULL; - p->release_callback_userdata = NULL; - p->mempool = pool; - p->use_shm = FALSE; - p->export = NULL; - /* We do importing unconditionally */ p->import = pa_memimport_new(p->mempool, memimport_release_cb, p); pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool)); pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool)); -#ifdef HAVE_CREDS - p->send_creds_now = FALSE; - p->read_creds_valid = FALSE; -#endif return p; } -static void item_free(void *item, void *q) { +static void item_free(void *item) { struct item_info *i = item; pa_assert(i); @@ -304,10 +280,10 @@ static void pstream_free(pa_pstream *p) { pa_pstream_unlink(p); - pa_queue_free(p->send_queue, item_free, NULL); + pa_queue_free(p->send_queue, item_free); if (p->write.current) - item_free(p->write.current, NULL); + item_free(p->write.current); if (p->write.memchunk.memblock) pa_memblock_unref(p->write.memchunk.memblock); @@ -381,7 +357,7 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa i->offset = offset; i->seek_mode = seek_mode; #ifdef HAVE_CREDS - i->with_creds = FALSE; + i->with_creds = false; #endif pa_queue_push(p->send_queue, i); @@ -408,7 +384,7 @@ void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) { item->type = PA_PSTREAM_ITEM_SHMRELEASE; item->block_id = block_id; #ifdef HAVE_CREDS - item->with_creds = FALSE; + item->with_creds = false; #endif pa_queue_push(p->send_queue, item); @@ -445,7 +421,7 @@ void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) { item->type = PA_PSTREAM_ITEM_SHMREVOKE; item->block_id = block_id; #ifdef HAVE_CREDS - item->with_creds = FALSE; + item->with_creds = false; #endif pa_queue_push(p->send_queue, item); @@ -473,9 +449,9 @@ static void prepare_next_write_item(pa_pstream *p) { if (!p->write.current) return; - p->write.index = 0; p->write.data = NULL; + p->write.minibuf_validsize = 0; pa_memchunk_reset(&p->write.memchunk); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0; @@ -490,6 +466,11 @@ static void prepare_next_write_item(pa_pstream *p) { p->write.data = p->write.current->packet->data; p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length); + if (p->write.current->packet->length <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) { + memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, p->write.current->packet->length); + p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + p->write.current->packet->length; + } + } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) { p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE); @@ -502,7 +483,7 @@ static void prepare_next_write_item(pa_pstream *p) { } else { uint32_t flags; - pa_bool_t send_payload = TRUE; + bool send_payload = true; pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK); pa_assert(p->write.current->chunk.memblock); @@ -516,6 +497,8 @@ static void prepare_next_write_item(pa_pstream *p) { if (p->use_shm) { uint32_t block_id, shm_id; size_t offset, length; + uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE]; + size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX; pa_assert(p->export); @@ -527,15 +510,15 @@ static void prepare_next_write_item(pa_pstream *p) { &length) >= 0) { flags |= PA_FLAG_SHMDATA; - send_payload = FALSE; + send_payload = false; - p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); - p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); - p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); - p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); + shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); + shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); + shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); + shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info)); - p->write.data = p->write.shm_info; + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size); + p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size; } /* else */ /* pa_log_warn("Failed to export memory block."); */ @@ -572,7 +555,10 @@ static int do_write(pa_pstream *p) { if (!p->write.current) return 0; - if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) { + if (p->write.minibuf_validsize > 0) { + d = p->write.minibuf + p->write.index; + l = p->write.minibuf_validsize - p->write.index; + } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) { d = (uint8_t*) p->write.descriptor + p->write.index; l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index; } else { @@ -581,7 +567,7 @@ static int do_write(pa_pstream *p) { if (p->write.data) d = p->write.data; else { - d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index; + d = pa_memblock_acquire_chunk(&p->write.memchunk); release_memblock = p->write.memchunk.memblock; } @@ -597,7 +583,7 @@ static int do_write(pa_pstream *p) { if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0) goto fail; - p->send_creds_now = FALSE; + p->send_creds_now = false; } else #endif @@ -611,7 +597,7 @@ static int do_write(pa_pstream *p) { if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) { pa_assert(p->write.current); - item_free(p->write.current, NULL); + item_free(p->write.current); p->write.current = NULL; if (p->write.memchunk.memblock) @@ -623,7 +609,7 @@ static int do_write(pa_pstream *p) { p->drain_callback(p, p->drain_callback_userdata); } - return 0; + return (size_t) r == l ? 1 : 0; fail: @@ -660,7 +646,7 @@ static int do_read(pa_pstream *p) { #ifdef HAVE_CREDS { - pa_bool_t b = 0; + bool b = 0; if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0) goto fail; @@ -743,7 +729,7 @@ static int do_read(pa_pstream *p) { if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { if (length != sizeof(p->read.shm_info)) { - pa_log_warn("Received SHM memblock frame with Invalid frame length."); + pa_log_warn("Received SHM memblock frame with invalid frame length."); return -1; } @@ -766,7 +752,7 @@ static int do_read(pa_pstream *p) { } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) { /* Frame payload available */ - if (p->read.memblock && p->recieve_memblock_callback) { + if (p->read.memblock && p->receive_memblock_callback) { /* Is this memblock data? Than pass it to the user */ l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r; @@ -778,20 +764,20 @@ static int do_read(pa_pstream *p) { chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l; chunk.length = l; - if (p->recieve_memblock_callback) { + if (p->receive_memblock_callback) { int64_t offset; offset = (int64_t) ( (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); - p->recieve_memblock_callback( + p->receive_memblock_callback( p, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), offset, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, &chunk, - p->recieve_memblock_callback_userdata); + p->receive_memblock_callback_userdata); } /* Drop seek info for following callbacks */ @@ -811,11 +797,11 @@ static int do_read(pa_pstream *p) { } else if (p->read.packet) { - if (p->recieve_packet_callback) + if (p->receive_packet_callback) #ifdef HAVE_CREDS - p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata); + p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata); #else - p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata); + p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata); #endif pa_packet_unref(p->read.packet); @@ -836,7 +822,7 @@ static int do_read(pa_pstream *p) { pa_log_debug("Failed to import memory block."); } - if (p->recieve_memblock_callback) { + if (p->receive_memblock_callback) { int64_t offset; pa_memchunk chunk; @@ -848,13 +834,13 @@ static int do_read(pa_pstream *p) { (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); - p->recieve_memblock_callback( + p->receive_memblock_callback( p, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), offset, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, &chunk, - p->recieve_memblock_callback_userdata); + p->receive_memblock_callback_userdata); } if (b) @@ -874,7 +860,7 @@ frame_done: p->read.data = NULL; #ifdef HAVE_CREDS - p->read_creds_valid = FALSE; + p->read_creds_valid = false; #endif return 0; @@ -902,20 +888,20 @@ void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, voi p->drain_callback_userdata = userdata; } -void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) { +void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); - p->recieve_packet_callback = cb; - p->recieve_packet_callback_userdata = userdata; + p->receive_packet_callback = cb; + p->receive_packet_callback_userdata = userdata; } -void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) { +void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); - p->recieve_memblock_callback = cb; - p->recieve_memblock_callback_userdata = userdata; + p->receive_memblock_callback = cb; + p->receive_memblock_callback_userdata = userdata; } void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) { @@ -934,14 +920,14 @@ void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, p->release_callback_userdata = userdata; } -pa_bool_t pa_pstream_is_pending(pa_pstream *p) { - pa_bool_t b; +bool pa_pstream_is_pending(pa_pstream *p) { + bool b; pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); if (p->dead) - b = FALSE; + b = false; else b = p->write.current || !pa_queue_isempty(p->send_queue); @@ -970,7 +956,7 @@ void pa_pstream_unlink(pa_pstream *p) { if (p->dead) return; - p->dead = TRUE; + p->dead = true; if (p->import) { pa_memimport_free(p->import); @@ -994,11 +980,11 @@ void pa_pstream_unlink(pa_pstream *p) { p->die_callback = NULL; p->drain_callback = NULL; - p->recieve_packet_callback = NULL; - p->recieve_memblock_callback = NULL; + p->receive_packet_callback = NULL; + p->receive_memblock_callback = NULL; } -void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) { +void pa_pstream_enable_shm(pa_pstream *p, bool enable) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); @@ -1018,7 +1004,7 @@ void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) { } } -pa_bool_t pa_pstream_get_shm(pa_pstream *p) { +bool pa_pstream_get_shm(pa_pstream *p) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0);