flist_test_LDADD = $(AM_LDADD) libpulsecore.la
flist_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
-asyncq_test_SOURCES = tests/asyncq-test.c pulsecore/thread-posix.c pulsecore/thread.h pulsecore/asyncq.c pulsecore/asyncq.h pulsecore/core-util.c pulsecore/core-util.h pulse/xmalloc.c pulse/xmalloc.h pulsecore/log.h pulsecore/log.c pulsecore/core-error.h pulsecore/core-error.c pulsecore/once-posix.c pulsecore/once.h pulsecore/mutex-posix.c pulsecore/mutex.h pulse/utf8.c pulse/utf8.h pulse/util.h pulse/util.c
+asyncq_test_SOURCES = tests/asyncq-test.c
asyncq_test_CFLAGS = $(AM_CFLAGS)
-asyncq_test_LDADD = $(AM_LDADD) #libpulsecore.la
+asyncq_test_LDADD = $(AM_LDADD) libpulsecore.la
asyncq_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
-asyncmsgq_test_SOURCES = tests/asyncmsgq-test.c pulsecore/thread-posix.c pulsecore/thread.h pulsecore/asyncq.c pulsecore/asyncq.h pulsecore/asyncmsgq.c pulsecore/asyncmsgq.h pulsecore/core-util.c pulsecore/core-util.h pulse/xmalloc.c pulse/xmalloc.h pulsecore/log.h pulsecore/log.c pulsecore/core-error.h pulsecore/core-error.c pulsecore/once-posix.c pulsecore/once.h pulsecore/mutex-posix.c pulsecore/mutex.h pulse/utf8.c pulse/utf8.h pulse/util.h pulse/util.c pulsecore/semaphore.h pulsecore/semaphore-posix.c pulsecore/flist.h pulsecore/flist.c
+asyncmsgq_test_SOURCES = tests/asyncmsgq-test.c
asyncmsgq_test_CFLAGS = $(AM_CFLAGS)
-asyncmsgq_test_LDADD = $(AM_LDADD) #libpulsecore.la
+asyncmsgq_test_LDADD = $(AM_LDADD) libpulsecore.la
asyncmsgq_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
mcalign_test_SOURCES = tests/mcalign-test.c
pulsecore/hook-list.c pulsecore/hook-list.h \
pulsecore/shm.c pulsecore/shm.h \
pulsecore/flist.c pulsecore/flist.h \
- pulsecore/anotify.c pulsecore/anotify.h \
+ pulsecore/asyncmsgq.c pulsecore/asyncmsgq.h \
+ pulsecore/asyncq.c pulsecore/asyncq.h \
+ pulsecore/object.c pulsecore/object.h \
+ pulsecore/msgobject.c pulsecore/msgobject.h \
$(PA_THREAD_OBJS)
if OS_IS_WIN32
libauthkey-prop.la \
libstrlist.la \
libprotocol-simple.la \
- libprotocol-esound.la \
- libprotocol-native.la \
- libprotocol-http.la
+ libprotocol-http.la
+
+# libprotocol-esound.la
+# libprotocol-native.la
# We need to emulate sendmsg/recvmsg to support this on Win32
if !OS_IS_WIN32
module-cli-protocol-tcp.la \
module-simple-protocol-tcp.la \
- module-null-sink.la
+ module-null-sink.la \
+ module-detect.la \
+ module-volume-restore.la \
+ module-rescue-streams.la \
+ module-http-protocol-tcp.la
+
# module-esound-protocol-tcp.la \
# module-native-protocol-tcp.la \
# module-native-protocol-fd.la \
# module-combine.la \
# module-tunnel-sink.la \
# module-tunnel-source.la \
-# module-esound-sink.la \
-# module-http-protocol-tcp.la \
-# module-detect.la \
-# module-volume-restore.la \
-# module-rescue-streams.la
+# module-esound-sink.la
# See comment at librtp.la above
#if !OS_IS_WIN32
modlibexec_LTLIBRARIES += \
module-cli-protocol-unix.la \
- module-simple-protocol-unix.la
+ module-simple-protocol-unix.la \
+ module-http-protocol-unix.la
# module-esound-protocol-unix.la \
-# module-native-protocol-unix.la \
-# module-http-protocol-unix.la
+# module-native-protocol-unix.la
endif
if HAVE_MKFIFO
# Native protocol
-module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
-module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
-module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
+#module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
+#module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+#module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
+#module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
-module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
-module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
-module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
+#module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
+#module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+#module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
+#module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
-module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
-module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
-module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
-module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
+#module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
+#module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
+#module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
+#module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
# EsounD protocol
-module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
-module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
-module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version
-module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la
+#module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
+#module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
+#module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version
+#module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la
-module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
-module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
-module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version
-module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la
+#module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
+#module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
+#module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version
+#module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la
-module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c
-module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version
-module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+#module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c
+#module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version
+#module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la
-module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c
-module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version
-module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+#module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c
+#module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version
+#module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la
-module_esound_sink_la_SOURCES = modules/module-esound-sink.c
-module_esound_sink_la_LDFLAGS = -module -avoid-version
-module_esound_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libiochannel.la libsocket-client.la libauthkey.la
+#module_esound_sink_la_SOURCES = modules/module-esound-sink.c
+#module_esound_sink_la_LDFLAGS = -module -avoid-version
+#module_esound_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libiochannel.la libsocket-client.la libauthkey.la
# Pipes
# Couplings
-module_combine_la_SOURCES = modules/module-combine.c
-module_combine_la_LDFLAGS = -module -avoid-version
-module_combine_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+#module_combine_la_SOURCES = modules/module-combine.c
+#module_combine_la_LDFLAGS = -module -avoid-version
+#module_combine_la_LIBADD = $(AM_LIBADD) libpulsecore.la
module_match_la_SOURCES = modules/module-match.c
module_match_la_LDFLAGS = -module -avoid-version
module_match_la_LIBADD = $(AM_LIBADD) libpulsecore.la
-module_tunnel_sink_la_SOURCES = modules/module-tunnel.c
-module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS)
-module_tunnel_sink_la_LDFLAGS = -module -avoid-version
-module_tunnel_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
+#module_tunnel_sink_la_SOURCES = modules/module-tunnel.c
+#module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS)
+#module_tunnel_sink_la_LDFLAGS = -module -avoid-version
+#module_tunnel_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
-module_tunnel_source_la_SOURCES = modules/module-tunnel.c
-module_tunnel_source_la_LDFLAGS = -module -avoid-version
-module_tunnel_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
+#module_tunnel_source_la_SOURCES = modules/module-tunnel.c
+#module_tunnel_source_la_LDFLAGS = -module -avoid-version
+#module_tunnel_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
# X11
# OSS
-liboss_util_la_SOURCES = modules/oss-util.c modules/oss-util.h
-liboss_util_la_LDFLAGS = -avoid-version
-liboss_util_la_LIBADD = libpulsecore.la
+#liboss_util_la_SOURCES = modules/oss-util.c modules/oss-util.h
+#liboss_util_la_LDFLAGS = -avoid-version
+#liboss_util_la_LIBADD = libpulsecore.la
-module_oss_la_SOURCES = modules/module-oss.c
-module_oss_la_LDFLAGS = -module -avoid-version
-module_oss_la_LIBADD = $(AM_LIBADD) libiochannel.la liboss-util.la
+#module_oss_la_SOURCES = modules/module-oss.c
+#module_oss_la_LDFLAGS = -module -avoid-version
+#module_oss_la_LIBADD = $(AM_LIBADD) libiochannel.la liboss-util.la
-module_oss_mmap_la_SOURCES = modules/module-oss-mmap.c
-module_oss_mmap_la_LDFLAGS = -module -avoid-version
-module_oss_mmap_la_LIBADD = $(AM_LIBADD) liboss-util.la libpulsecore.la
+#module_oss_mmap_la_SOURCES = modules/module-oss-mmap.c
+#module_oss_mmap_la_LDFLAGS = -module -avoid-version
+#module_oss_mmap_la_LIBADD = $(AM_LIBADD) liboss-util.la libpulsecore.la
# ALSA
-libalsa_util_la_SOURCES = modules/alsa-util.c modules/alsa-util.h
-libalsa_util_la_LDFLAGS = -avoid-version
-libalsa_util_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libpulsecore.la
-libalsa_util_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
+#libalsa_util_la_SOURCES = modules/alsa-util.c modules/alsa-util.h
+#libalsa_util_la_LDFLAGS = -avoid-version
+#libalsa_util_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libpulsecore.la
+#libalsa_util_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
-module_alsa_sink_la_SOURCES = modules/module-alsa-sink.c
-module_alsa_sink_la_LDFLAGS = -module -avoid-version
-module_alsa_sink_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
-module_alsa_sink_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
+#module_alsa_sink_la_SOURCES = modules/module-alsa-sink.c
+#module_alsa_sink_la_LDFLAGS = -module -avoid-version
+#module_alsa_sink_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
+#module_alsa_sink_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
-module_alsa_source_la_SOURCES = modules/module-alsa-source.c
-module_alsa_source_la_LDFLAGS = -module -avoid-version
-module_alsa_source_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
-module_alsa_source_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
+#module_alsa_source_la_SOURCES = modules/module-alsa-source.c
+#module_alsa_source_la_LDFLAGS = -module -avoid-version
+#module_alsa_source_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
+#module_alsa_source_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
# Solaris
# JACK
-module_jack_sink_la_SOURCES = modules/module-jack-sink.c
-module_jack_sink_la_LDFLAGS = -module -avoid-version
-module_jack_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
-module_jack_sink_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS)
+#module_jack_sink_la_SOURCES = modules/module-jack-sink.c
+#module_jack_sink_la_LDFLAGS = -module -avoid-version
+#module_jack_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
+#module_jack_sink_la_CFLAGS = $(AM_CFLAGS) $(JACK_CFLAGS)
-module_jack_source_la_SOURCES = modules/module-jack-source.c
-module_jack_source_la_LDFLAGS = -module -avoid-version
-module_jack_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
-module_jack_source_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS)
+#module_jack_source_la_SOURCES = modules/module-jack-source.c
+#module_jack_source_la_LDFLAGS = -module -avoid-version
+#module_jack_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
+#module_jack_source_la_CFLAGS = $(AM_CFLAGS) $(JACK_CFLAGS)
# HAL
libdbus_util_la_SOURCES = modules/dbus-util.c modules/dbus-util.h
pa_mainloop_get_api(mainloop)->time_free(timer);
#endif
- pa_core_free(c);
+ pa_core_unref(c);
if (!conf->no_cpu_limit)
pa_cpu_limit_done();
pa_log("failed to get sink '%s'", u->sink_name);
else {
int i;
- pa_cvolume cv = *pa_sink_get_volume(s, PA_MIXER_HARDWARE);
+ pa_cvolume cv = *pa_sink_get_volume(s);
#define DELTA (PA_VOLUME_NORM/20)
cv.values[i] = PA_VOLUME_NORM;
}
- pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+ pa_sink_set_volume(s, &cv);
break;
case DOWN:
cv.values[i] = PA_VOLUME_MUTED;
}
- pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+ pa_sink_set_volume(s, &cv);
break;
case MUTE:
- pa_sink_set_mute(s, PA_MIXER_HARDWARE, 0);
+ pa_sink_set_mute(s, 0);
break;
case RESET:
- pa_sink_set_mute(s, PA_MIXER_HARDWARE, 1);
+ pa_sink_set_mute(s, 1);
break;
case MUTE_TOGGLE:
- pa_sink_set_mute(s, PA_MIXER_HARDWARE, !pa_sink_get_mute(s, PA_MIXER_HARDWARE));
+ pa_sink_set_mute(s, !pa_sink_get_mute(s));
break;
case INVALID:
pa_log("failed to get sink '%s'", u->sink_name);
else {
int i;
- pa_cvolume cv = *pa_sink_get_volume(s, PA_MIXER_HARDWARE);
+ pa_cvolume cv = *pa_sink_get_volume(s);
#define DELTA (PA_VOLUME_NORM/20)
cv.values[i] = PA_VOLUME_NORM;
}
- pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+ pa_sink_set_volume(s, &cv);
break;
case DOWN:
cv.values[i] = PA_VOLUME_MUTED;
}
- pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+ pa_sink_set_volume(s, &cv);
break;
case MUTE_TOGGLE:
- pa_sink_set_mute(s, PA_MIXER_HARDWARE, !pa_sink_get_mute(s, PA_MIXER_HARDWARE));
+ pa_sink_set_mute(s, !pa_sink_get_mute(s));
break;
case INVALID:
#include <fcntl.h>
#include <unistd.h>
#include <limits.h>
+#include <sys/poll.h>
#include <pulse/timeval.h>
#include <pulse/xmalloc.h>
#include <pulsecore/macro.h>
-#include <pulsecore/iochannel.h>
#include <pulsecore/sink.h>
#include <pulsecore/module.h>
#include <pulsecore/core-util.h>
+#include <pulsecore/core-error.h>
#include <pulsecore/modargs.h>
#include <pulsecore/log.h>
+#include <pulsecore/thread.h>
#include "module-null-sink-symdef.h"
pa_module *module;
pa_sink *sink;
pa_thread *thread;
+ pa_asyncmsgq *asyncmsgq;
size_t block_size;
+
struct timeval timestamp;
};
NULL
};
+/* Message handler installed on the null sink. It reacts to two message codes
+ * and then always forwards the message to pa_sink_process_msg(), whose
+ * return value is passed back to the caller. */
+static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+    struct userdata *u = PA_SINK(o)->userdata;
+
+    if (code == PA_SINK_MESSAGE_SET_STATE) {
+
+        /* Restart the reference timestamp whenever the sink is switched to RUNNING */
+        if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING)
+            pa_gettimeofday(&u->timestamp);
+
+    } else if (code == PA_SINK_MESSAGE_GET_LATENCY) {
+        struct timeval now;
+        pa_usec_t *latency = data;
+
+        pa_gettimeofday(&now);
+
+        /* NOTE(review): this reports 0 while the timestamp lies in the future
+         * and the distance between timestamp and now otherwise -- confirm that
+         * this is the intended direction. */
+        *latency = pa_timeval_cmp(&u->timestamp, &now) > 0 ? 0 : pa_timeval_diff(&u->timestamp, &now);
+    }
+
+    return pa_sink_process_msg(o, code, data, chunk);
+}
+
+
static void thread_func(void *userdata) {
struct userdata *u = userdata;
- int quit = 0;
struct pollfd pollfd;
- int running = 1;
pa_assert(u);
pa_log_debug("Thread starting up");
+ pa_gettimeofday(&u->timestamp);
+
memset(&pollfd, 0, sizeof(pollfd));
- pollfd.fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP);
+ pollfd.fd = pa_asyncmsgq_get_fd(u->asyncmsgq);
pollfd.events = POLLIN;
- pa_gettimeofday(u->timestamp);
-
for (;;) {
+ pa_msgobject *object;
int code;
- void *data, *object;
+ void *data;
+ pa_memchunk chunk;
int r, timeout;
struct timeval now;
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) {
-
-
- /* Now process these messages our own way */
- if (!object) {
-
- switch (code) {
- case PA_MESSAGE_SHUTDOWN:
- goto finish;
-
- default:
- pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
-
- }
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ int ret;
- } else if (object == u->sink) {
-
- switch (code) {
- case PA_SINK_MESSAGE_STOP:
- pa_assert(running);
- running = 0;
- break;
-
- case PA_SINK_MESSAGE_START:
- pa_assert(!running);
- running = 1;
-
- pa_gettimeofday(u->timestamp);
- break;
-
- case PA_SINK_MESSAGE_GET_LATENCY:
-
- if (pa_timeval_cmp(&u->timestamp, &now) > 0)
- *((pa_usec_t*) data) = 0;
- else
- *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now);
- break;
-
- /* ... */
-
- default:
- pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
- }
+ if (!object && code == PA_MESSAGE_SHUTDOWN) {
+ pa_asyncmsgq_done(u->asyncmsgq, 0);
+ goto finish;
}
-
- pa_asyncmsgq_done(u->sink->asyncmsgq);
+
+ ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
/* Render some data and drop it immediately */
-
- if (running) {
+ if (u->sink->thread_info.state == PA_SINK_RUNNING) {
pa_gettimeofday(&now);
- if (pa_timeval_cmp(u->timestamp, &now) <= 0) {
- pa_memchunk chunk;
+ if (pa_timeval_cmp(&u->timestamp, &now) <= 0) {
size_t l;
if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) {
/* Hmm, nothing to do. Let's sleep */
- if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0)
+ if (pa_asyncmsgq_before_poll(u->asyncmsgq) < 0)
continue;
r = poll(&pollfd, 1, timeout);
- pa_asyncmsgq_after_poll(u->sink->asyncmsgq);
+ pa_asyncmsgq_after_poll(u->asyncmsgq);
if (r < 0) {
if (errno == EINTR)
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), NULL, pa_module_unref);
- pa_asyncmsgq_wait_for(PA_MESSAGE_SHUTDOWN);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
pa_log_debug("Thread shutting down");
u->module = m;
m->userdata = u;
+ pa_assert_se(u->asyncmsgq = pa_asyncmsgq_new(0));
+
if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
pa_log("Failed to create sink.");
goto fail;
}
+ u->sink->parent.process_msg = sink_process_msg;
u->sink->userdata = u;
- pa_sink_set_owner(u->sink, m);
+
+ pa_sink_set_module(u->sink, m);
+ pa_sink_set_asyncmsgq(u->sink, u->asyncmsgq);
pa_sink_set_description(u->sink, pa_modargs_get_value(ma, "description", "NULL sink"));
u->block_size = pa_bytes_per_second(&ss) / 20; /* 50 ms */
-
if (u->block_size <= 0)
u->block_size = pa_frame_size(&ss);
-
+
if (!(u->thread = pa_thread_new(thread_func, u))) {
pa_log("Failed to create thread.");
goto fail;
if (!(u = m->userdata))
return;
- pa_sink_disconnect(u->sink);
+ if (u->sink)
+ pa_sink_disconnect(u->sink);
if (u->thread) {
- pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
pa_thread_free(u->thread);
}
- pa_sink_unref(u->sink);
+ if (u->asyncmsgq)
+ pa_asyncmsgq_free(u->asyncmsgq);
+
+ if (u->sink)
+ pa_sink_unref(u->sink);
pa_xfree(u);
}
#include <fcntl.h>
#include <unistd.h>
#include <limits.h>
+#include <sys/ioctl.h>
+#include <sys/poll.h>
#include <pulse/xmalloc.h>
#include <pulsecore/core-util.h>
#include <pulsecore/modargs.h>
#include <pulsecore/log.h>
+#include <pulsecore/thread.h>
#include "module-pipe-sink-symdef.h"
pa_core *core;
pa_module *module;
pa_sink *sink;
+ pa_thread *thread;
+ pa_asyncmsgq *asyncmsgq;
char *filename;
int fd;
- pa_thread *thread;
+
+ pa_memchunk memchunk;
};
static const char* const valid_modargs[] = {
NULL
};
-enum {
- POLLFD_ASYNCQ,
- POLLFD_FIFO,
- POLLFD_MAX,
-};
+/* Message handler installed on the FIFO sink. It answers latency queries from
+ * the amount of data still pending and then always forwards the message to
+ * pa_sink_process_msg(), whose return value is passed back to the caller. */
+static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+    struct userdata *u = PA_SINK(o)->userdata;
+
+    if (code == PA_SINK_MESSAGE_GET_LATENCY) {
+        /* Start with what is left of the current chunk ... */
+        size_t pending = u->memchunk.length;
+        int queued;
+
+        /* ... and add the byte count the TIOCINQ ioctl reports for the FIFO,
+         * if the ioctl succeeds and the count is positive */
+        if (ioctl(u->fd, TIOCINQ, &queued) >= 0 && queued > 0)
+            pending += (size_t) queued;
+
+        *((pa_usec_t*) data) = pa_bytes_to_usec(pending, &u->sink->sample_spec);
+    }
+
+    return pa_sink_process_msg(o, code, data, chunk);
+}
static void thread_func(void *userdata) {
+ enum {
+ POLLFD_ASYNCQ,
+ POLLFD_FIFO,
+ POLLFD_MAX,
+ };
+
struct userdata *u = userdata;
- int quit = 0;
struct pollfd pollfd[POLLFD_MAX];
- int running = 1, underrun = 0;
- pa_memchunk memchunk;
+ int underrun = 0;
+ int write_type = 0;
pa_assert(u);
pa_log_debug("Thread starting up");
+ pa_memchunk_reset(&u->memchunk);
+
memset(&pollfd, 0, sizeof(pollfd));
- pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP);
+
+ pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->asyncmsgq);
pollfd[POLLFD_ASYNCQ].events = POLLIN;
-
pollfd[POLLFD_FIFO].fd = u->fd;
- memset(&memchunk, 0, sizeof(memchunk));
-
for (;;) {
+ pa_msgobject *object;
int code;
- void *object, *data;
+ void *data;
+ pa_memchunk chunk;
int r;
- struct timeval now;
/* Check whether there is a message for us to process */
- if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) {
-
-
- /* Now process these messages our own way */
- if (!object) {
- switch (code) {
- case PA_SINK_MESSAGE_SHUTDOWN:
- goto finish;
-
- default:
- pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
- }
+ if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+ int ret;
- } else if (object == u->sink) {
-
- case PA_SINK_MESSAGE_STOP:
- pa_assert(running);
- running = 0;
- break;
-
- case PA_SINK_MESSAGE_START:
- pa_assert(!running);
- running = 1;
- break;
-
- case PA_SINK_MESSAGE_GET_LATENCY: {
- size_t n = 0;
- int l;
-
- if (ioctl(u->fd, TIOCINQ, &l) >= 0 && l > 0)
- n = (size_t) l;
-
- n += memchunk.length;
-
- *((pa_usec_t*) data) pa_bytes_to_usec(n, &u->sink->sample_spec);
- break;
- }
-
- /* ... */
-
- default:
- pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
+ if (!object && code == PA_MESSAGE_SHUTDOWN) {
+ pa_asyncmsgq_done(u->asyncmsgq, 0);
+ goto finish;
}
- pa_asyncmsgq_done(u->sink->asyncmsgq);
+ ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ pa_asyncmsgq_done(u->asyncmsgq, ret);
continue;
}
/* Render some data and write it to the fifo */
- if (running && (pollfd[POLLFD_FIFO].revents || underrun)) {
+ if (u->sink->thread_info.state == PA_SINK_RUNNING && (pollfd[POLLFD_FIFO].revents || underrun)) {
- if (chunk.length <= 0)
- pa_sink_render(u->fd, PIPE_BUF, &chunk);
+ if (u->memchunk.length <= 0)
+ pa_sink_render(u->sink, PIPE_BUF, &u->memchunk);
- underrun = chunk.length <= 0;
+ underrun = u->memchunk.length <= 0;
if (!underrun) {
ssize_t l;
+ void *p;
p = pa_memblock_acquire(u->memchunk.memblock);
- l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length);
- pa_memblock_release(p);
+ l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
+ pa_memblock_release(u->memchunk.memblock);
if (l < 0) {
- if (errno != EINTR && errno != EAGAIN) {
+ if (errno == EINTR)
+ continue;
+ else if (errno != EAGAIN) {
pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
goto fail;
}
-
+
} else {
u->memchunk.index += l;
if (u->memchunk.length <= 0) {
pa_memblock_unref(u->memchunk.memblock);
- u->memchunk.memblock = NULL;
+ pa_memchunk_reset(&u->memchunk);
}
- }
- pollfd[POLLFD_FIFO].revents = 0;
- continue;
+ pollfd[POLLFD_FIFO].revents = 0;
+ continue;
+ }
}
}
- pollfd[POLLFD_FIFO].events = running && !underrun ? POLLOUT : 0;
+ pollfd[POLLFD_FIFO].events = (u->sink->thread_info.state == PA_SINK_RUNNING && !underrun) ? POLLOUT : 0;
/* Hmm, nothing to do. Let's sleep */
- if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0)
+ if (pa_asyncmsgq_before_poll(u->asyncmsgq) < 0)
continue;
- r = poll(&pollfd, 1, 0);
- pa_asyncmsgq_after_poll(u->sink->asyncmsgq);
+ r = poll(pollfd, POLLFD_MAX, -1);
+ pa_asyncmsgq_after_poll(u->asyncmsgq);
if (r < 0) {
if (errno == EINTR)
goto fail;
}
- if (pollfd[POLLFD_FIFO].revents & ~POLLIN) {
+ if (pollfd[POLLFD_FIFO].revents & ~POLLOUT) {
pa_log("FIFO shutdown.");
goto fail;
}
- pa_assert(pollfd[POLLFD_ASYNCQ].revents & ~POLLIN == 0);
+ pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);
}
fail:
/* We have to continue processing messages until we receive the
* SHUTDOWN message */
- pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), pa_module_unref);
- pa_asyncmsgq_wait_for(PA_SINK_MESSAGE_SHUTDOWN);
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+ pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
finish:
pa_log_debug("Thread shutting down");
ss = c->default_sample_spec;
if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
- pa_log("Invalid sample format specification");
+ pa_log("Invalid sample format specification or channel map");
goto fail;
}
u = pa_xnew0(struct userdata, 1);
u->core = c;
u->module = m;
- u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FIFO_NAME));
- u->fd = fd;
- u->memchunk.memblock = NULL;
- u->memchunk.length = 0;
m->userdata = u;
- mkfifo(u->filename, 0666);
+ pa_assert_se(u->asyncmsgq = pa_asyncmsgq_new(0));
+
+ u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
- if ((u->fd = open(u->filename, O_RDWR)) < 0) {
- pa_log("open('%s'): %s", p, pa_cstrerror(errno));
+ mkfifo(u->filename, 0666);
+ if ((u->fd = open(u->filename, O_RDWR|O_NOCTTY)) < 0) {
+ pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
goto fail;
}
pa_make_nonblock_fd(u->fd);
if (fstat(u->fd, &st) < 0) {
- pa_log("fstat('%s'): %s", p, pa_cstrerror(errno));
+ pa_log("fstat('%s'): %s", u->filename, pa_cstrerror(errno));
goto fail;
}
if (!S_ISFIFO(st.st_mode)) {
- pa_log("'%s' is not a FIFO.", p);
+ pa_log("'%s' is not a FIFO.", u->filename);
goto fail;
}
goto fail;
}
+ u->sink->parent.process_msg = sink_process_msg;
u->sink->userdata = u;
- pa_sink_set_owner(u->sink, m);
- pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", p));
+
+ pa_sink_set_module(u->sink, m);
+ pa_sink_set_asyncmsgq(u->sink, u->asyncmsgq);
+ pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", u->filename));
pa_xfree(t);
if (!(u->thread = pa_thread_new(thread_func, u))) {
void pa__done(pa_core *c, pa_module*m) {
struct userdata *u;
+
pa_assert(c);
pa_assert(m);
if (!(u = m->userdata))
return;
- pa_sink_disconnect(u->sink);
+ if (u->sink)
+ pa_sink_disconnect(u->sink);
if (u->thread) {
- pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL);
+ pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
pa_thread_free(u->thread);
}
- pa_sink_unref(u->sink);
+ if (u->asyncmsgq)
+ pa_asyncmsgq_free(u->asyncmsgq);
+
+ if (u->sink)
+ pa_sink_unref(u->sink);
if (u->memchunk.memblock)
pa_memblock_unref(u->memchunk.memblock);
goto fail;
}
u->source->userdata = u;
- pa_source_set_owner(u->source, m);
+ pa_source_set_module(u->source, m);
pa_source_set_description(u->source, t = pa_sprintf_malloc("Unix FIFO source '%s'", p));
pa_xfree(t);
pa_free_cb_t free_cb;
pa_memchunk memchunk;
pa_semaphore *semaphore;
+ int ret;
};
struct pa_asyncmsgq {
pa_msgobject_unref(i->object);
if (i->memchunk.memblock)
- pa_memblock_unref(i->object);
+ pa_memblock_unref(i->memchunk.memblock);
- if (i->userdata_free_cb)
- i->userdata_free_cb(i->userdata);
+ if (i->free_cb)
+ i->free_cb(i->userdata);
if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
pa_xfree(i);
i = pa_xnew(struct asyncmsgq_item, 1);
i->code = code;
- i->object = pa_msgobject_ref(object);
+ i->object = object ? pa_msgobject_ref(object) : NULL;
i->userdata = (void*) userdata;
i->free_cb = free_cb;
if (chunk) {
i.ret = -1;
if (chunk) {
pa_assert(chunk->memblock);
- i->memchunk = *chunk;
+ i.memchunk = *chunk;
} else
- pa_memchunk_reset(&i->memchunk);
+ pa_memchunk_reset(&i.memchunk);
pa_assert_se(i.semaphore = pa_semaphore_new(0));
/* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
if (object)
*object = a->current->object;
if (chunk)
- *chunk = a->chunk;
+ *chunk = a->current->memchunk;
+ pa_log_debug("q=%p object=%p code=%i data=%p", a, a->current->object, a->current->code, a->current->userdata);
+
return 0;
}
pa_assert(a);
do {
+ pa_msgobject *o;
+ void *data;
+ pa_memchunk chunk;
+ int ret;
- if (pa_asyncmsgq_get(a, NULL, &c, NULL, 1) < 0)
+ if (pa_asyncmsgq_get(a, &o, &c, &data, &chunk, 1) < 0)
return -1;
- pa_asyncmsgq_done(a);
+ ret = pa_asyncmsgq_dispatch(o, c, data, &chunk);
+ pa_asyncmsgq_done(a, ret);
} while (c != code);
}
+/* Hand a message to its target object: when an object is given, its
+ * process_msg() callback is invoked and its return value is returned;
+ * a message without a target object yields 0. */
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) {
- pa_assert(q);
if (object)
- return object->msg_process(object, code, userdata, memchunk);
+ return object->process_msg(object, code, userdata, memchunk);
return 0;
}
unsigned size;
unsigned read_idx;
unsigned write_idx;
- pa_atomic_int_t read_waiting;
- pa_atomic_int_t write_waiting;
+ pa_atomic_t read_waiting, n_read;
+ pa_atomic_t write_waiting, n_written;
int read_fds[2], write_fds[2];
};
l->size = size;
pa_atomic_store(&l->read_waiting, 0);
pa_atomic_store(&l->write_waiting, 0);
+ pa_atomic_store(&l->n_written, 0);
+ pa_atomic_store(&l->n_read, 0);
if (pipe(l->read_fds) < 0) {
pa_xfree(l);
if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
- /* First try failed. Let's wait for changes. */
-
- if (!wait)
+ if (!wait) {
+ /* Let's empty the FIFO from old notifications, before we return */
+
+ while (pa_atomic_load(&l->n_read) > 0) {
+ ssize_t r;
+ int x[20];
+
+ errno = 0;
+ if ((r = read(l->write_fds[0], x, sizeof(x))) <= 0 && errno != EINTR)
+ return -1;
+
+ if (r > 0)
+ if (pa_atomic_sub(&l->n_read, r) <= r)
+ break;
+ }
+
return -1;
+ }
+
+ /* First try failed. Let's wait for changes. */
_Y;
for (;;) {
char x[20];
+ ssize_t r;
_Y;
_Y;
- if (read(l->write_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
+ if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) {
pa_atomic_dec(&l->write_waiting);
return -1;
}
+
+ if (r > 0)
+ pa_atomic_sub(&l->n_read, r);
}
_Y;
if (pa_atomic_load(&l->read_waiting)) {
char x = 'x';
_Y;
- write(l->read_fds[1], &x, sizeof(x));
+ if (write(l->read_fds[1], &x, sizeof(x)) > 0)
+ pa_atomic_inc(&l->n_written);
}
return 0;
/* First try failed. Let's wait for changes. */
- if (!wait)
+ if (!wait) {
+ /* Let's empty the FIFO from old notifications, before we return */
+
+ while (pa_atomic_load(&l->n_written) > 0) {
+ ssize_t r;
+ int x[20];
+
+ errno = 0;
+ if ((r = read(l->read_fds[0], x, sizeof(x))) <= 0 && errno != EINTR)
+ return NULL;
+
+ if (r > 0)
+ if (pa_atomic_sub(&l->n_written, r) <= r)
+ break;
+ }
+
return NULL;
+ }
_Y;
for (;;) {
char x[20];
+ ssize_t r;
_Y;
_Y;
- if (read(l->read_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
+ /* r must hold the byte count returned by read(), so the assignment is parenthesised on its own before the comparison with 0 */
+ if ((r = read(l->read_fds[0], x, sizeof(x))) < 0 && errno != EINTR) {
pa_atomic_dec(&l->read_waiting);
return NULL;
}
+
+ /* Every byte consumed here is one notification less for the non-blocking drain path to collect */
+ if (r > 0)
+ pa_atomic_sub(&l->n_written, r);
}
_Y;
if (pa_atomic_load(&l->write_waiting)) {
char x = 'x';
_Y;
- write(l->write_fds[1], &x, sizeof(x));
+ /* Count the notification only if a byte really went into the pipe: n_read is later decremented by the number of bytes read back, so a 0-byte write must not be counted */
+ if (write(l->write_fds[1], &x, sizeof(x)) > 0)
+ pa_atomic_inc(&l->n_read);
}
return ret;
return 0;
}
-int pa_asyncq_after_poll(pa_asyncq *l) {
+/* Ends a poll cycle on the queue: asserts that read_waiting is positive and decrements it. Returns nothing. */
+void pa_asyncq_after_poll(pa_asyncq *l) {
pa_assert(l);
pa_assert(pa_atomic_load(&l->read_waiting) > 0);
pa_atomic_dec(&l->read_waiting);
+
+
+
}
int pa_asyncq_get_fd(pa_asyncq *q);
int pa_asyncq_before_poll(pa_asyncq *a);
-int pa_asyncq_after_poll(pa_asyncq *a);
+void pa_asyncq_after_poll(pa_asyncq *a);
#endif
static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_suspend_sink(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_suspend_source(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
/* A method table for all available commands */
{ "load-module", pa_cli_command_load, "Load a module (args: name, arguments)", 3},
{ "unload-module", pa_cli_command_unload, "Unload a module (args: index)", 2},
{ "set-sink-volume", pa_cli_command_sink_volume, "Set the volume of a sink (args: index|name, volume)", 3},
- { "set-sink-input-volume", pa_cli_command_sink_input_volume, "Set the volume of a sink input (args: index|name, volume)", 3},
+ { "set-sink-input-volume", pa_cli_command_sink_input_volume, "Set the volume of a sink input (args: index, volume)", 3},
{ "set-source-volume", pa_cli_command_source_volume, "Set the volume of a source (args: index|name, volume)", 3},
- { "set-sink-mute", pa_cli_command_sink_mute, "Set the mute switch of a sink (args: index|name, mute)", 3},
- { "set-sink-input-mute", pa_cli_command_sink_input_mute, "Set the mute switch of a sink input (args: index|name, mute)", 3},
- { "set-source-mute", pa_cli_command_source_mute, "Set the mute switch of a source (args: index|name, mute)", 3},
+ { "set-sink-mute", pa_cli_command_sink_mute, "Set the mute switch of a sink (args: index|name, bool)", 3},
+ { "set-sink-input-mute", pa_cli_command_sink_input_mute, "Set the mute switch of a sink input (args: index, bool)", 3},
+ { "set-source-mute", pa_cli_command_source_mute, "Set the mute switch of a source (args: index|name, bool)", 3},
{ "set-default-sink", pa_cli_command_sink_default, "Set the default sink (args: index|name)", 2},
{ "set-default-source", pa_cli_command_source_default, "Set the default source (args: index|name)", 2},
{ "kill-client", pa_cli_command_kill_client, "Kill a client (args: index)", 2},
{ "move-sink-input", pa_cli_command_move_sink_input, "Move sink input to another sink (args: index, sink)", 3},
{ "move-source-output", pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3},
{ "vacuum", pa_cli_command_vacuum, NULL, 1},
+ { "suspend-sink", pa_cli_command_suspend_sink, "Suspend sink (args: index|name, bool)", 3},
+ { "suspend-source", pa_cli_command_suspend_source, "Suspend source (args: index|name, bool)", 3},
{ NULL, NULL, NULL, 0 }
};
return 0;
}
+/* CLI handler for "suspend-sink".
+ *
+ * Token 1 identifies the sink (looked up through pa_namereg_get()), token 2
+ * is parsed with pa_atoi() and handed to pa_sink_suspend(). On any failure a
+ * message is appended to buf and -1 is returned; 0 is returned on success.
+ * The fail argument is not used here. */
+static int pa_cli_command_suspend_sink(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+    const char *target, *setting;
+    pa_sink *s;
+    int flag;
+
+    /* First argument: which sink */
+    target = pa_tokenizer_get(t, 1);
+    if (!target) {
+        pa_strbuf_puts(buf, "You need to specify a sink either by its name or its index.\n");
+        return -1;
+    }
+
+    /* Second argument: the suspend switch */
+    setting = pa_tokenizer_get(t, 2);
+    if (!setting) {
+        pa_strbuf_puts(buf, "You need to specify a suspend switch setting (0/1).\n");
+        return -1;
+    }
+
+    if (pa_atoi(setting, &flag) < 0) {
+        pa_strbuf_puts(buf, "Failed to parse suspend switch.\n");
+        return -1;
+    }
+
+    /* The sink is resolved only after both arguments were validated */
+    s = pa_namereg_get(c, target, PA_NAMEREG_SINK, 1);
+    if (!s) {
+        pa_strbuf_puts(buf, "No sink found by this name or index.\n");
+        return -1;
+    }
+
+    pa_sink_suspend(s, flag);
+    return 0;
+}
+
+/* CLI handler for "suspend-source".
+ *
+ * Token 1 identifies the source (looked up through pa_namereg_get()), token 2
+ * is parsed with pa_atoi() and handed to pa_source_suspend(). On any failure
+ * a message is appended to buf and -1 is returned; 0 is returned on success.
+ * The fail argument is not used here. */
+static int pa_cli_command_suspend_source(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+    const char *target, *setting;
+    pa_source *s;
+    int flag;
+
+    /* First argument: which source */
+    target = pa_tokenizer_get(t, 1);
+    if (!target) {
+        pa_strbuf_puts(buf, "You need to specify a source either by its name or its index.\n");
+        return -1;
+    }
+
+    /* Second argument: the suspend switch */
+    setting = pa_tokenizer_get(t, 2);
+    if (!setting) {
+        pa_strbuf_puts(buf, "You need to specify a suspend switch setting (0/1).\n");
+        return -1;
+    }
+
+    if (pa_atoi(setting, &flag) < 0) {
+        pa_strbuf_puts(buf, "Failed to parse suspend switch.\n");
+        return -1;
+    }
+
+    /* The source is resolved only after both arguments were validated */
+    s = pa_namereg_get(c, target, PA_NAMEREG_SOURCE, 1);
+    if (!s) {
+        pa_strbuf_puts(buf, "No source found by this name or index.\n");
+        return -1;
+    }
+
+    pa_source_suspend(s, flag);
+    return 0;
+}
+
static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) {
pa_module *m;
pa_sink *sink;
return 0;
}
+
" %c index: %u\n"
"\tname: <%s>\n"
"\tdriver: <%s>\n"
- "\tis_hardware: <%i>\n"
+ "\tis hardware: <%i>\n"
"\tstate: %s\n"
"\tvolume: <%s>\n"
"\tmute: <%i>\n"
"\tlatency: <%0.0f usec>\n"
- "\tmonitor_source: <%u>\n"
+ "\tmonitor source: <%u>\n"
"\tsample spec: <%s>\n"
- "\tchannel map: <%s>\n",
+ "\tchannel map: <%s>\n"
+ "\tused by: <%u>\n",
c->default_sink_name && !strcmp(sink->name, c->default_sink_name) ? '*' : ' ',
sink->index,
sink->name,
(double) pa_sink_get_latency(sink),
sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
pa_sample_spec_snprint(ss, sizeof(ss), &sink->sample_spec),
- pa_channel_map_snprint(cm, sizeof(cm), &sink->channel_map));
+ pa_channel_map_snprint(cm, sizeof(cm), &sink->channel_map),
+ pa_sink_used_by(sink));
if (sink->module)
pa_strbuf_printf(s, "\tmodule: <%u>\n", sink->module->index);
" %c index: %u\n"
"\tname: <%s>\n"
"\tdriver: <%s>\n"
- "\tis_hardware: <%i>\n"
+ "\tis hardware: <%i>\n"
"\tstate: %s\n"
"\tvolume: <%s>\n"
"\tmute: <%u>\n"
"\tlatency: <%0.0f usec>\n"
"\tsample spec: <%s>\n"
- "\tchannel map: <%s>\n",
+ "\tchannel map: <%s>\n"
+ "\tused by: <%u>\n",
c->default_source_name && !strcmp(source->name, c->default_source_name) ? '*' : ' ',
source->index,
source->name,
!!pa_source_get_mute(source),
(double) pa_source_get_latency(source),
pa_sample_spec_snprint(ss, sizeof(ss), &source->sample_spec),
- pa_channel_map_snprint(cm, sizeof(cm), &source->channel_map));
+ pa_channel_map_snprint(cm, sizeof(cm), &source->channel_map),
+ pa_source_used_by(source));
if (source->monitor_of)
pa_strbuf_printf(s, "\tmonitor_of: <%u>\n", source->monitor_of->index);
}
/* Append a new subscription event to the subscription event queue and schedule a main loop event */
-void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t index) {
+void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t idx) {
pa_subscription_event *e;
assert(c);
continue;
/* not the same object */
- if (i->index != index)
+ if (i->index != idx)
continue;
if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE) {
e = pa_xnew(pa_subscription_event, 1);
e->core = c;
e->type = t;
- e->index = index;
+ e->index = idx;
PA_LLIST_INSERT_AFTER(pa_subscription_event, c->subscription_event_queue, c->subscription_event_last, e);
c->subscription_event_last = e;
#include "core.h"
+static PA_DEFINE_CHECK_TYPE(pa_core, core_check_type, pa_msgobject_check_type);
+
static int core_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
pa_core *c = PA_CORE(o);
/* Check whether there is a message for us to process */
while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
- pa_asyncmsgq_dispatch(object, code, data, &chunk);
- pa_asyncmsgq_done(c->asyncmsgq, 0);
+ int ret;
+
+ ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+ pa_asyncmsgq_done(c->asyncmsgq, ret);
}
if (pa_asyncmsgq_before_poll(c->asyncmsgq) == 0)
}
}
- c = pa_msgobject_new(pa_core);
+ c = pa_msgobject_new(pa_core, core_check_type);
c->parent.parent.free = core_free;
c->parent.process_msg = core_process_msg;
static void core_free(pa_object *o) {
pa_core *c = PA_CORE(o);
- pa_core_assert_ref(c);
+ pa_assert(c);
pa_module_unload_all(c);
assert(!c->modules);
pa_xfree(c->default_source_name);
pa_xfree(c->default_sink_name);
+ pa_asyncmsgq_after_poll(c->asyncmsgq);
+ pa_asyncmsgq_free(c->asyncmsgq);
+
pa_mempool_free(c->mempool);
pa_property_cleanup(c);
c->mainloop->io_free(c->asyncmsgq_event);
- pa_asyncmsgq_after_poll(c->asyncmsgq);
- pa_asyncmsgq_free(c->asyncmsgq);
pa_hook_free(&c->hook_sink_input_new);
pa_hook_free(&c->hook_sink_disconnect);
};
PA_DECLARE_CLASS(pa_core);
-#define PA_CORE(o) ((pa_core*) o)
+#define PA_CORE(o) pa_core_cast(o)
enum {
PA_CORE_MESSAGE_UNLOAD_MODULE,
};
+/* Replaces the stored log identifier with a copy of p and stores its locale-converted form in log_ident_local; when pa_utf8_to_locale() yields NULL a plain copy of the identifier is stored instead. NOTE(review): the unguarded pa_xfree() calls assume pa_xfree(NULL) is a no-op -- confirm. */
void pa_log_set_ident(const char *p) {
- if (log_ident)
- pa_xfree(log_ident);
- if (log_ident_local)
- pa_xfree(log_ident_local);
+ pa_xfree(log_ident);
+ pa_xfree(log_ident_local);
log_ident = pa_xstrdup(p);
- log_ident_local = pa_utf8_to_locale(log_ident);
- if (!log_ident_local)
+ if (!(log_ident_local = pa_utf8_to_locale(log_ident)))
log_ident_local = pa_xstrdup(log_ident);
}
#include "msgobject.h"
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name) {
+PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_msgobject_check_type, pa_object_check_type);
+
+/* Creates an object of the given size via pa_object_new_internal() and clears its process_msg callback. size must be larger than sizeof(pa_msgobject) and type_name must be non-NULL (both asserted). When check_type is NULL, pa_msgobject_check_type is used as the type checker. */
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
pa_msgobject *o;
pa_assert(size > sizeof(pa_msgobject));
pa_assert(type_name);
- o = PA_MSGOBJECT(pa_object_new_internal(size, type_name));
+ o = PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type ? check_type : pa_msgobject_check_type));
o->process_msg = NULL;
return o;
}
int (*process_msg)(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
};
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name);
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
-#define pa_msgobject_new(type) ((type*) pa_msgobject_new_internal(sizeof(type), #type))
+int pa_msgobject_check_type(pa_object *o, const char *type);
+
+#define pa_msgobject_new(type, check_type) ((type*) pa_msgobject_new_internal(sizeof(type), #type, check_type))
#define pa_msgobject_free ((void (*) (pa_msgobject* o)) pa_object_free)
-#define PA_MSGOBJECT(o) ((pa_msgobject*) (o))
+#define PA_MSGOBJECT(o) pa_msgobject_cast(o)
PA_DECLARE_CLASS(pa_msgobject);
#include "object.h"
-pa_object *pa_object_new_internal(size_t size, const char *type_name) {
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
pa_object *o;
pa_assert(size > sizeof(pa_object));
PA_REFCNT_INIT(o);
o->type_name = type_name;
o->free = pa_object_free;
+ o->check_type = check_type ? check_type : pa_object_check_type;
return o;
}
pa_object *pa_object_ref(pa_object *o) {
- pa_assert(o);
- pa_assert(PA_REFCNT_VALUE(o) >= 1);
+ pa_object_assert_ref(o);
PA_REFCNT_INC(o);
return o;
}
void pa_object_unref(pa_object *o) {
- pa_assert(o);
- pa_assert(PA_REFCNT_VALUE(o) >= 1);
+ pa_object_assert_ref(o);
if (PA_REFCNT_DEC(o) <= 0) {
pa_assert(o->free);
o->free(o);
}
}
+
+/* Root type checker: returns non-zero only when type_name names "pa_object". Both arguments are asserted to be non-NULL; o is not otherwise inspected. */
+int pa_object_check_type(pa_object *o, const char *type_name) {
+ pa_assert(o);
+ pa_assert(type_name);
+
+ /* The pointer comparison is only a shortcut; the strcmp() that follows decides whenever the pointers differ */
+ return type_name == "pa_object" || strcmp(type_name, "pa_object") == 0;
+}
USA.
***/
+#include <string.h>
#include <sys/types.h>
+
#include <pulse/xmalloc.h>
#include <pulsecore/refcnt.h>
#include <pulsecore/macro.h>
PA_REFCNT_DECLARE;
const char *type_name;
void (*free)(pa_object *o);
+ int (*check_type)(pa_object *o, const char *type_name);
};
-pa_object *pa_object_new_internal(size_t size, const char *type_name);
-#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), #type))
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
+/* Typed convenience wrapper: expands to pa_object_new_internal(sizeof(type), #type, check_type) with the result cast to type* */
+#define pa_object_new(type, check_type) ((type*) pa_object_new_internal(sizeof(type), #type, check_type))
#define pa_object_free ((void (*) (pa_object* o)) pa_xfree)
+int pa_object_check_type(pa_object *o, const char *type);
+
+/* Asks the object's own check_type callback whether it is a "pa_object" and
+ * returns that callback's result. The pointer is asserted to be non-NULL. */
+static inline int pa_object_isinstance(void *o) {
+    pa_object *self = (pa_object*) o;
+
+    pa_assert(self);
+
+    return self->check_type(self, "pa_object");
+}
+
pa_object *pa_object_ref(pa_object *o);
void pa_object_unref(pa_object *o);
return o ? PA_REFCNT_VALUE(o) : 0;
}
+/* Checked cast to pa_object*: asserts that o is non-NULL and that its
+ * check_type callback accepts "pa_object", then returns the same pointer. */
+static inline pa_object* pa_object_cast(void *o) {
+    pa_object *obj = (pa_object*) o;
+
+    /* Fail with a clean assertion on NULL instead of dereferencing it in the type check below */
+    pa_assert(obj);
+    pa_assert(obj->check_type(obj, "pa_object"));
+
+    return obj;
+}
+
#define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o))
-#define PA_OBJECT(o) ((pa_object*) (o))
-
-#define PA_DECLARE_CLASS(c) \
- static inline c* c##_ref(c *o) { \
- return (c*) pa_object_ref(PA_OBJECT(o)); \
- } \
- static inline void c##_unref(c* o) { \
- pa_object_unref(PA_OBJECT(o)); \
- } \
- static inline int c##_refcnt(c* o) { \
- return pa_object_refcnt(PA_OBJECT(o)); \
- } \
- static inline void c##_assert_ref(c *o) { \
- pa_object_assert_ref(PA_OBJECT(o)); \
- } \
+#define PA_OBJECT(o) pa_object_cast(o)
+
+/* Declares the static inline helpers for class c: c_isinstance() (asks the object's check_type callback for #c), c_cast() (asserts c_isinstance() and casts), and c_ref()/c_unref()/c_refcnt()/c_assert_ref(), which forward to the pa_object equivalents through PA_OBJECT(). The trailing struct declaration exists so the macro invocation can be followed by a semicolon. */
+#define PA_DECLARE_CLASS(c) \
+ static inline int c##_isinstance(void *o) { \
+ pa_object *obj = (pa_object*) o; \
+ pa_assert(obj); \
+ return obj->check_type(obj, #c); \
+ } \
+ static inline c* c##_cast(void *o) { \
+ pa_assert(c##_isinstance(o)); \
+ return (c*) o; \
+ } \
+ static inline c* c##_ref(c *o) { \
+ return (c*) pa_object_ref(PA_OBJECT(o)); \
+ } \
+ static inline void c##_unref(c* o) { \
+ pa_object_unref(PA_OBJECT(o)); \
+ } \
+ static inline int c##_refcnt(c* o) { \
+ return pa_object_refcnt(PA_OBJECT(o)); \
+ } \
+ static inline void c##_assert_ref(c *o) { \
+ pa_object_assert_ref(PA_OBJECT(o)); \
+ } \
+ struct __stupid_useless_struct_to_allow_trailing_semicolon
+
+/* Defines the type-check function func for class c: it returns 1 when type names #c (pointer shortcut first, then strcmp()) and otherwise returns whatever the parent class's checker reports. The trailing struct declaration exists so the macro invocation can be followed by a semicolon. */
+#define PA_DEFINE_CHECK_TYPE(c, func, parent) \
+ int func(pa_object *o, const char *type) { \
+ pa_assert(o); \
+ pa_assert(type); \
+ if (type == #c || \
+ strcmp(type, #c) == 0) \
+ return 1; \
+ return parent(o, type); \
+ } \
struct __stupid_useless_struct_to_allow_trailing_semicolon
+
#endif
#include <pulsecore/namereg.h>
#include <pulsecore/log.h>
#include <pulsecore/core-error.h>
+#include <pulsecore/atomic.h>
#include "protocol-simple.h"
/* Don't allow more than this many concurrent connections */
#define MAX_CONNECTIONS 10
-struct connection {
+typedef struct connection {
+ pa_msgobject parent;
pa_protocol_simple *protocol;
pa_iochannel *io;
pa_sink_input *sink_input;
struct {
pa_memblock *current_memblock;
size_t memblock_index, fragment_size;
- pa_atomic_int missing;
+ pa_atomic_t missing;
} playback;
-};
+} connection;
+
+PA_DECLARE_CLASS(connection);
+#define CONNECTION(o) (connection_cast(o))
+static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
+
struct pa_protocol_simple {
pa_module *module;
pa_core *core;
pa_socket_server*server;
pa_idxset *connections;
- pa_asyncmsgq *asyncmsgq;
-
enum {
RECORD = 1,
PLAYBACK = 2,
};
enum {
- MESSAGE_REQUEST_DATA, /* data from source output to main loop */
- MESSAGE_POST_DATA /* data from source output to main loop */
+ MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
+ MESSAGE_POST_DATA, /* data from source output to main loop */
+ MESSAGE_DROP_CONNECTION /* Please drop a connection now */
};
#define RECORD_BUFFER_SECONDS (5)
#define RECORD_BUFFER_FRAGMENTS (100)
-static void connection_free(struct connection *c) {
+/* Free callback of the connection object: releases the pending playback memblock, the iochannel and both memblockqs (each only if set) and finally frees the connection structure itself. It does not touch sink_input, source_output or client. */
+static void connection_free(pa_object *o) {
+ connection *c = CONNECTION(o);
pa_assert(c);
+ if (c->playback.current_memblock)
+ pa_memblock_unref(c->playback.current_memblock);
+
+ if (c->io)
+ pa_iochannel_free(c->io);
+ if (c->input_memblockq)
+ pa_memblockq_free(c->input_memblockq);
+ if (c->output_memblockq)
+ pa_memblockq_free(c->output_memblockq);
+
+ pa_xfree(c);
+}
+
+/* Detaches a connection from the protocol: removes it from the connections idxset, disconnects and unrefs the sink input and source output, frees the client (resetting each of these pointers to NULL), then drops one reference with connection_unref(). */
+static void connection_drop(connection *c) {
+ pa_assert(c);
+
pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
if (c->sink_input) {
pa_sink_input_disconnect(c->sink_input);
pa_sink_input_unref(c->sink_input);
+ c->sink_input = NULL;
}
if (c->source_output) {
pa_source_output_disconnect(c->source_output);
pa_source_output_unref(c->source_output);
+ c->source_output = NULL;
}
- if (c->playback.current_memblock)
- pa_memblock_unref(c->playback.current_memblock);
-
- if (c->client)
+ if (c->client) {
pa_client_free(c->client);
- if (c->io)
- pa_iochannel_free(c->io);
- if (c->input_memblockq)
- pa_memblockq_free(c->input_memblockq);
- if (c->output_memblockq)
- pa_memblockq_free(c->output_memblockq);
+ c->client = NULL;
+ }
- pa_xfree(c);
+ connection_unref(c);
}
-static int do_read(struct connection *c) {
+static int do_read(connection *c) {
pa_memchunk chunk;
ssize_t r;
size_t l;
c->playback.memblock_index += r;
- pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_POST_DATA, NULL, &chunk, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
return 0;
}
-static int do_write(struct connection *c) {
+static int do_write(connection *c) {
pa_memchunk chunk;
ssize_t r;
void *p;
- p_assert(c);
+ pa_assert(c);
if (!c->source_output)
return 0;
return 0;
}
-static void do_work(struct connection *c) {
+static void do_work(connection *c) {
pa_assert(c);
if (c->dead)
pa_memblockq_prebuf_disable(c->input_memblockq);
} else
- connection_free(c);
+ connection_drop(c);
+}
+
+/* Message handler of the connection object. MESSAGE_REQUEST_DATA runs do_work(); MESSAGE_POST_DATA pushes chunk onto output_memblockq and then runs do_work(); MESSAGE_DROP_CONNECTION calls connection_drop(). Any other code is ignored. Always returns 0; userdata is not used. */
+static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+ connection *c = CONNECTION(o);
+
+ connection_assert_ref(c);
+
+ switch (code) {
+ case MESSAGE_REQUEST_DATA:
+ do_work(c);
+ break;
+
+ case MESSAGE_POST_DATA:
+ pa_memblockq_push(c->output_memblockq, chunk);
+ do_work(c);
+ break;
+
+ case MESSAGE_DROP_CONNECTION:
+ connection_drop(c);
+ break;
+
+ }
+
+ return 0;
}
/*** sink_input callbacks ***/
/* Called from thread context */
-static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, const pa_memchunk *chunk) {
- struct connection*c;
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+ pa_sink_input *i = PA_SINK_INPUT(o);
+ connection*c;
pa_assert(i);
c = i->userdata;
/* New data from the main loop */
pa_memblockq_push_align(c->input_memblockq, chunk);
+ pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq));
+
return 0;
}
}
default:
- return pa_sink_input_process_msg(i, code, userdata);
+ return pa_sink_input_process_msg(o, code, userdata, chunk);
}
}
/* Called from thread context */
static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
- struct connection*c;
+ connection*c;
+ int r;
pa_assert(i);
c = i->userdata;
r = pa_memblockq_peek(c->input_memblockq, chunk);
if (c->dead && r < 0)
- connection_free(c);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, c, NULL, NULL);
return r;
}
/* Called from thread context */
static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
- struct connection*c = i->userdata;
+ connection*c = i->userdata;
size_t old, new;
pa_assert(i);
pa_memblockq_drop(c->input_memblockq, chunk, length);
new = pa_memblockq_missing(c->input_memblockq);
- pa_atomic_store(&c->playback.missing, &new);
+ pa_atomic_store(&c->playback.missing, new);
if (new > old)
- pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, NULL, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
}
/* Called from main context */
pa_assert(i);
pa_assert(i->userdata);
- connection_free((struct connection *) i->userdata);
+ connection_drop((connection *) i->userdata);
}
/*** source_output callbacks ***/
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
- struct connection *c;
+ connection *c;
pa_assert(o);
c = o->userdata;
pa_assert(c);
pa_assert(chunk);
- pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, chunk, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL);
}
static void source_output_kill_cb(pa_source_output *o) {
- struct connection*c;
+ connection*c;
pa_assert(o);
c = o->userdata;
pa_assert(c);
- connection_free(c);
+ connection_drop(c);
}
static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
- struct connection*c;
+ connection*c;
pa_assert(o);
c = o->userdata;
/*** client callbacks ***/
static void client_kill_cb(pa_client *client) {
- struct connection*c;
+ connection*c;
pa_assert(client);
c = client->userdata;
pa_assert(c);
- connection_free(client);
+ connection_drop(c);
}
/*** pa_iochannel callbacks ***/
static void io_callback(pa_iochannel*io, void *userdata) {
- struct connection *c = userdata;
+ connection *c = userdata;
pa_assert(io);
pa_assert(c);
static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
pa_protocol_simple *p = userdata;
- struct connection *c = NULL;
+ connection *c = NULL;
char cname[256];
pa_assert(s);
return;
}
- c = pa_xnew(struct connection, 1);
+ c = pa_msgobject_new(connection, connection_check_type);
+ c->parent.parent.free = connection_free;
+ c->parent.process_msg = connection_process_msg;
c->io = io;
c->sink_input = NULL;
c->source_output = NULL;
c->client->kill = client_kill_cb;
c->client->userdata = c;
-
if (p->mode & PLAYBACK) {
pa_sink_input_new_data data;
size_t l;
goto fail;
}
+ c->sink_input->parent.process_msg = sink_input_process_msg;
c->sink_input->peek = sink_input_peek_cb;
c->sink_input->drop = sink_input_drop_cb;
c->sink_input->kill = sink_input_kill_cb;
- c->sink_input->get_latency = sink_input_get_latency_cb;
c->sink_input->userdata = c;
l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
NULL);
pa_assert(c->input_memblockq);
pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
- c->playback.fragment_size = l/10;
+ c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS;
pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq));
fail:
if (c)
- connection_free(c);
-}
-
-static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
- pa_protocol_simple *p = userdata;
- int do_some_work = 0;
-
- pa_assert(pa_asyncmsgq_get_fd(p->asyncmsgq) == fd);
- pa_assert(events == PA_IO_EVENT_INPUT);
-
- pa_asyncmsgq_after_poll(p->asyncmsgq);
-
- for (;;) {
- int code;
- void *object, *data;
-
- /* Check whether there is a message for us to process */
- while (pa_asyncmsgq_get(p->asyncmsgq, &object, &code, &data) == 0) {
-
- connection *c = object;
-
- pa_assert(c);
-
- switch (code) {
-
- case MESSAGE_REQUEST_DATA:
- do_work(c);
- break;
-
- case MESSAGE_POST_DATA:
- pa_memblockq_push(c->output_memblockq, chunk);
- do_work(c);
- break;
- }
-
- pa_asyncmsgq_done(p->asyncmsgq);
- }
-
- if (pa_asyncmsgq_before_poll(p->asyncmsgq) == 0)
- break;
- }
+ connection_drop(c);
}
pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
p->core = core;
p->server = server;
p->connections = pa_idxset_new(NULL, NULL);
- pa_assert_se(p->asyncmsgq = pa_asyncmsgq_new(0));
p->sample_spec = core->default_sample_spec;
if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
pa_socket_server_set_callback(p->server, on_connection, p);
- pa_assert_se(pa_asyncmsgq_before_poll(p->asyncmsgq) == 0);
- pa_assert_se(p->asyncmsgq_event = core->mainloop->io_event_new(core->mainloop, pa_asyncmsgq_get_fd(p->asyncmsgq), PA_IO_EVENT_INPUT, p));
-
return p;
fail:
void pa_protocol_simple_free(pa_protocol_simple *p) {
- struct connection *c;
+ connection *c;
pa_assert(p);
if (p->connections) {
while((c = pa_idxset_first(p->connections, NULL)))
- connection_free(c);
+ connection_drop(c);
pa_idxset_free(p->connections, NULL, NULL);
}
if (p->server)
pa_socket_server_unref(p->server);
- if (p->asyncmsgq) {
- c->mainloop->io_event_free(c->asyncmsgq_event);
- pa_asyncmsgq_after_poll(c->asyncmsgq);
- pa_asyncmsgq_free(p->asyncmsgq);
- }
-
pa_xfree(p);
}
void (*impl_free)(pa_resampler *r);
void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate);
+ void (*impl_update_output_rate)(pa_resampler *r, uint32_t rate);
void (*impl_run)(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out);
void *impl_data;
};
r->impl_update_input_rate(r, rate);
}
+/* Sets the output sample rate of the resampler. Nothing happens when the
+ * rate is unchanged; otherwise the new rate is stored in o_ss and the
+ * implementation's update hook is invoked if one is installed. */
+void pa_resampler_set_output_rate(pa_resampler *r, uint32_t rate) {
+    assert(r);
+    assert(rate > 0);
+
+    if (r->o_ss.rate != rate) {
+        r->o_ss.rate = rate;
+
+        /* The hook is optional */
+        if (r->impl_update_output_rate)
+            r->impl_update_output_rate(r, rate);
+    }
+}
+
void pa_resampler_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out) {
assert(r && in && out && r->impl_run);
}
}
+
+/* libsamplerate hook for an output rate change: when a converter state
+ * exists its ratio is set to rate / input rate; when none exists yet a new
+ * one is created with src_new(). Both outcomes are asserted. */
+static void libsamplerate_update_output_rate(pa_resampler *r, uint32_t rate) {
+    struct impl_libsamplerate *impl;
+
+    assert(r);
+    assert(rate > 0);
+    assert(r->impl_data);
+    impl = r->impl_data;
+
+    if (impl->src_state) {
+        /* Existing converter: only the conversion ratio changes */
+        int rc = src_set_ratio(impl->src_state, (double) rate / r->i_ss.rate);
+        assert(rc == 0);
+    } else {
+        /* No converter yet: create one */
+        int error;
+        impl->src_state = src_new(r->resample_method, r->o_ss.channels, &error);
+        assert(impl->src_state);
+    }
+}
+
static int libsamplerate_init(pa_resampler *r) {
struct impl_libsamplerate *u = NULL;
int err;
r->impl_free = libsamplerate_free;
r->impl_update_input_rate = libsamplerate_update_input_rate;
+ r->impl_update_output_rate = libsamplerate_update_output_rate;
r->impl_run = libsamplerate_run;
calc_map_table(r);
pa_xfree(r->impl_data);
}
-static void trivial_update_input_rate(pa_resampler *r, uint32_t rate) {
+static void trivial_update_rate(pa_resampler *r, uint32_t rate) {
struct impl_trivial *u;
assert(r);
r->impl_run = trivial_run;
r->impl_free = trivial_free;
- r->impl_update_input_rate = trivial_update_input_rate;
+ r->impl_update_input_rate = trivial_update_rate;
+ r->impl_update_output_rate = trivial_update_rate;
return 0;
}
/* Change the input rate of the resampler object */
void pa_resampler_set_input_rate(pa_resampler *r, uint32_t rate);
+/* Change the output rate of the resampler object */
+void pa_resampler_set_output_rate(pa_resampler *r, uint32_t rate);
+
/* Return the resampling method of the resampler object */
pa_resample_method_t pa_resampler_get_method(pa_resampler *r);
#define MOVE_BUFFER_LENGTH (1024*1024)
#define SILENCE_BUFFER_LENGTH (64*1024)
-static void sink_input_free(pa_msgobject *o);
+static PA_DEFINE_CHECK_TYPE(pa_sink_input, sink_input_check_type, pa_msgobject_check_type);
+
+static void sink_input_free(pa_object *o);
pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data *data) {
pa_assert(data);
data->resample_method = pa_resampler_get_method(resampler);
}
- i = pa_msgobject_new(pa_sink_input);
-
+ i = pa_msgobject_new(pa_sink_input, sink_input_check_type);
i->parent.parent.free = sink_input_free;
i->parent.process_msg = pa_sink_input_process_msg;
i->core = core;
- pa_atomic_load(&i->state, PA_SINK_INPUT_DRAINED);
+ pa_atomic_store(&i->state, PA_SINK_INPUT_DRAINED);
i->flags = flags;
i->name = pa_xstrdup(data->name);
i->driver = pa_xstrdup(data->driver);
i->userdata = NULL;
i->thread_info.silence_memblock = NULL;
- i->thread_info.move_silence = 0;
+/* i->thread_info.move_silence = 0; */
pa_memchunk_reset(&i->thread_info.resampled_chunk);
i->thread_info.resampler = resampler;
- i->thread_info.soft_volume = i->volume;
- i->thread_info.soft_muted = i->muted;
+ i->thread_info.volume = i->volume;
+ i->thread_info.muted = i->muted;
pa_assert_se(pa_idxset_put(core->sink_inputs, i, &i->index) == 0);
pa_assert_se(pa_idxset_put(i->sink->inputs, i, NULL) == 0);
pa_assert(i);
pa_return_if_fail(pa_sink_input_get_state(i) != PA_SINK_INPUT_DISCONNECTED);
- pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL);
+ pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL);
pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL);
pa_idxset_remove_by_data(i->sink->inputs, i, NULL);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index);
- i->sink = NULL;
+ pa_sink_update_status(i->sink);
+
+ i->sink = NULL;
i->process_msg = NULL;
i->peek = NULL;
i->drop = NULL;
i->get_latency = NULL;
i->underrun = NULL;
- pa_atomic_load(&i->state, PA_SINK_INPUT_DISCONNECTED);
+ pa_atomic_store(&i->state, PA_SINK_INPUT_DISCONNECTED);
}
-static void sink_input_free(pa_msgobject *o) {
+static void sink_input_free(pa_object *o) {
pa_sink_input* i = PA_SINK_INPUT(o);
pa_assert(i);
pa_log_info("Freeing output %u \"%s\"", i->index, i->name);
- if (i->resampled_chunk.memblock)
- pa_memblock_unref(i->resampled_chunk.memblock);
+ if (i->thread_info.resampled_chunk.memblock)
+ pa_memblock_unref(i->thread_info.resampled_chunk.memblock);
if (i->thread_info.resampler)
pa_resampler_free(i->thread_info.resampler);
i->thread_info.volume = i->volume;
i->thread_info.muted = i->muted;
- pa_asyncmsgq_post(i->sink->asyncmsgq, i->sink, PA_SINK_MESSAGE_ADD_INPUT, i, NULL, pa_sink_unref, pa_sink_input_unref);
+ pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, pa_sink_input_ref(i), NULL, (pa_free_cb_t) pa_sink_input_unref);
pa_sink_update_status(i->sink);
- pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index);
+ pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index);
}
void pa_sink_input_kill(pa_sink_input*i) {
pa_sink_input_assert_ref(i);
- if (pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+ if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
r = 0;
if (i->get_latency)
/* goto finish; */
/* } */
- if (!i->resampler) {
+ if (!i->thread_info.resampler) {
do_volume_adj_here = 0;
ret = i->peek(i, chunk);
goto finish;
}
do_volume_adj_here = !pa_channel_map_equal(&i->channel_map, &i->sink->channel_map);
- volume_is_norm = pa_cvolume_is_norm(&i->thread_info.soft_volume) && !i->thread_info.soft_muted;
+ volume_is_norm = pa_cvolume_is_norm(&i->thread_info.volume) && !i->thread_info.muted;
while (!i->thread_info.resampled_chunk.memblock) {
pa_memchunk tchunk;
pa_assert(tchunk.length);
- l = pa_resampler_request(i->resampler, CONVERT_BUFFER_LENGTH);
+ l = pa_resampler_request(i->thread_info.resampler, CONVERT_BUFFER_LENGTH);
if (l > tchunk.length)
l = tchunk.length;
/* It might be necessary to adjust the volume here */
if (do_volume_adj_here && !volume_is_norm) {
pa_memchunk_make_writable(&tchunk, 0);
- pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.soft_volume);
+ pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.volume);
}
- pa_resampler_run(i->resampler, &tchunk, &i->thread_info.resampled_chunk);
+ pa_resampler_run(i->thread_info.resampler, &tchunk, &i->thread_info.resampled_chunk);
pa_memblock_unref(tchunk.memblock);
}
if (ret >= 0)
pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_RUNNING);
- else if (ret < 0 && i->state == PA_SINK_INPUT_RUNNING)
+ else if (ret < 0 && state == PA_SINK_INPUT_RUNNING)
pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED);
if (ret >= 0) {
/* return; */
/* } */
- if (!i->resampler) {
+ if (!i->thread_info.resampler) {
if (i->drop)
i->drop(i, chunk, length);
return;
i->volume = *volume;
- pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), pa_sink_input_unref, pa_xfree);
+ pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
}
i->muted = mute;
- pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), pa_sink_input_unref, NULL);
+ pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
}
int pa_sink_input_get_mute(pa_sink_input *i) {
pa_sink_input_assert_ref(i);
- return !!i->mute;
+ return !!i->muted;
}
void pa_sink_input_cork(pa_sink_input *i, int b) {
- int n;
pa_sink_input_state_t state;
pa_sink_input_assert_ref(i);
pa_assert(state != PA_SINK_INPUT_DISCONNECTED);
if (b && state != PA_SINK_INPUT_CORKED)
- pa_atomic_store(i->state, PA_SINK_INPUT_CORKED);
+ pa_atomic_store(&i->state, PA_SINK_INPUT_CORKED);
else if (!b && state == PA_SINK_INPUT_CORKED)
- pa_atomic_cmpxchg(i->state, state, PA_SINK_INPUT_DRAINED);
+ pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED);
}
int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) {
pa_sink_input_assert_ref(i);
- pa_return_val_if_fail(u->thread_info.resampler, -1);
+ pa_return_val_if_fail(i->thread_info.resampler, -1);
if (i->sample_spec.rate == rate)
return 0;
i->sample_spec.rate = rate;
- pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, pa_sink_input_unref, NULL);
+ pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
- return 0
+ return 0;
}
void pa_sink_input_set_name(pa_sink_input *i, const char *name) {
}
int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
- pa_resampler *new_resampler = NULL;
- pa_memblockq *buffer = NULL;
- pa_sink *origin;
+/* pa_resampler *new_resampler = NULL; */
+/* pa_memblockq *buffer = NULL; */
+/* pa_sink *origin; */
pa_sink_input_assert_ref(i);
pa_sink_assert_ref(dest);
switch (code) {
case PA_SINK_INPUT_MESSAGE_SET_VOLUME:
- s->thread_info.soft_volume = *((pa_cvolume*) userdata);
+ i->thread_info.volume = *((pa_cvolume*) userdata);
return 0;
case PA_SINK_INPUT_MESSAGE_SET_MUTE:
- s->thread_info.soft_muted = PA_PTR_TO_UINT(userdata);
+ i->thread_info.muted = PA_PTR_TO_UINT(userdata);
return 0;
case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
pa_usec_t *r = userdata;
if (i->thread_info.resampled_chunk.memblock)
- *r += pa_bytes_to_usec(i->resampled_chunk.length, &i->sink->sample_spec);
+ *r += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &i->sink->sample_spec);
/* if (i->move_silence) */
/* r += pa_bytes_to_usec(i->move_silence, &i->sink->sample_spec); */
case PA_SINK_INPUT_MESSAGE_SET_RATE: {
i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata);
- pa_resampler_set_input_rate(i->resampler, PA_PTR_TO_UINT(userdata));
+ pa_resampler_set_input_rate(i->thread_info.resampler, PA_PTR_TO_UINT(userdata));
return 0;
}
};
PA_DECLARE_CLASS(pa_sink_input);
-#define PA_SINK_INPUT(o) ((pa_sink_input*) (o))
+#define PA_SINK_INPUT(o) pa_sink_input_cast(o)
enum {
PA_SINK_INPUT_MESSAGE_SET_VOLUME,
void pa_sink_input_cork(pa_sink_input *i, int b);
-void pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate);
+int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate);
pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i);
#define MAX_MIX_CHANNELS 32
+static PA_DEFINE_CHECK_TYPE(pa_sink, sink_check_type, pa_msgobject_check_type);
+
static void sink_free(pa_object *s);
pa_sink* pa_sink_new(
pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
pa_return_null_if_fail(name && pa_utf8_valid(name) && *name);
- s = pa_msgobject_new(pa_sink);
+ s = pa_msgobject_new(pa_sink, sink_check_type);
if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fail))) {
pa_xfree(s);
s->parent.process_msg = pa_sink_process_msg;
s->core = core;
- pa_atomic_store(&s->state, PA_SINK_IDLE);
+ s->state = PA_SINK_IDLE;
s->name = pa_xstrdup(name);
s->description = NULL;
s->driver = pa_xstrdup(driver);
s->get_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
- s->start = NULL;
- s->stop = NULL;
+ s->set_state = NULL;
s->userdata = NULL;
- pa_assert_se(s->asyncmsgq = pa_asyncmsgq_new(0));
+ s->asyncmsgq = NULL;
r = pa_idxset_put(core->sinks, s, &s->index);
pa_assert(s->index != PA_IDXSET_INVALID && r >= 0);
s->thread_info.inputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
s->thread_info.soft_volume = s->volume;
s->thread_info.soft_muted = s->muted;
+ s->thread_info.state = s->state;
pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index);
return s;
}
-static void sink_start(pa_sink *s) {
- pa_sink_state_t state;
+/* Moves the sink to a new state. The driver's set_state() callback (if
+ * one is installed) is asked first; then PA_SINK_MESSAGE_SET_STATE is
+ * sent through the sink's message queue, which pa_sink_process_msg()
+ * handles by updating thread_info.state; finally s->state is stored.
+ * Returns 0 on success or when the sink already is in that state, -1
+ * when the callback or the message send fails (s->state is then left
+ * untouched). */
+static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
+ int ret;
+
pa_assert(s);
- state = pa_sink_get_state(s);
- pa_return_if_fail(state == PA_SINK_IDLE || state == PA_SINK_SUSPENDED);
-
- pa_atomic_store(&s->state, PA_SINK_RUNNING);
-
- if (s->start)
- s->start(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_START, NULL, NULL, NULL);
-}
-
-static void sink_stop(pa_sink *s) {
- pa_sink_state_t state;
- int stop;
+ if (s->state == state)
+ return 0;
- pa_assert(s);
- state = pa_sink_get_state(s);
- pa_return_if_fail(state == PA_SINK_RUNNING || state == PA_SINK_SUSPENDED);
+ if (s->set_state)
+ if ((ret = s->set_state(s, state)) < 0)
+ return -1;
- stop = state == PA_SINK_RUNNING;
- pa_atomic_store(&s->state, PA_SINK_IDLE);
+ /* s->asyncmsgq is NULL from pa_sink_new() until pa_sink_set_asyncmsgq()
+ * is called. A sink freed before that still gets here through
+ * pa_sink_disconnect(), so only send when a queue is attached. */
+ if (s->asyncmsgq && pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+ return -1;
- if (stop) {
- if (s->stop)
- s->stop(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_STOP, NULL, NULL, NULL);
- }
+ s->state = state;
+ return 0;
}
void pa_sink_disconnect(pa_sink* s) {
pa_sink_input *i, *j = NULL;
pa_assert(s);
- pa_return_if_fail(pa_sink_get_state(s) != PA_SINK_DISCONNECTED);
-
- sink_stop(s);
+ pa_return_if_fail(s->state != PA_SINK_DISCONNECTED);
- pa_atomic_store(&s->state, PA_SINK_DISCONNECTED);
pa_namereg_unregister(s->core, s->name);
+ pa_idxset_remove_by_data(s->core->sinks, s, NULL);
pa_hook_fire(&s->core->hook_sink_disconnect, s);
if (s->monitor_source)
pa_source_disconnect(s->monitor_source);
- pa_idxset_remove_by_data(s->core->sinks, s, NULL);
+ sink_set_state(s, PA_SINK_DISCONNECTED);
s->get_latency = NULL;
s->get_volume = NULL;
s->set_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
- s->start = NULL;
- s->stop = NULL;
+ s->set_state = NULL;
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
}
static void sink_free(pa_object *o) {
pa_sink *s = PA_SINK(o);
+ pa_sink_input *i;
pa_assert(s);
pa_assert(pa_sink_refcnt(s) == 0);
- pa_sink_disconnect(s);
+ if (s->state != PA_SINK_DISCONNECTED)
+ pa_sink_disconnect(s);
pa_log_info("Freeing sink %u \"%s\"", s->index, s->name);
pa_idxset_free(s->inputs, NULL, NULL);
- pa_hashmap_free(s->thread_info.inputs, (pa_free2_cb_t) pa_sink_input_unref, NULL);
-
- pa_asyncmsgq_free(s->asyncmsgq);
+ while ((i = pa_hashmap_steal_first(s->thread_info.inputs)))
+ pa_sink_input_unref(i);
+
+ pa_hashmap_free(s->thread_info.inputs, NULL, NULL);
pa_xfree(s->name);
pa_xfree(s->description);
pa_xfree(s);
}
-void pa_sink_update_status(pa_sink*s) {
+void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q) {
pa_sink_assert_ref(s);
+ pa_assert(q);
- if (pa_sink_get_state(s) == PA_SINK_SUSPENDED)
- return;
+ s->asyncmsgq = q;
- if (pa_sink_used_by(s) > 0)
- sink_start(s);
- else
- sink_stop(s);
+ if (s->monitor_source)
+ pa_source_set_asyncmsgq(s->monitor_source, q);
}
-void pa_sink_suspend(pa_sink *s, int suspend) {
- pa_sink_state_t state;
-
+int pa_sink_update_status(pa_sink*s) {
pa_sink_assert_ref(s);
- state = pa_sink_get_state(s);
- pa_return_if_fail(suspend && (state == PA_SINK_RUNNING || state == PA_SINK_IDLE));
- pa_return_if_fail(!suspend && (state == PA_SINK_SUSPENDED));
+ if (s->state == PA_SINK_SUSPENDED)
+ return 0;
+ return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE);
+}
- if (suspend) {
- pa_atomic_store(&s->state, PA_SINK_SUSPENDED);
+int pa_sink_suspend(pa_sink *s, int suspend) {
+ pa_sink_assert_ref(s);
- if (s->stop)
- s->stop(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_STOP, NULL, NULL, NULL);
+ if (suspend)
+ return sink_set_state(s, PA_SINK_SUSPENDED);
+ else
+ return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE);
+}
- } else {
- pa_atomic_store(&s->state, PA_SINK_RUNNING);
+void pa_sink_ping(pa_sink *s) {
+ pa_sink_assert_ref(s);
- if (s->start)
- s->start(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_START, NULL, NULL, NULL);
- }
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, NULL, NULL);
}
static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) {
pa_sink *s = PA_SINK(o);
pa_sink_assert_ref(s);
- switch (code) {
+ switch ((pa_sink_message_t) code) {
case PA_SINK_MESSAGE_ADD_INPUT: {
pa_sink_input *i = userdata;
pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i));
*((int*) userdata) = s->thread_info.soft_muted;
return 0;
- default:
- return -1;
+ case PA_SINK_MESSAGE_PING:
+ return 0;
+
+ case PA_SINK_MESSAGE_SET_STATE:
+ s->thread_info.state = PA_PTR_TO_UINT(userdata);
+ return 0;
+
+ case PA_SINK_MESSAGE_GET_LATENCY:
+ case PA_SINK_MESSAGE_MAX:
+ ;
}
+
+ return -1;
}
uint32_t index;
pa_core *core;
- pa_atomic_t state;
+ pa_sink_state_t state;
char *name;
char *description, *driver; /* may be NULL */
int refresh_volume;
int refresh_mute;
- int (*start)(pa_sink *s);
- int (*stop)(pa_sink *s);
+ int (*set_state)(pa_sink *s, pa_sink_state_t state);
int (*set_volume)(pa_sink *s); /* dito */
int (*get_volume)(pa_sink *s); /* dito */
int (*get_mute)(pa_sink *s); /* dito */
/* Contains copies of the above data so that the real-time worker
* thread can work without access locking */
struct {
+ pa_sink_state_t state;
pa_hashmap *inputs;
pa_cvolume soft_volume;
int soft_muted;
};
PA_DECLARE_CLASS(pa_sink);
-#define PA_SINK(s) ((pa_sink*) (s))
+#define PA_SINK(s) (pa_sink_cast(s))
typedef enum pa_sink_message {
PA_SINK_MESSAGE_ADD_INPUT,
PA_SINK_MESSAGE_GET_MUTE,
PA_SINK_MESSAGE_SET_MUTE,
PA_SINK_MESSAGE_GET_LATENCY,
- PA_SINK_MESSAGE_START,
- PA_SINK_MESSAGE_STOP,
+ PA_SINK_MESSAGE_SET_STATE,
+ PA_SINK_MESSAGE_PING,
PA_SINK_MESSAGE_MAX
} pa_sink_message_t;
void pa_sink_set_module(pa_sink *sink, pa_module *m);
void pa_sink_set_description(pa_sink *s, const char *description);
+void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q);
/* Usable by everyone */
pa_usec_t pa_sink_get_latency(pa_sink *s);
-void pa_sink_update_status(pa_sink*s);
-void pa_sink_suspend(pa_sink *s, int suspend);
+int pa_sink_update_status(pa_sink*s);
+int pa_sink_suspend(pa_sink *s, int suspend);
+
+/* Sends a ping message to the sink thread, to make it wake up and
+ * check for data to process even if no real message is
+ * sent */
+void pa_sink_ping(pa_sink *s);
void pa_sink_set_volume(pa_sink *sink, const pa_cvolume *volume);
const pa_cvolume *pa_sink_get_volume(pa_sink *sink);
int pa_sink_get_mute(pa_sink *sink);
unsigned pa_sink_used_by(pa_sink *s);
-#define pa_sink_get_state(s) ((pa_sink_state_t) pa_atomic_load(&(s)->state))
+#define pa_sink_get_state(s) ((s)->state)
/* To be used exclusively by the sink driver thread */
void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target);
int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
-
+
#endif
u->sink_input->kill = sink_input_kill;
u->sink_input->userdata = u;
- pa_sink_notify(u->sink_input->sink);
+/* pa_sink_notify(u->sink_input->sink); */
return 0;
#include "source-output.h"
+static PA_DEFINE_CHECK_TYPE(pa_source_output, source_output_check_type, pa_msgobject_check_type);
+
+static void source_output_free(pa_object* mo);
+
pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_data *data) {
pa_assert(data);
data->resample_method = pa_resampler_get_method(resampler);
}
- o = pa_source_output_new(pa_source_output);
-
+ o = pa_msgobject_new(pa_source_output, source_output_check_type);
o->parent.parent.free = source_output_free;
o->parent.process_msg = pa_source_output_process_msg;
o->core = core;
- pa_atomic_load(&o->state, PA_SOURCE_OUTPUT_RUNNING);
+ pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_RUNNING);
o->flags = flags;
o->name = pa_xstrdup(data->name);
o->driver = pa_xstrdup(data->driver);
void pa_source_output_disconnect(pa_source_output*o) {
pa_assert(o);
- pa_return_if_fail(pa_source_output_get_state(i) != PA_SOURCE_OUTPUT_DISCONNECTED);
+ pa_return_if_fail(pa_source_output_get_state(o) != PA_SOURCE_OUTPUT_DISCONNECTED);
pa_assert(o->source);
pa_assert(o->source->core);
- pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
+ pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL);
pa_idxset_remove_by_data(o->source->outputs, o, NULL);
pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index);
- o->source = NULL;
+ pa_source_update_status(o->source);
+
+ o->source = NULL;
o->process_msg = NULL;
o->push = NULL;
o->kill = NULL;
o->get_latency = NULL;
- pa_atomic_load(&i->state, PA_SOURCE_OUTPUT_DISCONNECTED);
+ pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_DISCONNECTED);
}
-static void source_output_free(pa_msgobject* mo) {
+static void source_output_free(pa_object* mo) {
pa_source_output *o = PA_SOURCE_OUTPUT(mo);
pa_assert(pa_source_output_refcnt(o) == 0);
void pa_source_output_put(pa_source_output *o) {
pa_source_output_assert_ref(o);
- pa_asyncmsgq_post(o->source->asyncmsgq, o->source, PA_SOURCE_MESSAGE_ADD_OUTPUT, o, NULL, pa_source_unref, pa_source_output_unref);
+ pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), NULL, (pa_free_cb_t) pa_source_output_unref);
pa_source_update_status(o->source);
- pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
+ pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
}
void pa_source_output_kill(pa_source_output*o) {
pa_source_output_assert_ref(o);
- if (pa_asyncmsgq_send(o->source->asyncmsgq, i->source, PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+ if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
r = 0;
if (o->get_latency)
pa_assert(state = PA_SOURCE_OUTPUT_RUNNING);
- if (!o->resampler) {
+ if (!o->thread_info.resampler) {
o->push(o, chunk);
return;
}
- pa_resampler_run(o->resampler, chunk, &rchunk);
+ pa_resampler_run(o->thread_info.resampler, chunk, &rchunk);
if (!rchunk.length)
return;
}
void pa_source_output_cork(pa_source_output *o, int b) {
- int n;
pa_source_output_state_t state;
pa_source_output_assert_ref(o);
pa_assert(state != PA_SOURCE_OUTPUT_DISCONNECTED);
if (b && state != PA_SOURCE_OUTPUT_CORKED)
- pa_atomic_store(o->state, PA_SOURCE_OUTPUT_CORKED);
+ pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_CORKED);
else if (!b && state == PA_SOURCE_OUTPUT_CORKED)
- pa_atomic_cmpxchg(o->state, state, PA_SOURCE_OUTPUT_RUNNING);
+ pa_atomic_cmpxchg(&o->state, state, PA_SOURCE_OUTPUT_RUNNING);
}
int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) {
pa_source_output_assert_ref(o);
pa_return_val_if_fail(o->thread_info.resampler, -1);
- if (i->sample_spec.rate == rate)
+ if (o->sample_spec.rate == rate)
return 0;
- i->sample_spec.rate = rate;
+ o->sample_spec.rate = rate;
- pa_asyncmsgq_post(s->asyncmsgq, pa_source_output_ref(i), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, pa_source_output_unref, NULL);
+ pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
- pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT!|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
+ pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index);
return 0;
}
}
int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
- pa_source *origin;
- pa_resampler *new_resampler = NULL;
+/* pa_source *origin; */
+/* pa_resampler *new_resampler = NULL; */
pa_source_output_assert_ref(o);
pa_source_assert_ref(dest);
/* else if (!pa_sample_spec_equal(&o->sample_spec, &dest->sample_spec) || */
/* !pa_channel_map_equal(&o->channel_map, &dest->channel_map)) { */
-/* /\* Okey, we need a new resampler for the new sink *\/ */
+/* /\* Okay, we need a new resampler for the new source *\/ */
/* if (!(new_resampler = pa_resampler_new( */
/* dest->core->mempool, */
}
int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk* chunk) {
- pa_source_output *o = PA_SOURCE_OUTPUT(o);
+ pa_source_output *o = PA_SOURCE_OUTPUT(mo);
- pa_source_output_assert_ref(i);
+ pa_source_output_assert_ref(o);
switch (code) {
case PA_SOURCE_OUTPUT_MESSAGE_SET_RATE: {
- i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata);
- pa_resampler_set_output_rate(i->resampler, PA_PTR_TO_UINT(userdata));
+ o->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata);
+ pa_resampler_set_output_rate(o->thread_info.resampler, PA_PTR_TO_UINT(userdata));
return 0;
}
};
PA_DECLARE_CLASS(pa_source_output);
-#define PA_SOURCE_OUTPUT(o) ((pa_source_output*) (o))
+#define PA_SOURCE_OUTPUT(o) pa_source_output_cast(o)
enum {
PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY,
void pa_source_output_cork(pa_source_output *i, int b);
-void pa_source_output_set_rate(pa_source_output *o, uint32_t rate);
+int pa_source_output_set_rate(pa_source_output *o, uint32_t rate);
pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o);
#include "source.h"
+static PA_DEFINE_CHECK_TYPE(pa_source, source_check_type, pa_msgobject_check_type);
+
+static void source_free(pa_object *o);
+
pa_source* pa_source_new(
pa_core *core,
const char *driver,
pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
pa_return_null_if_fail(pa_utf8_valid(name) && *name);
- s = pa_msgobject_new(pa_source);
+ s = pa_msgobject_new(pa_source, source_check_type);
if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SOURCE, s, fail))) {
pa_xfree(s);
s->parent.process_msg = pa_source_process_msg;
s->core = core;
- pa_atomic_store(&s->state, PA_SOURCE_IDLE);
+ s->state = PA_SOURCE_IDLE;
s->name = pa_xstrdup(name);
s->description = NULL;
s->driver = pa_xstrdup(driver);
pa_cvolume_reset(&s->volume, spec->channels);
s->muted = 0;
- s->refresh_volume = s->refresh_mute = 0;
+ s->refresh_volume = s->refresh_muted = 0;
s->is_hardware = 0;
s->get_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
- s->start = NULL;
- s->stop = NULL;
+ s->set_state = NULL;
s->userdata = NULL;
- pa_assert_se(s->asyncmsgq = pa_asyncmsgq_new(0));
+ s->asyncmsgq = NULL;
r = pa_idxset_put(core->sources, s, &s->index);
assert(s->index != PA_IDXSET_INVALID && r >= 0);
s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
s->thread_info.soft_volume = s->volume;
s->thread_info.soft_muted = s->muted;
+ s->thread_info.state = s->state;
pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
return s;
}
-static void source_start(pa_source *s) {
- pa_source_state_t state;
+/* Moves the source to a new state. The driver's set_state() callback
+ * (if one is installed) is asked first; then
+ * PA_SOURCE_MESSAGE_SET_STATE is sent through the source's message
+ * queue, which pa_source_process_msg() handles by updating
+ * thread_info.state; finally s->state is stored. Returns 0 on success
+ * or when the source already is in that state, -1 when the callback or
+ * the message send fails (s->state is then left untouched). */
+static int source_set_state(pa_source *s, pa_source_state_t state) {
+ int ret;
+
pa_assert(s);
- state = pa_source_get_state(s);
- pa_return_if_fail(state == PA_SOURCE_IDLE || state == PA_SOURCE_SUSPENDED);
-
- pa_atomic_store(&s->state, PA_SOURCE_RUNNING);
-
- if (s->start)
- s->start(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_START, NULL, NULL, pa_source_unref, NULL);
-}
-
-static void source_stop(pa_source *s) {
- pa_source_state_t state;
- int stop;
+ if (s->state == state)
+ return 0;
- pa_assert(s);
- state = pa_source_get_state(s);
- pa_return_if_fail(state == PA_SOURCE_RUNNING || state == PA_SOURCE_SUSPENDED);
+ if (s->set_state)
+ if ((ret = s->set_state(s, state)) < 0)
+ return -1;
- stop = state == PA_SOURCE_RUNNING;
- pa_atomic_store(&s->state, PA_SOURCE_IDLE);
+ /* s->asyncmsgq is NULL from pa_source_new() until
+ * pa_source_set_asyncmsgq() is called. A source freed before that
+ * still gets here through pa_source_disconnect(), so only send when
+ * a queue is attached. */
+ if (s->asyncmsgq && pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+ return -1;
- if (stop) {
- if (s->stop)
- s->stop(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_STOP, NULL, NULL, pa_source_unref, NULL);
- }
+ s->state = state;
+ return 0;
}
void pa_source_disconnect(pa_source *s) {
pa_source_output *o, *j = NULL;
pa_assert(s);
- pa_return_if_fail(pa_sink_get_state(s) != PA_SINK_DISCONNECT);
-
- source_stop(s);
+ pa_return_if_fail(s->state != PA_SOURCE_DISCONNECTED);
- pa_atomic_store(&s->state, PA_SOURCE_DISCONNECTED);
pa_namereg_unregister(s->core, s->name);
+ pa_idxset_remove_by_data(s->core->sources, s, NULL);
pa_hook_fire(&s->core->hook_source_disconnect, s);
j = o;
}
- pa_idxset_remove_by_data(s->core->sources, s, NULL);
+ source_set_state(s, PA_SOURCE_DISCONNECTED);
s->get_latency = NULL;
s->get_volume = NULL;
s->set_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
- s->start = NULL;
- s->stop = NULL;
+ s->set_state = NULL;
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
}
-static void source_free(pa_msgobject *o) {
+static void source_free(pa_object *o) {
+ pa_source_output *so;
pa_source *s = PA_SOURCE(o);
pa_assert(s);
pa_assert(pa_source_refcnt(s) == 0);
- pa_source_disconnect(s);
+ if (s->state != PA_SOURCE_DISCONNECTED)
+ pa_source_disconnect(s);
pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
pa_idxset_free(s->outputs, NULL, NULL);
- pa_hashmap_free(s->thread_info.outputs, pa_sink_output_unref, NULL);
- pa_asyncmsgq_free(s->asyncmsgq);
+ while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
+ pa_source_output_unref(so);
+
+ pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
pa_xfree(s->name);
pa_xfree(s->description);
pa_xfree(s);
}
-void pa_source_update_status(pa_source*s) {
+int pa_source_update_status(pa_source*s) {
pa_source_assert_ref(s);
- if (pa_source_get_state(s) == PA_SOURCE_STATE_SUSPENDED)
- return;
+ if (s->state == PA_SOURCE_SUSPENDED)
+ return 0;
- if (pa_source_used_by(s) > 0)
- source_start(s);
- else
- source_stop(s);
+ return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
}
-void pa_source_suspend(pa_source *s, int suspend) {
- pa_source_state_t state;
-
+int pa_source_suspend(pa_source *s, int suspend) {
pa_source_assert_ref(s);
- state = pa_source_get_state(s);
- pa_return_if_fail(suspend && (s->state == PA_SOURCE_RUNNING || s->state == PA_SOURCE_IDLE));
- pa_return_if_fail(!suspend && (s->state == PA_SOURCE_SUSPENDED));
-
-
- if (suspend) {
- pa_atomic_store(&s->state, PA_SOURCE_SUSPENDED);
-
- if (s->stop)
- s->stop(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_STOP, NULL, NULL, pa_source_unref, NULL);
+ if (suspend)
+ return source_set_state(s, PA_SOURCE_SUSPENDED);
+ else
+ return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
+}
- } else {
- pa_atomic_store(&s->state, PA_SOURCE_RUNNING);
+void pa_source_ping(pa_source *s) {
+ pa_source_assert_ref(s);
- if (s->start)
- s->start(s);
- else
- pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_START, NULL, NULL, pa_source_unref, NULL);
- }
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, NULL, NULL);
}
void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
pa_source_assert_ref(s);
pa_assert(chunk);
- if (s->sw_muted || !pa_cvolume_is_norm(&s->sw_volume)) {
+ if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
pa_memchunk vchunk = *chunk;
pa_memblock_ref(vchunk.memblock);
pa_memchunk_make_writable(&vchunk, 0);
- if (s->thread_info.muted || pa_cvolume_is_muted(s->thread_info.volume))
+ if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
pa_silence_memchunk(&vchunk, &s->sample_spec);
else
- pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.volume);
+ pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
pa_source_output_push(o, &vchunk);
if (s->get_latency)
return s->get_latency(s);
- if (pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+ if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
return 0;
return usec;
}
void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
- pa_cvolume *v;
+ int changed;
pa_source_assert_ref(s);
pa_assert(volume);
- changed = !pa_cvolume_equal(volume, s->volume);
+ changed = !pa_cvolume_equal(volume, &s->volume);
s->volume = *volume;
if (s->set_volume && s->set_volume(s) < 0)
s->set_volume = NULL;
if (!s->set_volume)
- pa_asyncmsgq_post(s->asyncmsgq, pa_source_ref(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), pa_source_unref, pa_xfree);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
if (changed)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
}
const pa_cvolume *pa_source_get_volume(pa_source *s) {
+ pa_cvolume old_volume;
pa_source_assert_ref(s);
old_volume = s->volume;
s->get_volume = NULL;
if (!s->get_volume && s->refresh_volume)
- pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume);
+ pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, NULL);
if (!pa_cvolume_equal(&old_volume, &s->volume))
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
return &s->volume;
}
-void pa_source_set_mute(pa_source *s, pa_mixer_t m, int mute) {
+void pa_source_set_mute(pa_source *s, int mute) {
int changed;
pa_source_assert_ref(s);
s->set_mute = NULL;
if (!s->set_mute)
- pa_asyncmsgq_post(s->asyncmsgq, pa_source_ref(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), pa_source_unref, NULL);
+ pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
if (changed)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
}
-int pa_source_get_mute(pa_source *s, pa_mixer_t m) {
+int pa_source_get_mute(pa_source *s) {
int old_muted;
pa_source_assert_ref(s);
if (s->get_mute && s->get_mute(s) < 0)
s->get_mute = NULL;
- if (!s->get_mute && s->refresh_mute)
- pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_MUTE, &s->muted);
+ if (!s->get_mute && s->refresh_muted)
+ pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, NULL);
if (old_muted != s->muted)
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
}
+void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
+ pa_source_assert_ref(s);
+ pa_assert(q);
+
+ s->asyncmsgq = q;
+}
+
unsigned pa_source_used_by(pa_source *s) {
pa_source_assert_ref(s);
return pa_idxset_size(s->outputs);
}
-int pa_source_process_msg(pa_msgobject *o, void *object, int code, pa_memchunk *chunk, void *userdata) {
+int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
pa_source *s = PA_SOURCE(o);
pa_source_assert_ref(s);
- switch (code) {
+ switch ((pa_source_message_t) code) {
case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
pa_source_output *i = userdata;
pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index), pa_source_output_ref(i));
return 0;
}
- case PA_SOURCE_MESSAGE_REMOVE_INPUT: {
- pa_source_input *i = userdata;
- pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index), pa_source_output_ref(i));
+ case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
+ pa_source_output *i = userdata;
+ pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index));
return 0;
}
*((int*) userdata) = s->thread_info.soft_muted;
return 0;
- default:
- return -1;
+ case PA_SOURCE_MESSAGE_PING:
+ return 0;
+
+ case PA_SOURCE_MESSAGE_SET_STATE:
+ s->thread_info.state = PA_PTR_TO_UINT(userdata);
+ return 0;
+
+ case PA_SOURCE_MESSAGE_GET_LATENCY:
+ case PA_SOURCE_MESSAGE_MAX:
+ ;
}
+
+ return -1;
}
uint32_t index;
pa_core *core;
- pa_atomic_t state;
+ pa_source_state_t state;
char *name;
char *description, *driver; /* may be NULL */
pa_cvolume volume;
int muted;
int refresh_volume;
- int referesh_mute;
+ int refresh_muted;
- void (*start)(pa_source*source); /* may be NULL */
- void (*stop)(pa_source*source); /* may be NULL */
+ int (*set_state)(pa_source*source, pa_source_state_t state); /* may be NULL */
int (*set_volume)(pa_source *s); /* dito */
int (*get_volume)(pa_source *s); /* dito */
int (*set_mute)(pa_source *s); /* dito */
pa_asyncmsgq *asyncmsgq;
struct {
+ pa_source_state_t state;
pa_hashmap *outputs;
pa_cvolume soft_volume;
int soft_muted;
};
PA_DECLARE_CLASS(pa_source);
-#define PA_SOURCE(s) ((pa_source*) (s))
+#define PA_SOURCE(s) pa_source_cast(s)
typedef enum pa_source_message {
PA_SOURCE_MESSAGE_ADD_OUTPUT,
PA_SOURCE_MESSAGE_GET_MUTE,
PA_SOURCE_MESSAGE_SET_MUTE,
PA_SOURCE_MESSAGE_GET_LATENCY,
- PA_SOURCE_MESSAGE_START,
- PA_SOURCE_MESSAGE_STOP,
+ PA_SOURCE_MESSAGE_SET_STATE,
+ PA_SOURCE_MESSAGE_PING,
PA_SOURCE_MESSAGE_MAX
} pa_source_message_t;
void pa_source_set_module(pa_source *s, pa_module *m);
void pa_source_set_description(pa_source *s, const char *description);
+void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q);
/* Callable by everyone */
pa_usec_t pa_source_get_latency(pa_source *s);
-void pa_source_update_status(pa_source*s);
-void pa_source_suspend(pa_source *s);
+int pa_source_update_status(pa_source*s);
+int pa_source_suspend(pa_source *s, int suspend);
+void pa_source_ping(pa_source *s);
void pa_source_set_volume(pa_source *source, const pa_cvolume *volume);
const pa_cvolume *pa_source_get_volume(pa_source *source);
int pa_source_get_mute(pa_source *source);
unsigned pa_source_used_by(pa_source *s);
-#define pa_source_get_state(s) ((pa_source_state_t) pa_atomic_load(&(s)->state))
+#define pa_source_get_state(s) ((pa_source_state_t) (s)->state)
/* To be used exclusively by the source driver thread */
void pa_source_post(pa_source*s, const pa_memchunk *b);
-void pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
#endif
do {
int code = 0;
- pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, 1) == 0);
+ pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, 1) == 0);
switch (code) {
break;
}
- pa_asyncmsgq_done(q);
+ pa_asyncmsgq_done(q, 0);
} while (!quit);
}
printf("Operation B post\n");
pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, NULL, NULL);
-
+
pa_thread_yield();
printf("Operation C send\n");
- pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL);
+ pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, NULL);
pa_thread_yield();