]> code.delx.au - pulseaudio/commitdiff
A lot of more work to get the lock-free stuff in place
authorLennart Poettering <lennart@poettering.net>
Wed, 13 Jun 2007 22:08:14 +0000 (22:08 +0000)
committerLennart Poettering <lennart@poettering.net>
Wed, 13 Jun 2007 22:08:14 +0000 (22:08 +0000)
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1474 fefdeb5f-60dc-0310-8127-8f9354f1896f

33 files changed:
src/Makefile.am
src/daemon/main.c
src/modules/module-lirc.c
src/modules/module-mmkbd-evdev.c
src/modules/module-null-sink.c
src/modules/module-pipe-sink.c
src/modules/module-pipe-source.c
src/pulsecore/asyncmsgq.c
src/pulsecore/asyncq.c
src/pulsecore/asyncq.h
src/pulsecore/cli-command.c
src/pulsecore/cli-text.c
src/pulsecore/core-subscribe.c
src/pulsecore/core.c
src/pulsecore/core.h
src/pulsecore/log.c
src/pulsecore/msgobject.c
src/pulsecore/msgobject.h
src/pulsecore/object.c
src/pulsecore/object.h
src/pulsecore/protocol-simple.c
src/pulsecore/resampler.c
src/pulsecore/resampler.h
src/pulsecore/sink-input.c
src/pulsecore/sink-input.h
src/pulsecore/sink.c
src/pulsecore/sink.h
src/pulsecore/sound-file-stream.c
src/pulsecore/source-output.c
src/pulsecore/source-output.h
src/pulsecore/source.c
src/pulsecore/source.h
src/tests/asyncmsgq-test.c

index eab465c871c5745aaf100f7b32116fdca79a83f6..0a5d529779b30e78311732fe1c3dc821e7f4bf8d 100644 (file)
@@ -283,14 +283,14 @@ flist_test_CFLAGS = $(AM_CFLAGS)
 flist_test_LDADD = $(AM_LDADD) libpulsecore.la
 flist_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
 
-asyncq_test_SOURCES = tests/asyncq-test.c pulsecore/thread-posix.c pulsecore/thread.h pulsecore/asyncq.c pulsecore/asyncq.h pulsecore/core-util.c pulsecore/core-util.h pulse/xmalloc.c pulse/xmalloc.h pulsecore/log.h pulsecore/log.c pulsecore/core-error.h pulsecore/core-error.c pulsecore/once-posix.c pulsecore/once.h pulsecore/mutex-posix.c pulsecore/mutex.h pulse/utf8.c pulse/utf8.h pulse/util.h pulse/util.c
+asyncq_test_SOURCES = tests/asyncq-test.c
 asyncq_test_CFLAGS = $(AM_CFLAGS)
-asyncq_test_LDADD = $(AM_LDADD) #libpulsecore.la
+asyncq_test_LDADD = $(AM_LDADD) libpulsecore.la
 asyncq_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
 
-asyncmsgq_test_SOURCES = tests/asyncmsgq-test.c pulsecore/thread-posix.c pulsecore/thread.h pulsecore/asyncq.c pulsecore/asyncq.h pulsecore/asyncmsgq.c pulsecore/asyncmsgq.h pulsecore/core-util.c pulsecore/core-util.h pulse/xmalloc.c pulse/xmalloc.h pulsecore/log.h pulsecore/log.c pulsecore/core-error.h pulsecore/core-error.c pulsecore/once-posix.c pulsecore/once.h pulsecore/mutex-posix.c pulsecore/mutex.h pulse/utf8.c pulse/utf8.h pulse/util.h pulse/util.c pulsecore/semaphore.h pulsecore/semaphore-posix.c pulsecore/flist.h pulsecore/flist.c
+asyncmsgq_test_SOURCES = tests/asyncmsgq-test.c
 asyncmsgq_test_CFLAGS = $(AM_CFLAGS)
-asyncmsgq_test_LDADD = $(AM_LDADD) #libpulsecore.la
+asyncmsgq_test_LDADD = $(AM_LDADD) libpulsecore.la
 asyncmsgq_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
 
 mcalign_test_SOURCES = tests/mcalign-test.c
@@ -653,7 +653,10 @@ libpulsecore_la_SOURCES += \
                pulsecore/hook-list.c pulsecore/hook-list.h \
                pulsecore/shm.c pulsecore/shm.h \
                pulsecore/flist.c pulsecore/flist.h \
-               pulsecore/anotify.c pulsecore/anotify.h \
+               pulsecore/asyncmsgq.c pulsecore/asyncmsgqq.h \
+               pulsecore/asyncq.c pulsecore/asyncq.h \
+               pulsecore/object.c pulsecore/object.h \
+               pulsecore/msgobject.c pulsecore/msgobject.h \
                $(PA_THREAD_OBJS)
 
 if OS_IS_WIN32
@@ -718,9 +721,10 @@ modlibexec_LTLIBRARIES = \
                libauthkey-prop.la \
                libstrlist.la \
                libprotocol-simple.la \
-               libprotocol-esound.la \
-               libprotocol-native.la \
-               libprotocol-http.la
+               libprotocol-http.la 
+
+#              libprotocol-esound.la
+#              libprotocol-native.la
 
 # We need to emulate sendmsg/recvmsg to support this on Win32
 if !OS_IS_WIN32
@@ -870,6 +874,11 @@ modlibexec_LTLIBRARIES += \
                module-cli-protocol-tcp.la \
                module-simple-protocol-tcp.la \
                module-null-sink.la
+               module-detect.la \
+               module-volume-restore.la \
+               module-rescue-streams.la \
+               module-http-protocol-tcp.la 
+
 #              module-esound-protocol-tcp.la \
 #              module-native-protocol-tcp.la \
 #              module-native-protocol-fd.la \
@@ -877,11 +886,7 @@ modlibexec_LTLIBRARIES += \
 #              module-combine.la \
 #              module-tunnel-sink.la \
 #              module-tunnel-source.la \
-#              module-esound-sink.la \
-#              module-http-protocol-tcp.la \
-#              module-detect.la \
-#              module-volume-restore.la \
-#              module-rescue-streams.la
+#              module-esound-sink.la
 
 # See comment at librtp.la above
 #if !OS_IS_WIN32
@@ -894,9 +899,9 @@ if HAVE_AF_UNIX
 modlibexec_LTLIBRARIES += \
                module-cli-protocol-unix.la \
                module-simple-protocol-unix.la
+               module-http-protocol-unix.la
 #              module-esound-protocol-unix.la \
-#              module-native-protocol-unix.la \
-#              module-http-protocol-unix.la
+#              module-native-protocol-unix.la
 endif
 
 if HAVE_MKFIFO
@@ -1079,44 +1084,44 @@ module_http_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-h
 
 # Native protocol
 
-module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
-module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
-module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
+#module_native_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
+#module_native_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+#module_native_protocol_tcp_la_LDFLAGS = -module -avoid-version
+#module_native_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la
 
-module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
-module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
-module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
-module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
+#module_native_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
+#module_native_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_NATIVE $(AM_CFLAGS)
+#module_native_protocol_unix_la_LDFLAGS = -module -avoid-version
+#module_native_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la
 
-module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
-module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
-module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
-module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
+#module_native_protocol_fd_la_SOURCES = modules/module-native-protocol-fd.c
+#module_native_protocol_fd_la_CFLAGS = $(AM_CFLAGS)
+#module_native_protocol_fd_la_LDFLAGS = -module -avoid-version
+#module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
 
 # EsounD protocol
 
-module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
-module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
-module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version
-module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la
+#module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
+#module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
+#module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version
+#module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la
 
-module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
-module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
-module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version
-module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la
+#module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
+#module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
+#module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version
+#module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la
 
-module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c
-module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version
-module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+#module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c
+#module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version
+#module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la
 
-module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c
-module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version
-module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+#module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c
+#module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version
+#module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la
 
-module_esound_sink_la_SOURCES = modules/module-esound-sink.c
-module_esound_sink_la_LDFLAGS = -module -avoid-version
-module_esound_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libiochannel.la libsocket-client.la libauthkey.la
+#module_esound_sink_la_SOURCES = modules/module-esound-sink.c
+#module_esound_sink_la_LDFLAGS = -module -avoid-version
+#module_esound_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libiochannel.la libsocket-client.la libauthkey.la
 
 # Pipes
 
@@ -1140,22 +1145,22 @@ module_null_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la
 
 # Couplings
 
-module_combine_la_SOURCES = modules/module-combine.c
-module_combine_la_LDFLAGS = -module -avoid-version
-module_combine_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+#module_combine_la_SOURCES = modules/module-combine.c
+#module_combine_la_LDFLAGS = -module -avoid-version
+#module_combine_la_LIBADD = $(AM_LIBADD) libpulsecore.la
 
 module_match_la_SOURCES = modules/module-match.c
 module_match_la_LDFLAGS = -module -avoid-version
 module_match_la_LIBADD = $(AM_LIBADD) libpulsecore.la
 
-module_tunnel_sink_la_SOURCES = modules/module-tunnel.c
-module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS)
-module_tunnel_sink_la_LDFLAGS = -module -avoid-version
-module_tunnel_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
+#module_tunnel_sink_la_SOURCES = modules/module-tunnel.c
+#module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS)
+#module_tunnel_sink_la_LDFLAGS = -module -avoid-version
+#module_tunnel_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
 
-module_tunnel_source_la_SOURCES = modules/module-tunnel.c
-module_tunnel_source_la_LDFLAGS = -module -avoid-version
-module_tunnel_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
+#module_tunnel_source_la_SOURCES = modules/module-tunnel.c
+#module_tunnel_source_la_LDFLAGS = -module -avoid-version
+#module_tunnel_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la libsocket-client.la libpstream.la libpstream-util.la libpdispatch.la libtagstruct.la libauthkey.la libauthkey-prop.la libsocket-util.la libiochannel.la
 
 # X11
 
@@ -1171,34 +1176,34 @@ module_x11_publish_la_LIBADD = $(AM_LIBADD) $(X_PRE_LIBS) -lX11 $(X_LIBS) $(X_EX
 
 # OSS
 
-liboss_util_la_SOURCES = modules/oss-util.c modules/oss-util.h
-liboss_util_la_LDFLAGS = -avoid-version
-liboss_util_la_LIBADD = libpulsecore.la
+#liboss_util_la_SOURCES = modules/oss-util.c modules/oss-util.h
+#liboss_util_la_LDFLAGS = -avoid-version
+#liboss_util_la_LIBADD = libpulsecore.la
 
-module_oss_la_SOURCES = modules/module-oss.c
-module_oss_la_LDFLAGS = -module -avoid-version
-module_oss_la_LIBADD = $(AM_LIBADD) libiochannel.la liboss-util.la
+#module_oss_la_SOURCES = modules/module-oss.c
+#module_oss_la_LDFLAGS = -module -avoid-version
+#module_oss_la_LIBADD = $(AM_LIBADD) libiochannel.la liboss-util.la
 
-module_oss_mmap_la_SOURCES = modules/module-oss-mmap.c
-module_oss_mmap_la_LDFLAGS = -module -avoid-version
-module_oss_mmap_la_LIBADD = $(AM_LIBADD) liboss-util.la libpulsecore.la
+#module_oss_mmap_la_SOURCES = modules/module-oss-mmap.c
+#module_oss_mmap_la_LDFLAGS = -module -avoid-version
+#module_oss_mmap_la_LIBADD = $(AM_LIBADD) liboss-util.la libpulsecore.la
 
 # ALSA
 
-libalsa_util_la_SOURCES = modules/alsa-util.c modules/alsa-util.h
-libalsa_util_la_LDFLAGS = -avoid-version
-libalsa_util_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libpulsecore.la
-libalsa_util_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
+#libalsa_util_la_SOURCES = modules/alsa-util.c modules/alsa-util.h
+#libalsa_util_la_LDFLAGS = -avoid-version
+#libalsa_util_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libpulsecore.la
+#libalsa_util_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
 
-module_alsa_sink_la_SOURCES = modules/module-alsa-sink.c
-module_alsa_sink_la_LDFLAGS = -module -avoid-version
-module_alsa_sink_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
-module_alsa_sink_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
+#module_alsa_sink_la_SOURCES = modules/module-alsa-sink.c
+#module_alsa_sink_la_LDFLAGS = -module -avoid-version
+#module_alsa_sink_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
+#module_alsa_sink_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
 
-module_alsa_source_la_SOURCES = modules/module-alsa-source.c
-module_alsa_source_la_LDFLAGS = -module -avoid-version
-module_alsa_source_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
-module_alsa_source_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
+#module_alsa_source_la_SOURCES = modules/module-alsa-source.c
+#module_alsa_source_la_LDFLAGS = -module -avoid-version
+#module_alsa_source_la_LIBADD = $(AM_LIBADD) $(ASOUNDLIB_LIBS) libalsa-util.la libpulsecore.la
+#module_alsa_source_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS)
 
 # Solaris
 
@@ -1265,15 +1270,15 @@ module_rtp_recv_la_CFLAGS = $(AM_CFLAGS)
 
 # JACK
 
-module_jack_sink_la_SOURCES = modules/module-jack-sink.c
-module_jack_sink_la_LDFLAGS = -module -avoid-version
-module_jack_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
-module_jack_sink_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS)
+#module_jack_sink_la_SOURCES = modules/module-jack-sink.c
+#module_jack_sink_la_LDFLAGS = -module -avoid-version
+#module_jack_sink_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
+#module_jack_sink_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS)
 
-module_jack_source_la_SOURCES = modules/module-jack-source.c
-module_jack_source_la_LDFLAGS = -module -avoid-version
-module_jack_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
-module_jack_source_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS)
+#module_jack_source_la_SOURCES = modules/module-jack-source.c
+#module_jack_source_la_LDFLAGS = -module -avoid-version
+#module_jack_source_la_LIBADD = $(AM_LIBADD) libpulsecore.la $(JACK_LIBS)
+#module_jack_source_la_CFLAGS = $(AM_LIBADD) $(JACK_CFLAGS)
 
 # HAL
 libdbus_util_la_SOURCES = modules/dbus-util.c modules/dbus-util.h
index 2424efa72207152a68f118a9ec0509727e0e4205..a1926fe5d540cc2723a1c69107e3ff7ba2d136b7 100644 (file)
@@ -656,7 +656,7 @@ int main(int argc, char *argv[]) {
     pa_mainloop_get_api(mainloop)->time_free(timer);
 #endif
 
-    pa_core_free(c);
+    pa_core_unref(c);
 
     if (!conf->no_cpu_limit)
         pa_cpu_limit_done();
index c8adbc8bec9d05ffec7d6a707dcb133101bc84b9..452fa1f3c810f5ab1512373cfa1ad118ab4b8236 100644 (file)
@@ -121,7 +121,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC
                     pa_log("failed to get sink '%s'", u->sink_name);
                 else {
                     int i;
-                    pa_cvolume cv = *pa_sink_get_volume(s, PA_MIXER_HARDWARE);
+                    pa_cvolume cv = *pa_sink_get_volume(s);
 
 #define DELTA (PA_VOLUME_NORM/20)
 
@@ -134,7 +134,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC
                                     cv.values[i] = PA_VOLUME_NORM;
                             }
 
-                            pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+                            pa_sink_set_volume(s, &cv);
                             break;
 
                         case DOWN:
@@ -145,20 +145,20 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC
                                     cv.values[i] = PA_VOLUME_MUTED;
                             }
 
-                            pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+                            pa_sink_set_volume(s, &cv);
                             break;
 
                         case MUTE:
-                            pa_sink_set_mute(s, PA_MIXER_HARDWARE, 0);
+                            pa_sink_set_mute(s, 0);
                             break;
 
                         case RESET:
-                            pa_sink_set_mute(s, PA_MIXER_HARDWARE, 1);
+                            pa_sink_set_mute(s, 1);
                             break;
 
                         case MUTE_TOGGLE:
 
-                            pa_sink_set_mute(s, PA_MIXER_HARDWARE, !pa_sink_get_mute(s, PA_MIXER_HARDWARE));
+                            pa_sink_set_mute(s, !pa_sink_get_mute(s));
                             break;
 
                         case INVALID:
index b7433ac8250e4131f8b32d6c7f24956a50dbc8e9..919b399d76b9c2d4254eea5faf9e9804d6b783b7 100644 (file)
@@ -114,7 +114,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC
                     pa_log("failed to get sink '%s'", u->sink_name);
                 else {
                     int i;
-                    pa_cvolume cv = *pa_sink_get_volume(s, PA_MIXER_HARDWARE);
+                    pa_cvolume cv = *pa_sink_get_volume(s);
 
 #define DELTA (PA_VOLUME_NORM/20)
 
@@ -127,7 +127,7 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC
                                     cv.values[i] = PA_VOLUME_NORM;
                             }
 
-                            pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+                            pa_sink_set_volume(s, &cv);
                             break;
 
                         case DOWN:
@@ -138,12 +138,12 @@ static void io_callback(pa_mainloop_api *io, PA_GCC_UNUSED pa_io_event *e, PA_GC
                                     cv.values[i] = PA_VOLUME_MUTED;
                             }
 
-                            pa_sink_set_volume(s, PA_MIXER_HARDWARE, &cv);
+                            pa_sink_set_volume(s, &cv);
                             break;
 
                         case MUTE_TOGGLE:
 
-                            pa_sink_set_mute(s, PA_MIXER_HARDWARE, !pa_sink_get_mute(s, PA_MIXER_HARDWARE));
+                            pa_sink_set_mute(s, !pa_sink_get_mute(s));
                             break;
 
                         case INVALID:
index ce3b29b090b94bd61ae0b55192ab0ba845452afb..afe130d9bbf232d255772db2b0d8eedb7080b48f 100644 (file)
 #include <fcntl.h>
 #include <unistd.h>
 #include <limits.h>
+#include <sys/poll.h>
 
 #include <pulse/timeval.h>
 #include <pulse/xmalloc.h>
 
 #include <pulsecore/macro.h>
-#include <pulsecore/iochannel.h>
 #include <pulsecore/sink.h>
 #include <pulsecore/module.h>
 #include <pulsecore/core-util.h>
+#include <pulsecore/core-error.h>
 #include <pulsecore/modargs.h>
 #include <pulsecore/log.h>
+#include <pulsecore/thread.h>
 
 #include "module-null-sink-symdef.h"
 
@@ -65,7 +67,9 @@ struct userdata {
     pa_module *module;
     pa_sink *sink;
     pa_thread *thread;
+    pa_asyncmsgq *asyncmsgq;
     size_t block_size;
+    
     struct timeval timestamp;
 };
 
@@ -79,85 +83,74 @@ static const char* const valid_modargs[] = {
     NULL
 };
 
+static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+    struct userdata *u = PA_SINK(o)->userdata;
+
+    switch (code) {
+        case PA_SINK_MESSAGE_SET_STATE:
+
+            if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING)
+                pa_gettimeofday(&u->timestamp);
+            
+            break;
+            
+        case PA_SINK_MESSAGE_GET_LATENCY: {
+            struct timeval now;
+    
+            pa_gettimeofday(&now);
+            
+            if (pa_timeval_cmp(&u->timestamp, &now) > 0)
+                *((pa_usec_t*) data) = 0;
+            else
+                *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now);
+            break;
+        }
+    }
+    
+    return pa_sink_process_msg(o, code, data, chunk);
+}
+
 static void thread_func(void *userdata) {
     struct userdata *u = userdata;
-    int quit = 0;
     struct pollfd pollfd;
-    int running = 1;
 
     pa_assert(u);
 
     pa_log_debug("Thread starting up");
 
+    pa_gettimeofday(&u->timestamp);
+
     memset(&pollfd, 0, sizeof(pollfd));
-    pollfd.fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP);
+    pollfd.fd = pa_asyncmsgq_get_fd(u->asyncmsgq);
     pollfd.events = POLLIN;
 
-    pa_gettimeofday(u->timestamp);
-
     for (;;) {
+        pa_msgobject *object;
         int code;
-        void *data, *object;
+        void *data;
+        pa_memchunk chunk;
         int r, timeout;
         struct timeval now;
 
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) {
-
-
-            /* Now process these messages our own way */
-            if (!object) {
-
-                switch (code) {
-                    case PA_MESSAGE_SHUTDOWN:
-                        goto finish;
-
-                    default:
-                        pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
-
-                }
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+            int ret;
 
-            } else if (object == u->sink) {
-
-                switch (code) {
-                    case PA_SINK_MESSAGE_STOP:
-                        pa_assert(running);
-                        running = 0;
-                        break;
-
-                    case PA_SINK_MESSAGE_START:
-                        pa_assert(!running);
-                        running = 1;
-
-                        pa_gettimeofday(u->timestamp);
-                        break;
-
-                    case PA_SINK_MESSAGE_GET_LATENCY:
-
-                        if (pa_timeval_cmp(&u->timestamp, &now) > 0)
-                            *((pa_usec_t*) data) = 0;
-                        else
-                            *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now);
-                        break;
-
-                        /* ... */
-
-                    default:
-                        pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
-                }
+            if (!object && code == PA_MESSAGE_SHUTDOWN) {
+                pa_asyncmsgq_done(u->asyncmsgq, 0);
+                goto finish;
             }
-
-            pa_asyncmsgq_done(u->sink->asyncmsgq);
+                    
+            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
 
         /* Render some data and drop it immediately */
-
-        if (running) {
+        if (u->sink->thread_info.state == PA_SINK_RUNNING) {
             pa_gettimeofday(&now);
 
-            if (pa_timeval_cmp(u->timestamp, &now) <= 0) {
-                pa_memchunk chunk;
+            if (pa_timeval_cmp(&u->timestamp, &now) <= 0) {
                 size_t l;
 
                 if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) {
@@ -179,11 +172,11 @@ static void thread_func(void *userdata) {
 
         /* Hmm, nothing to do. Let's sleep */
 
-        if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0)
+        if (pa_asyncmsgq_before_poll(u->asyncmsgq) < 0)
             continue;
 
         r = poll(&pollfd, 1, timeout);
-        pa_asyncmsgq_after_poll(u->sink->asyncmsgq);
+        pa_asyncmsgq_after_poll(u->asyncmsgq);
 
         if (r < 0) {
             if (errno == EINTR)
@@ -199,8 +192,8 @@ static void thread_func(void *userdata) {
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), NULL, pa_module_unref);
-    pa_asyncmsgq_wait_for(PA_MESSAGE_SHUTDOWN);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
     pa_log_debug("Thread shutting down");
@@ -231,20 +224,24 @@ int pa__init(pa_core *c, pa_module*m) {
     u->module = m;
     m->userdata = u;
 
+    pa_assert_se(u->asyncmsgq = pa_asyncmsgq_new(0));
+    
     if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
         pa_log("Failed to create sink.");
         goto fail;
     }
 
+    u->sink->parent.process_msg = sink_process_msg;
     u->sink->userdata = u;
-    pa_sink_set_owner(u->sink, m);
+
+    pa_sink_set_module(u->sink, m);
+    pa_sink_set_asyncmsgq(u->sink, u->asyncmsgq);
     pa_sink_set_description(u->sink, pa_modargs_get_value(ma, "description", "NULL sink"));
 
     u->block_size = pa_bytes_per_second(&ss) / 20; /* 50 ms */
-
     if (u->block_size <= 0)
         u->block_size = pa_frame_size(&ss);
-
+    
     if (!(u->thread = pa_thread_new(thread_func, u))) {
         pa_log("Failed to create thread.");
         goto fail;
@@ -272,14 +269,19 @@ void pa__done(pa_core *c, pa_module*m) {
     if (!(u = m->userdata))
         return;
 
-    pa_sink_disconnect(u->sink);
+    if (u->sink)
+        pa_sink_disconnect(u->sink);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
         pa_thread_free(u->thread);
     }
 
-    pa_sink_unref(u->sink);
+    if (u->asyncmsgq)
+        pa_asyncmsgq_free(u->asyncmsgq);
+
+    if (u->sink)
+        pa_sink_unref(u->sink);
 
     pa_xfree(u);
 }
index e4735f615b99943a596a6dba32cc37fe75d00f05..da9124a70ea663ae697a4ca0b2b647d095689a52 100644 (file)
@@ -34,6 +34,8 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <limits.h>
+#include <sys/ioctl.h>
+#include <sys/poll.h>
 
 #include <pulse/xmalloc.h>
 
@@ -44,6 +46,7 @@
 #include <pulsecore/core-util.h>
 #include <pulsecore/modargs.h>
 #include <pulsecore/log.h>
+#include <pulsecore/thread.h>
 
 #include "module-pipe-sink-symdef.h"
 
@@ -65,9 +68,12 @@ struct userdata {
     pa_core *core;
     pa_module *module;
     pa_sink *sink;
+    pa_thread *thread;
+    pa_asyncmsgq *asyncmsgq;
     char *filename;
     int fd;
-    pa_thread *thread;
+
+    pa_memchunk memchunk;
 };
 
 static const char* const valid_modargs[] = {
@@ -80,109 +86,99 @@ static const char* const valid_modargs[] = {
     NULL
 };
 
-enum {
-    POLLFD_ASYNCQ,
-    POLLFD_FIFO,
-    POLLFD_MAX,
-};
+static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memchunk *chunk) {
+    struct userdata *u = PA_SINK(o)->userdata;
+
+    switch (code) {
+            
+        case PA_SINK_MESSAGE_GET_LATENCY: {
+            size_t n = 0;
+            int l;
+            
+            if (ioctl(u->fd, TIOCINQ, &l) >= 0 && l > 0)
+                n = (size_t) l;
+            
+            n += u->memchunk.length;
+            
+            *((pa_usec_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
+            break;
+        }
+    }
+    
+    return pa_sink_process_msg(o, code, data, chunk);
+}
 
 static void thread_func(void *userdata) {
+    enum {
+        POLLFD_ASYNCQ,
+        POLLFD_FIFO,
+        POLLFD_MAX,
+    };
+    
     struct userdata *u = userdata;
-    int quit = 0;
     struct pollfd pollfd[POLLFD_MAX];
-    int running = 1, underrun = 0;
-    pa_memchunk memchunk;
+    int underrun = 0;
+    int write_type = 0;
 
     pa_assert(u);
 
     pa_log_debug("Thread starting up");
 
+    pa_memchunk_reset(&u->memchunk);
+    
     memset(&pollfd, 0, sizeof(pollfd));
-    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP);
+    
+    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->asyncmsgq);
     pollfd[POLLFD_ASYNCQ].events = POLLIN;
-
     pollfd[POLLFD_FIFO].fd = u->fd;
 
-    memset(&memchunk, 0, sizeof(memchunk));
-
     for (;;) {
+        pa_msgobject *object;
         int code;
-        void *object, *data;
+        void *data;
+        pa_memchunk chunk;
         int r;
-        struct timeval now;
 
         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) {
-
-
-            /* Now process these messages our own way */
-            if (!object) {
-                switch (code) {
-                    case PA_SINK_MESSAGE_SHUTDOWN:
-                        goto finish;
-
-                    default:
-                        pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
-                }
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
+            int ret;
 
-            } else if (object == u->sink) {
-
-                case PA_SINK_MESSAGE_STOP:
-                    pa_assert(running);
-                    running = 0;
-                    break;
-
-                case PA_SINK_MESSAGE_START:
-                    pa_assert(!running);
-                    running = 1;
-                    break;
-
-                case PA_SINK_MESSAGE_GET_LATENCY: {
-                    size_t n = 0;
-                    int l;
-
-                    if (ioctl(u->fd, TIOCINQ, &l) >= 0 && l > 0)
-                        n = (size_t) l;
-
-                    n += memchunk.length;
-
-                    *((pa_usec_t*) data) pa_bytes_to_usec(n, &u->sink->sample_spec);
-                    break;
-                }
-
-                /* ... */
-
-                default:
-                    pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
+            if (!object && code == PA_MESSAGE_SHUTDOWN) {
+                pa_asyncmsgq_done(u->asyncmsgq, 0);
+                goto finish;
             }
 
-            pa_asyncmsgq_done(u->sink->asyncmsgq);
+            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
 
         /* Render some data and write it to the fifo */
 
-        if (running && (pollfd[POLLFD_FIFO].revents || underrun)) {
+        if (u->sink->thread_info.state == PA_SINK_RUNNING && (pollfd[POLLFD_FIFO].revents || underrun)) {
 
-            if (chunk.length <= 0)
-                pa_sink_render(u->fd, PIPE_BUF, &chunk);
+            if (u->memchunk.length <= 0)
+                pa_sink_render(u->sink, PIPE_BUF, &u->memchunk);
 
-            underrun = chunk.length <= 0;
+            underrun = u->memchunk.length <= 0;
 
             if (!underrun) {
                 ssize_t l;
+                void *p;
 
                 p = pa_memblock_acquire(u->memchunk.memblock);
-                l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length);
-                pa_memblock_release(p);
+                l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
+                pa_memblock_release(u->memchunk.memblock);
 
                 if (l < 0) {
 
-                    if (errno != EINTR && errno != EAGAIN) {
+                    if (errno == EINTR)
+                        continue;
+                    else if (errno != EAGAIN) {
                         pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
                         goto fail;
                     }
-
+                    
                 } else {
 
                     u->memchunk.index += l;
@@ -190,24 +186,24 @@ static void thread_func(void *userdata) {
 
                     if (u->memchunk.length <= 0) {
                         pa_memblock_unref(u->memchunk.memblock);
-                        u->memchunk.memblock = NULL;
+                        pa_memchunk_reset(&u->memchunk);
                     }
-                }
 
-                pollfd[POLLFD_FIFO].revents = 0;
-                continue;
+                    pollfd[POLLFD_FIFO].revents = 0;
+                    continue;
+                }
             }
         }
 
-        pollfd[POLLFD_FIFO].events = running && !underrun ? POLLOUT : 0;
+        pollfd[POLLFD_FIFO].events = (u->sink->thread_info.state == PA_SINK_RUNNING && !underrun) ? POLLOUT : 0;
 
         /* Hmm, nothing to do. Let's sleep */
 
-        if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0)
+        if (pa_asyncmsgq_before_poll(u->asyncmsgq) < 0)
             continue;
 
-        r = poll(&pollfd, 1, 0);
-        pa_asyncmsgq_after_poll(u->sink->asyncmsgq);
+        r = poll(pollfd, POLLFD_MAX, -1);
+        pa_asyncmsgq_after_poll(u->asyncmsgq);
 
         if (r < 0) {
             if (errno == EINTR)
@@ -217,19 +213,19 @@ static void thread_func(void *userdata) {
             goto fail;
         }
 
-        if (pollfd[POLLFD_FIFO].revents & ~POLLIN) {
+        if (pollfd[POLLFD_FIFO].revents & ~POLLOUT) {
             pa_log("FIFO shutdown.");
             goto fail;
         }
 
-        pa_assert(pollfd[POLLFD_ASYNCQ].revents & ~POLLIN == 0);
+        pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);
     }
 
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), pa_module_unref);
-    pa_asyncmsgq_wait_for(PA_SINK_MESSAGE_SHUTDOWN);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 
 finish:
     pa_log_debug("Thread shutting down");
@@ -253,23 +249,22 @@ int pa__init(pa_core *c, pa_module*m) {
 
     ss = c->default_sample_spec;
     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
-        pa_log("Invalid sample format specification");
+        pa_log("Invalid sample format specification or channel map");
         goto fail;
     }
 
     u = pa_xnew0(struct userdata, 1);
     u->core = c;
     u->module = m;
-    u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FIFO_NAME));
-    u->fd = fd;
-    u->memchunk.memblock = NULL;
-    u->memchunk.length = 0;
     m->userdata = u;
 
-    mkfifo(u->filename, 0666);
+    pa_assert_se(u->asyncmsgq = pa_asyncmsgq_new(0));
+    
+    u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
 
-    if ((u->fd = open(u->filename, O_RDWR)) < 0) {
-        pa_log("open('%s'): %s", p, pa_cstrerror(errno));
+    mkfifo(u->filename, 0666);
+    if ((u->fd = open(u->filename, O_RDWR|O_NOCTTY)) < 0) {
+        pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
         goto fail;
     }
 
@@ -277,12 +272,12 @@ int pa__init(pa_core *c, pa_module*m) {
     pa_make_nonblock_fd(u->fd);
 
     if (fstat(u->fd, &st) < 0) {
-        pa_log("fstat('%s'): %s", p, pa_cstrerror(errno));
+        pa_log("fstat('%s'): %s", u->filename, pa_cstrerror(errno));
         goto fail;
     }
 
     if (!S_ISFIFO(st.st_mode)) {
-        pa_log("'%s' is not a FIFO.", p);
+        pa_log("'%s' is not a FIFO.", u->filename);
         goto fail;
     }
 
@@ -291,9 +286,12 @@ int pa__init(pa_core *c, pa_module*m) {
         goto fail;
     }
 
+    u->sink->parent.process_msg = sink_process_msg;
     u->sink->userdata = u;
-    pa_sink_set_owner(u->sink, m);
-    pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", p));
+    
+    pa_sink_set_module(u->sink, m);
+    pa_sink_set_asyncmsgq(u->sink, u->asyncmsgq);
+    pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", u->filename));
     pa_xfree(t);
 
     if (!(u->thread = pa_thread_new(thread_func, u))) {
@@ -316,20 +314,26 @@ fail:
 
 void pa__done(pa_core *c, pa_module*m) {
     struct userdata *u;
+    
     pa_assert(c);
     pa_assert(m);
 
     if (!(u = m->userdata))
         return;
 
-    pa_sink_disconnect(u->sink);
+    if (u->sink)
+        pa_sink_disconnect(u->sink);
 
     if (u->thread) {
-        pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, NULL);
         pa_thread_free(u->thread);
     }
 
-    pa_sink_unref(u->sink);
+    if (u->asyncmsgq)
+        pa_asyncmsgq_free(u->asyncmsgq);
+    
+    if (u->sink)
+        pa_sink_unref(u->sink);
 
     if (u->memchunk.memblock)
        pa_memblock_unref(u->memchunk.memblock);
index f275c5d44192c86d4da1bc089854bc84e635e49a..ac2bef7d42baa6e22c7097c96c7bd785121d35d7 100644 (file)
@@ -179,7 +179,7 @@ int pa__init(pa_core *c, pa_module*m) {
         goto fail;
     }
     u->source->userdata = u;
-    pa_source_set_owner(u->source, m);
+    pa_source_set_module(u->source, m);
     pa_source_set_description(u->source, t = pa_sprintf_malloc("Unix FIFO source '%s'", p));
     pa_xfree(t);
 
index de5b2f9d94ec0c222f3de1cfbb642ebfe2856dcd..6becb629d153074862add61b07744e21fa944400 100644 (file)
@@ -48,6 +48,7 @@ struct asyncmsgq_item {
     pa_free_cb_t free_cb;
     pa_memchunk memchunk;
     pa_semaphore *semaphore;
+    int ret;
 };
 
 struct pa_asyncmsgq {
@@ -81,10 +82,10 @@ void pa_asyncmsgq_free(pa_asyncmsgq *a) {
             pa_msgobject_unref(i->object);
 
         if (i->memchunk.memblock)
-            pa_memblock_unref(i->object);
+            pa_memblock_unref(i->memchunk.memblock);
 
-        if (i->userdata_free_cb)
-            i->userdata_free_cb(i->userdata);
+        if (i->free_cb)
+            i->free_cb(i->userdata);
 
         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
             pa_xfree(i);
@@ -103,7 +104,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
         i = pa_xnew(struct asyncmsgq_item, 1);
 
     i->code = code;
-    i->object = pa_msgobject_ref(object);
+    i->object = object ? pa_msgobject_ref(object) : NULL;
     i->userdata = (void*) userdata;
     i->free_cb = free_cb;
     if (chunk) {
@@ -131,9 +132,9 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
     i.ret = -1;
     if (chunk) {
         pa_assert(chunk->memblock);
-        i->memchunk = *chunk;
+        i.memchunk = *chunk;
     } else
-        pa_memchunk_reset(&i->memchunk);
+        pa_memchunk_reset(&i.memchunk);
     pa_assert_se(i.semaphore = pa_semaphore_new(0));
 
     /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
@@ -161,8 +162,10 @@ int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **u
     if (object)
         *object = a->current->object;
     if (chunk)
-        *chunk = a->chunk;
+        *chunk = a->current->memchunk;
 
+    pa_log_debug("q=%p object=%p code=%i data=%p", a, a->current->object, a->current->code, a->current->userdata);
+    
     return 0;
 }
 
@@ -196,11 +199,16 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
     pa_assert(a);
 
     do {
+        pa_msgobject *o;
+        void *data;
+        pa_memchunk chunk;
+        int ret;
 
-        if (pa_asyncmsgq_get(a, NULL, &c, NULL, 1) < 0)
+        if (pa_asyncmsgq_get(a, &o, &c, &data, &chunk, 1) < 0)
             return -1;
 
-        pa_asyncmsgq_done(a);
+        ret = pa_asyncmsgq_dispatch(o, c, data, &chunk);
+        pa_asyncmsgq_done(a, ret);
 
     } while (c != code);
 
@@ -226,10 +234,9 @@ void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
 }
 
 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) {
-    pa_assert(q);
 
     if (object)
-        return object->msg_process(object, code, userdata, memchunk);
+        return object->process_msg(object, code, userdata, memchunk);
 
     return 0;
 }
index 54d36dc06388e350c43765f2785e66ddb23234f4..da1f16fb75bd1f9c0438051548981101418f1a36 100644 (file)
@@ -52,8 +52,8 @@ struct pa_asyncq {
     unsigned size;
     unsigned read_idx;
     unsigned write_idx;
-    pa_atomic_int_t read_waiting;
-    pa_atomic_int_t write_waiting;
+    pa_atomic_t read_waiting, n_read;
+    pa_atomic_t write_waiting, n_written;
     int read_fds[2], write_fds[2];
 };
 
@@ -80,6 +80,8 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
     l->size = size;
     pa_atomic_store(&l->read_waiting, 0);
     pa_atomic_store(&l->write_waiting, 0);
+    pa_atomic_store(&l->n_written, 0);
+    pa_atomic_store(&l->n_read, 0);
 
     if (pipe(l->read_fds) < 0) {
         pa_xfree(l);
@@ -131,10 +133,26 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
 
     if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
 
-        /* First try failed. Let's wait for changes. */
-
-        if (!wait)
+        if (!wait) {
+            /* Let's empty the FIFO from old notifications, before we return */
+            
+            while (pa_atomic_load(&l->n_read) > 0) {
+                ssize_t r;
+                int x[20];
+                
+                errno = 0;
+                if ((r = read(l->write_fds[0], x, sizeof(x))) <= 0 && errno != EINTR)
+                    return -1;
+                
+                if (r > 0)
+                    if (pa_atomic_sub(&l->n_read, r) <= r)
+                        break;
+            }
+            
             return -1;
+        }
+
+        /* First try failed. Let's wait for changes. */
 
         _Y;
 
@@ -142,6 +160,7 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
 
         for (;;) {
             char x[20];
+            ssize_t r;
 
             _Y;
 
@@ -150,10 +169,13 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
 
             _Y;
 
-            if (read(l->write_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
+            if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) {
                 pa_atomic_dec(&l->write_waiting);
                 return -1;
             }
+
+            if (r > 0)
+                pa_atomic_sub(&l->n_read, r);
         }
 
         _Y;
@@ -167,7 +189,8 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
     if (pa_atomic_load(&l->read_waiting)) {
         char x = 'x';
         _Y;
-        write(l->read_fds[1], &x, sizeof(x));
+        if (write(l->read_fds[1], &x, sizeof(x)) > 0)
+            pa_atomic_inc(&l->n_written);
     }
 
     return 0;
@@ -189,8 +212,24 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
 
         /* First try failed. Let's wait for changes. */
 
-        if (!wait)
+        if (!wait) {
+            /* Let's empty the FIFO from old notifications, before we return */
+            
+            while (pa_atomic_load(&l->n_written) > 0) {
+                ssize_t r;
+                int x[20];
+                
+                errno = 0;
+                if ((r = read(l->read_fds[0], x, sizeof(x))) <= 0 && errno != EINTR)
+                    return NULL;
+                
+                if (r > 0)
+                    if (pa_atomic_sub(&l->n_written, r) <= r)
+                        break;
+            }
+            
             return NULL;
+        }
 
         _Y;
 
@@ -198,6 +237,7 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
 
         for (;;) {
             char x[20];
+            ssize_t r;
 
             _Y;
 
@@ -206,10 +246,13 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
 
             _Y;
 
-            if (read(l->read_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
+            if ((r = read(l->read_fds[0], x, sizeof(x)) < 0) && errno != EINTR) {
                 pa_atomic_dec(&l->read_waiting);
                 return NULL;
             }
+
+            if (r > 0)
+                pa_atomic_sub(&l->n_written, r);
         }
 
         _Y;
@@ -226,7 +269,8 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
     if (pa_atomic_load(&l->write_waiting)) {
         char x = 'x';
         _Y;
-        write(l->write_fds[1], &x, sizeof(x));
+        if (write(l->write_fds[1], &x, sizeof(x)) >= 0)
+            pa_atomic_inc(&l->n_read);
     }
 
     return ret;
@@ -262,10 +306,13 @@ int pa_asyncq_before_poll(pa_asyncq *l) {
     return 0;
 }
 
-int pa_asyncq_after_poll(pa_asyncq *l) {
+void pa_asyncq_after_poll(pa_asyncq *l) {
     pa_assert(l);
 
     pa_assert(pa_atomic_load(&l->read_waiting) > 0);
 
     pa_atomic_dec(&l->read_waiting);
+
+
+    
 }
index aac45b1d16be3c1d416485d535af44fc57f632e5..729ec466297fa817682c137aa5b382dbc75bd3db 100644 (file)
@@ -51,6 +51,6 @@ int pa_asyncq_push(pa_asyncq *q, void *p, int wait);
 
 int pa_asyncq_get_fd(pa_asyncq *q);
 int pa_asyncq_before_poll(pa_asyncq *a);
-int pa_asyncq_after_poll(pa_asyncq *a);
+void pa_asyncq_after_poll(pa_asyncq *a);
 
 #endif
index 36c85d602a38b720dd29b1163c390930fecfebe8..d7613530f95916cba5e81fb95318b853b3669eb5 100644 (file)
@@ -115,6 +115,8 @@ static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf
 static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_suspend_sink(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_suspend_source(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 
 /* A method table for all available commands */
 
@@ -134,11 +136,11 @@ static const struct command commands[] = {
     { "load-module",             pa_cli_command_load,               "Load a module (args: name, arguments)", 3},
     { "unload-module",           pa_cli_command_unload,             "Unload a module (args: index)", 2},
     { "set-sink-volume",         pa_cli_command_sink_volume,        "Set the volume of a sink (args: index|name, volume)", 3},
-    { "set-sink-input-volume",   pa_cli_command_sink_input_volume,  "Set the volume of a sink input (args: index|name, volume)", 3},
+    { "set-sink-input-volume",   pa_cli_command_sink_input_volume,  "Set the volume of a sink input (args: index, volume)", 3},
     { "set-source-volume",       pa_cli_command_source_volume,      "Set the volume of a source (args: index|name, volume)", 3},
-    { "set-sink-mute",           pa_cli_command_sink_mute,          "Set the mute switch of a sink (args: index|name, mute)", 3},
-    { "set-sink-input-mute",     pa_cli_command_sink_input_mute,    "Set the mute switch of a sink input (args: index|name, mute)", 3},
-    { "set-source-mute",         pa_cli_command_source_mute,        "Set the mute switch of a source (args: index|name, mute)", 3},
+    { "set-sink-mute",           pa_cli_command_sink_mute,          "Set the mute switch of a sink (args: index|name, bool)", 3},
+    { "set-sink-input-mute",     pa_cli_command_sink_input_mute,    "Set the mute switch of a sink input (args: index, bool)", 3},
+    { "set-source-mute",         pa_cli_command_source_mute,        "Set the mute switch of a source (args: index|name, bool)", 3},
     { "set-default-sink",        pa_cli_command_sink_default,       "Set the default sink (args: index|name)", 2},
     { "set-default-source",      pa_cli_command_source_default,     "Set the default source (args: index|name)", 2},
     { "kill-client",             pa_cli_command_kill_client,        "Kill a client (args: index)", 2},
@@ -161,6 +163,8 @@ static const struct command commands[] = {
     { "move-sink-input",         pa_cli_command_move_sink_input,    "Move sink input to another sink (args: index, sink)", 3},
     { "move-source-output",      pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3},
     { "vacuum",                  pa_cli_command_vacuum,             NULL, 1},
+    { "suspend-sink",            pa_cli_command_suspend_sink,       "Suspend sink (args: index|name, bool)", 3},
+    { "suspend-source",          pa_cli_command_suspend_source,     "Suspend source (args: index|name, bool)", 3},
     { NULL, NULL, NULL, 0 }
 };
 
@@ -899,6 +903,64 @@ static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_str
     return 0;
 }
 
+static int pa_cli_command_suspend_sink(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+    const char *n, *m;
+    pa_sink *sink;
+    int suspend;
+
+    if (!(n = pa_tokenizer_get(t, 1))) {
+        pa_strbuf_puts(buf, "You need to specify a sink either by its name or its index.\n");
+        return -1;
+    }
+
+    if (!(m = pa_tokenizer_get(t, 2))) {
+        pa_strbuf_puts(buf, "You need to specify a suspend switch setting (0/1).\n");
+        return -1;
+    }
+
+    if (pa_atoi(m, &suspend) < 0) {
+        pa_strbuf_puts(buf, "Failed to parse suspend switch.\n");
+        return -1;
+    }
+
+    if (!(sink = pa_namereg_get(c, n, PA_NAMEREG_SINK, 1))) {
+        pa_strbuf_puts(buf, "No sink found by this name or index.\n");
+        return -1;
+    }
+
+    pa_sink_suspend(sink, suspend);
+    return 0;
+}
+
+static int pa_cli_command_suspend_source(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+    const char *n, *m;
+    pa_source *source;
+    int suspend;
+
+    if (!(n = pa_tokenizer_get(t, 1))) {
+        pa_strbuf_puts(buf, "You need to specify a source either by its name or its index.\n");
+        return -1;
+    }
+
+    if (!(m = pa_tokenizer_get(t, 2))) {
+        pa_strbuf_puts(buf, "You need to specify a suspend switch setting (0/1).\n");
+        return -1;
+    }
+
+    if (pa_atoi(m, &suspend) < 0) {
+        pa_strbuf_puts(buf, "Failed to parse suspend switch.\n");
+        return -1;
+    }
+
+    if (!(source = pa_namereg_get(c, n, PA_NAMEREG_SOURCE, 1))) {
+        pa_strbuf_puts(buf, "No source found by this name or index.\n");
+        return -1;
+    }
+
+    pa_source_suspend(source, suspend);
+    return 0;
+}
+
 static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) {
     pa_module *m;
     pa_sink *sink;
@@ -1162,3 +1224,4 @@ int pa_cli_command_execute(pa_core *c, const char *s, pa_strbuf *buf, int *fail)
 
     return 0;
 }
+
index 05d681e32b9d21eb768c643a01e582fb12ba9c90..c919e46d090e03b6149a9a36b62f1e94b547c004 100644 (file)
@@ -114,14 +114,15 @@ char *pa_sink_list_to_string(pa_core *c) {
             "  %c index: %u\n"
             "\tname: <%s>\n"
             "\tdriver: <%s>\n"
-            "\tis_hardware: <%i>\n"
+            "\tis hardware: <%i>\n"
             "\tstate: %s\n"
             "\tvolume: <%s>\n"
             "\tmute: <%i>\n"
             "\tlatency: <%0.0f usec>\n"
-            "\tmonitor_source: <%u>\n"
+            "\tmonitor source: <%u>\n"
             "\tsample spec: <%s>\n"
-            "\tchannel map: <%s>\n",
+            "\tchannel map: <%s>\n"
+            "\tused by: <%u>\n", 
             c->default_sink_name && !strcmp(sink->name, c->default_sink_name) ? '*' : ' ',
             sink->index,
             sink->name,
@@ -133,7 +134,8 @@ char *pa_sink_list_to_string(pa_core *c) {
             (double) pa_sink_get_latency(sink),
             sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
             pa_sample_spec_snprint(ss, sizeof(ss), &sink->sample_spec),
-            pa_channel_map_snprint(cm, sizeof(cm), &sink->channel_map));
+            pa_channel_map_snprint(cm, sizeof(cm), &sink->channel_map),
+            pa_sink_used_by(sink));
 
         if (sink->module)
             pa_strbuf_printf(s, "\tmodule: <%u>\n", sink->module->index);
@@ -170,13 +172,14 @@ char *pa_source_list_to_string(pa_core *c) {
             "  %c index: %u\n"
             "\tname: <%s>\n"
             "\tdriver: <%s>\n"
-            "\tis_hardware: <%i>\n"
+            "\tis hardware: <%i>\n"
             "\tstate: %s\n"
             "\tvolume: <%s>\n"
             "\tmute: <%u>\n"
             "\tlatency: <%0.0f usec>\n"
             "\tsample spec: <%s>\n"
-            "\tchannel map: <%s>\n",
+            "\tchannel map: <%s>\n"
+            "\tused by: <%u>\n",
             c->default_source_name && !strcmp(source->name, c->default_source_name) ? '*' : ' ',
             source->index,
             source->name,
@@ -187,7 +190,8 @@ char *pa_source_list_to_string(pa_core *c) {
             !!pa_source_get_mute(source),
             (double) pa_source_get_latency(source),
             pa_sample_spec_snprint(ss, sizeof(ss), &source->sample_spec),
-            pa_channel_map_snprint(cm, sizeof(cm), &source->channel_map));
+            pa_channel_map_snprint(cm, sizeof(cm), &source->channel_map),
+            pa_source_used_by(source));
 
         if (source->monitor_of)
             pa_strbuf_printf(s, "\tmonitor_of: <%u>\n", source->monitor_of->index);
index 6608d57a424c324c6f26e00dac25b51161bbb329..288d10785f71980dc83b2864366c82f5837903cc 100644 (file)
@@ -207,7 +207,7 @@ static void sched_event(pa_core *c) {
 }
 
 /* Append a new subscription event to the subscription event queue and schedule a main loop event */
-void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t index) {
+void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t idx) {
     pa_subscription_event *e;
     assert(c);
 
@@ -227,7 +227,7 @@ void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t i
                 continue;
 
             /* not the same object */
-            if (i->index != index)
+            if (i->index != idx)
                 continue;
 
             if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE) {
@@ -253,7 +253,7 @@ void pa_subscription_post(pa_core *c, pa_subscription_event_type_t t, uint32_t i
     e = pa_xnew(pa_subscription_event, 1);
     e->core = c;
     e->type = t;
-    e->index = index;
+    e->index = idx;
 
     PA_LLIST_INSERT_AFTER(pa_subscription_event, c->subscription_event_queue, c->subscription_event_last, e);
     c->subscription_event_last = e;
index 99ac74e116d0bb9e9301acd022b3b36643a0ec75..a940bfc0ce93aed9dea7c50687edb811de527e94 100644 (file)
@@ -49,6 +49,8 @@
 
 #include "core.h"
 
+static PA_DEFINE_CHECK_TYPE(pa_core, core_check_type, pa_msgobject_check_type);
+
 static int core_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
     pa_core *c = PA_CORE(o);
 
@@ -81,8 +83,10 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even
 
         /* Check whether there is a message for us to process */
         while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &chunk, 0) == 0) {
-            pa_asyncmsgq_dispatch(object, code, data, &chunk);
-            pa_asyncmsgq_done(c->asyncmsgq, 0);
+            int ret;
+
+            ret = pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            pa_asyncmsgq_done(c->asyncmsgq, ret);
         }
 
         if (pa_asyncmsgq_before_poll(c->asyncmsgq) == 0)
@@ -112,7 +116,7 @@ pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
         }
     }
 
-    c = pa_msgobject_new(pa_core);
+    c = pa_msgobject_new(pa_core, core_check_type);
     c->parent.parent.free = core_free;
     c->parent.process_msg = core_process_msg;
 
@@ -181,7 +185,7 @@ pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
 
 static void core_free(pa_object *o) {
     pa_core *c = PA_CORE(o);
-    pa_core_assert_ref(c);
+    pa_assert(c);
 
     pa_module_unload_all(c);
     assert(!c->modules);
@@ -212,13 +216,14 @@ static void core_free(pa_object *o) {
     pa_xfree(c->default_source_name);
     pa_xfree(c->default_sink_name);
 
+    pa_asyncmsgq_after_poll(c->asyncmsgq);
+    pa_asyncmsgq_free(c->asyncmsgq);
+
     pa_mempool_free(c->mempool);
 
     pa_property_cleanup(c);
 
     c->mainloop->io_free(c->asyncmsgq_event);
-    pa_asyncmsgq_after_poll(c->asyncmsgq);
-    pa_asyncmsgq_free(c->asyncmsgq);
 
     pa_hook_free(&c->hook_sink_input_new);
     pa_hook_free(&c->hook_sink_disconnect);
index 86660b7ae0f5af99a33730ed2ce7da12fd0f3e7a..a64f217906fd18d275e426ade4d1e5cf4700233f 100644 (file)
@@ -98,7 +98,7 @@ struct pa_core {
 };
 
 PA_DECLARE_CLASS(pa_core);
-#define PA_CORE(o) ((pa_core*) o)
+#define PA_CORE(o) pa_core_cast(o)
 
 enum {
     PA_CORE_MESSAGE_UNLOAD_MODULE,
index 0033adb91ed52ad3e6c799de410d720b30210a92..a1197eb5186599b0458945ee89b83df6f0e3b15e 100644 (file)
@@ -71,14 +71,11 @@ static const char level_to_char[] = {
 };
 
 void pa_log_set_ident(const char *p) {
-    if (log_ident)
-        pa_xfree(log_ident);
-    if (log_ident_local)
-        pa_xfree(log_ident_local);
+    pa_xfree(log_ident);
+    pa_xfree(log_ident_local);
 
     log_ident = pa_xstrdup(p);
-    log_ident_local = pa_utf8_to_locale(log_ident);
-    if (!log_ident_local)
+    if (!(log_ident_local = pa_utf8_to_locale(log_ident)))
         log_ident_local = pa_xstrdup(log_ident);
 }
 
index ce9f22f2da6400ec8e48c5b28fae4d4b2f19486b..6db630c5fe43f8b980399c78ab701aa78420be6d 100644 (file)
 
 #include "msgobject.h"
 
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name) {
+PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_msgobject_check_type, pa_object_check_type);
+
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
     pa_msgobject *o;
 
     pa_assert(size > sizeof(pa_msgobject));
     pa_assert(type_name);
 
-    o = PA_MSGOBJECT(pa_object_new_internal(size, type_name));
+    o = PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type ? check_type : pa_msgobject_check_type));
     o->process_msg = NULL;
     return o;
 }
index 317ebd208ac6603e61799bc19845f475e5163606..65761aea56a528c65c9babf6be1dfa646cfeded7 100644 (file)
@@ -40,12 +40,14 @@ struct pa_msgobject {
     int (*process_msg)(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
 };
 
-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name);
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
 
-#define pa_msgobject_new(type) ((type*) pa_msgobject_new_internal(sizeof(type), #type))
+int pa_msgobject_check_type(pa_object *o, const char *type);
+
+#define pa_msgobject_new(type, check_type) ((type*) pa_msgobject_new_internal(sizeof(type), #type, check_type))
 #define pa_msgobject_free ((void (*) (pa_msgobject* o)) pa_object_free)
 
-#define PA_MSGOBJECT(o) ((pa_msgobject*) (o))
+#define PA_MSGOBJECT(o) pa_msgobject_cast(o)
 
 PA_DECLARE_CLASS(pa_msgobject);
 
index e6ed53b2c7f8ae7b97e98cc7d4ca6c452431a7e4..a983c5ae9d26551ed1fb0da78203b54b0857a1ac 100644 (file)
@@ -28,7 +28,7 @@
 
 #include "object.h"
 
-pa_object *pa_object_new_internal(size_t size, const char *type_name) {
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name)) {
     pa_object *o;
 
     pa_assert(size > sizeof(pa_object));
@@ -38,24 +38,30 @@ pa_object *pa_object_new_internal(size_t size, const char *type_name) {
     PA_REFCNT_INIT(o);
     o->type_name = type_name;
     o->free = pa_object_free;
+    o->check_type = check_type ? check_type : pa_object_check_type;
 
     return o;
 }
 
 pa_object *pa_object_ref(pa_object *o) {
-    pa_assert(o);
-    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+    pa_object_assert_ref(o);
 
     PA_REFCNT_INC(o);
     return o;
 }
 
 void pa_object_unref(pa_object *o) {
-    pa_assert(o);
-    pa_assert(PA_REFCNT_VALUE(o) >= 1);
+    pa_object_assert_ref(o);
 
     if (PA_REFCNT_DEC(o) <= 0) {
         pa_assert(o->free);
         o->free(o);
     }
 }
+
+int pa_object_check_type(pa_object *o, const char *type_name) {
+    pa_assert(o);
+    pa_assert(type_name);
+    
+    return type_name == "pa_object" || strcmp(type_name, "pa_object") == 0;
+}
index e195a359fe889432a35abad8166d4fa7f0f814b6..270f289d6521b8b207117d2128c3ad2e22811235 100644 (file)
@@ -25,7 +25,9 @@
   USA.
 ***/
 
+#include <string.h>
 #include <sys/types.h>
+
 #include <pulse/xmalloc.h>
 #include <pulsecore/refcnt.h>
 #include <pulsecore/macro.h>
@@ -36,13 +38,22 @@ struct pa_object {
     PA_REFCNT_DECLARE;
     const char *type_name;
     void (*free)(pa_object *o);
+    int (*check_type)(pa_object *o, const char *type_name);
 };
 
-pa_object *pa_object_new_internal(size_t size, const char *type_name);
-#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), #type))
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int (*check_type)(pa_object *o, const char *type_name));
+#define pa_object_new(type, check_type) ((type*) pa_object_new_internal(sizeof(type), #type, check_type)
 
 #define pa_object_free ((void (*) (pa_object* o)) pa_xfree)
 
+int pa_object_check_type(pa_object *o, const char *type);
+
+static inline int pa_object_isinstance(void *o) {
+    pa_object *obj = (pa_object*) o;
+    pa_assert(obj);
+    return obj->check_type(obj, "pa_object");
+}
+
 pa_object *pa_object_ref(pa_object *o);
 void pa_object_unref(pa_object *o);
 
@@ -50,23 +61,50 @@ static inline int pa_object_refcnt(pa_object *o) {
     return o ? PA_REFCNT_VALUE(o) : 0;
 }
 
+static inline pa_object* pa_object_cast(void *o) {
+    pa_object *obj = (pa_object*) o;
+    pa_assert(obj->check_type(obj, "pa_object"));
+    return obj;
+}
+
 #define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o))
 
-#define PA_OBJECT(o) ((pa_object*) (o))
-
-#define PA_DECLARE_CLASS(c) \
-    static inline c* c##_ref(c *o) {                            \
-        return (c*) pa_object_ref(PA_OBJECT(o));                \
-    }                                                           \
-    static inline void c##_unref(c* o) {                        \
-        pa_object_unref(PA_OBJECT(o));                          \
-    }                                                           \
-    static inline int c##_refcnt(c* o) {                        \
-        return pa_object_refcnt(PA_OBJECT(o));                  \
-    }                                                           \
-    static inline void c##_assert_ref(c *o) {                   \
-        pa_object_assert_ref(PA_OBJECT(o));                     \
-    }                                                           \
+#define PA_OBJECT(o) pa_object_cast(o)
+
+#define PA_DECLARE_CLASS(c)                                             \
+    static inline int c##_isinstance(void *o) {                         \
+        pa_object *obj = (pa_object*) o;                                \
+        pa_assert(obj);                                                 \
+        return obj->check_type(obj, #c);                                \
+    }                                                                   \
+    static inline c* c##_cast(void *o) {                                \
+        pa_assert(c##_isinstance(o));                                   \
+        return (c*) o;                                                  \
+    }                                                                   \
+    static inline c* c##_ref(c *o) {                                    \
+        return (c*) pa_object_ref(PA_OBJECT(o));                        \
+    }                                                                   \
+    static inline void c##_unref(c* o) {                                \
+        pa_object_unref(PA_OBJECT(o));                                  \
+    }                                                                   \
+    static inline int c##_refcnt(c* o) {                                \
+        return pa_object_refcnt(PA_OBJECT(o));                          \
+    }                                                                   \
+    static inline void c##_assert_ref(c *o) {                           \
+        pa_object_assert_ref(PA_OBJECT(o));                             \
+    }                                                                   \
+    struct __stupid_useless_struct_to_allow_trailing_semicolon
+
+#define PA_DEFINE_CHECK_TYPE(c, func, parent)                           \
+    int func(pa_object *o, const char *type) {                          \
+        pa_assert(o);                                                   \
+        pa_assert(type);                                                \
+        if (type == #c ||                                               \
+            strcmp(type, #c) == 0)                                      \
+            return 1;                                                   \
+        return parent(o, type);                                         \
+    }                                                                   \
     struct __stupid_useless_struct_to_allow_trailing_semicolon
 
+
 #endif
index b7a4cc78a019a73c9f1282fb2f5b0a28fa0b91c8..67741bdec112057bde6414a48a13a3db8ad7805f 100644 (file)
 #include <pulsecore/namereg.h>
 #include <pulsecore/log.h>
 #include <pulsecore/core-error.h>
+#include <pulsecore/atomic.h>
 
 #include "protocol-simple.h"
 
 /* Don't allow more than this many concurrent connections */
 #define MAX_CONNECTIONS 10
 
-struct connection {
+typedef struct connection {
+    pa_msgobject parent;
     pa_protocol_simple *protocol;
     pa_iochannel *io;
     pa_sink_input *sink_input;
@@ -59,18 +61,21 @@ struct connection {
     struct {
         pa_memblock *current_memblock;
         size_t memblock_index, fragment_size;
-        pa_atomic_int missing;
+        pa_atomic_t missing;
     } playback;
-};
+} connection;
+
+PA_DECLARE_CLASS(connection);
+#define CONNECTION(o) (connection_cast(o))
 
+static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
+                     
 struct pa_protocol_simple {
     pa_module *module;
     pa_core *core;
     pa_socket_server*server;
     pa_idxset *connections;
 
-    pa_asyncmsgq *asyncmsgq;
-
     enum {
         RECORD = 1,
         PLAYBACK = 2,
@@ -86,8 +91,9 @@ enum {
 };
 
 enum {
-    MESSAGE_REQUEST_DATA,   /* data from source output to main loop */
-    MESSAGE_POST_DATA       /* data from source output to main loop */
+    MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
+    MESSAGE_POST_DATA,         /* data from source output to main loop */
+    MESSAGE_DROP_CONNECTION    /* Please drop a aconnection now */
 };
 
 
@@ -96,37 +102,49 @@ enum {
 #define RECORD_BUFFER_SECONDS (5)
 #define RECORD_BUFFER_FRAGMENTS (100)
 
-static void connection_free(struct connection *c) {
+static void connection_free(pa_object *o) {
+    connection *c = CONNECTION(o);
     pa_assert(c);
 
+    if (c->playback.current_memblock)
+        pa_memblock_unref(c->playback.current_memblock);
+
+    if (c->io)
+        pa_iochannel_free(c->io);
+    if (c->input_memblockq)
+        pa_memblockq_free(c->input_memblockq);
+    if (c->output_memblockq)
+        pa_memblockq_free(c->output_memblockq);
+
+    pa_xfree(c);
+}
+
+static void connection_drop(connection *c) {
+    pa_assert(c);
+    
     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
 
     if (c->sink_input) {
         pa_sink_input_disconnect(c->sink_input);
         pa_sink_input_unref(c->sink_input);
+        c->sink_input = NULL;
     }
 
     if (c->source_output) {
         pa_source_output_disconnect(c->source_output);
         pa_source_output_unref(c->source_output);
+        c->source_output = NULL;
     }
 
-    if (c->playback.current_memblock)
-        pa_memblock_unref(c->playback.current_memblock);
-
-    if (c->client)
+    if (c->client) {
         pa_client_free(c->client);
-    if (c->io)
-        pa_iochannel_free(c->io);
-    if (c->input_memblockq)
-        pa_memblockq_free(c->input_memblockq);
-    if (c->output_memblockq)
-        pa_memblockq_free(c->output_memblockq);
+        c->client = NULL;
+    }
 
-    pa_xfree(c);
+    connection_unref(c);
 }
 
-static int do_read(struct connection *c) {
+static int do_read(connection *c) {
     pa_memchunk chunk;
     ssize_t r;
     size_t l;
@@ -171,17 +189,17 @@ static int do_read(struct connection *c) {
 
     c->playback.memblock_index += r;
 
-    pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_POST_DATA, NULL, &chunk, NULL, NULL);
+    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
 
     return 0;
 }
 
-static int do_write(struct connection *c) {
+static int do_write(connection *c) {
     pa_memchunk chunk;
     ssize_t r;
     void *p;
 
-    p_assert(c);
+    pa_assert(c);
 
     if (!c->source_output)
         return 0;
@@ -212,7 +230,7 @@ static int do_write(struct connection *c) {
     return 0;
 }
 
-static void do_work(struct connection *c) {
+static void do_work(connection *c) {
     pa_assert(c);
 
     if (c->dead)
@@ -243,14 +261,39 @@ fail:
 
         pa_memblockq_prebuf_disable(c->input_memblockq);
     } else
-        connection_free(c);
+        connection_drop(c);
+}
+
+static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+    connection *c = CONNECTION(o);
+    
+    connection_assert_ref(c);
+
+    switch (code) {
+        case MESSAGE_REQUEST_DATA:
+            do_work(c);
+            break;
+            
+        case MESSAGE_POST_DATA:
+            pa_memblockq_push(c->output_memblockq, chunk);
+            do_work(c);
+            break;
+
+        case MESSAGE_DROP_CONNECTION:
+            connection_drop(c);
+            break;
+
+    }
+
+    return 0;
 }
 
 /*** sink_input callbacks ***/
 
 /* Called from thread context */
-static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, const pa_memchunk *chunk) {
-    struct connection*c;
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
+    pa_sink_input *i = PA_SINK_INPUT(o);
+    connection*c;
 
     pa_assert(i);
     c = i->userdata;
@@ -263,6 +306,8 @@ static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, co
 
             /* New data from the main loop */
             pa_memblockq_push_align(c->input_memblockq, chunk);
+            pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq));
+            
             return 0;
         }
 
@@ -276,13 +321,14 @@ static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, co
         }
 
         default:
-            return pa_sink_input_process_msg(i, code, userdata);
+            return pa_sink_input_process_msg(o, code, userdata, chunk);
     }
 }
 
 /* Called from thread context */
 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
-    struct connection*c;
+    connection*c;
+    int r;
 
     pa_assert(i);
     c = i->userdata;
@@ -292,14 +338,14 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
     r = pa_memblockq_peek(c->input_memblockq, chunk);
 
     if (c->dead && r < 0)
-        connection_free(c);
+        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, c, NULL, NULL);
 
     return r;
 }
 
 /* Called from thread context */
 static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
-    struct connection*c = i->userdata;
+    connection*c = i->userdata;
     size_t old, new;
 
     pa_assert(i);
@@ -310,10 +356,10 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_
     pa_memblockq_drop(c->input_memblockq, chunk, length);
     new = pa_memblockq_missing(c->input_memblockq);
 
-    pa_atomic_store(&c->playback.missing, &new);
+    pa_atomic_store(&c->playback.missing, new);
 
     if (new > old)
-        pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, NULL, NULL, NULL);
+        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
 }
 
 /* Called from main context */
@@ -321,34 +367,34 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_assert(i);
     pa_assert(i->userdata);
 
-    connection_free((struct connection *) i->userdata);
+    connection_drop((connection *) i->userdata);
 }
 
 /*** source_output callbacks ***/
 
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
-    struct connection *c;
+    connection *c;
 
     pa_assert(o);
     c = o->userdata;
     pa_assert(c);
     pa_assert(chunk);
 
-    pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, chunk, NULL, NULL);
+    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL);
 }
 
 static void source_output_kill_cb(pa_source_output *o) {
-    struct connection*c;
+    connection*c;
 
     pa_assert(o);
     c = o->userdata;
     pa_assert(c);
 
-    connection_free(c);
+    connection_drop(c);
 }
 
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
-    struct connection*c;
+    connection*c;
 
     pa_assert(o);
     c = o->userdata;
@@ -360,19 +406,19 @@ static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
 /*** client callbacks ***/
 
 static void client_kill_cb(pa_client *client) {
-    struct connection*c;
+    connection*c;
 
     pa_assert(client);
     c = client->userdata;
     pa_assert(c);
 
-    connection_free(client);
+    connection_drop(c);
 }
 
 /*** pa_iochannel callbacks ***/
 
 static void io_callback(pa_iochannel*io, void *userdata) {
-    struct connection *c = userdata;
+    connection *c = userdata;
 
     pa_assert(io);
     pa_assert(c);
@@ -384,7 +430,7 @@ static void io_callback(pa_iochannel*io, void *userdata) {
 
 static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
     pa_protocol_simple *p = userdata;
-    struct connection *c = NULL;
+    connection *c = NULL;
     char cname[256];
 
     pa_assert(s);
@@ -397,7 +443,9 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
         return;
     }
 
-    c = pa_xnew(struct connection, 1);
+    c = pa_msgobject_new(connection, connection_check_type);
+    c->parent.parent.free = connection_free;
+    c->parent.process_msg = connection_process_msg;
     c->io = io;
     c->sink_input = NULL;
     c->source_output = NULL;
@@ -415,7 +463,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
     c->client->kill = client_kill_cb;
     c->client->userdata = c;
 
-
     if (p->mode & PLAYBACK) {
         pa_sink_input_new_data data;
         size_t l;
@@ -432,10 +479,10 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
             goto fail;
         }
 
+        c->sink_input->parent.process_msg = sink_input_process_msg;
         c->sink_input->peek = sink_input_peek_cb;
         c->sink_input->drop = sink_input_drop_cb;
         c->sink_input->kill = sink_input_kill_cb;
-        c->sink_input->get_latency = sink_input_get_latency_cb;
         c->sink_input->userdata = c;
 
         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
@@ -449,7 +496,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
                 NULL);
         pa_assert(c->input_memblockq);
         pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
-        c->playback.fragment_size = l/10;
+        c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS;
 
         pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq));
 
@@ -498,47 +545,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
 
 fail:
     if (c)
-        connection_free(c);
-}
-
-static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
-    pa_protocol_simple *p = userdata;
-    int do_some_work = 0;
-
-    pa_assert(pa_asyncmsgq_get_fd(p->asyncmsgq) == fd);
-    pa_assert(events == PA_IO_EVENT_INPUT);
-
-    pa_asyncmsgq_after_poll(p->asyncmsgq);
-
-    for (;;) {
-        int code;
-        void *object, *data;
-
-        /* Check whether there is a message for us to process */
-        while (pa_asyncmsgq_get(p->asyncmsgq, &object, &code, &data) == 0) {
-
-            connection *c = object;
-
-            pa_assert(c);
-
-            switch (code) {
-
-                case MESSAGE_REQUEST_DATA:
-                    do_work(c);
-                    break;
-
-                case MESSAGE_POST_DATA:
-                    pa_memblockq_push(c->output_memblockq, chunk);
-                    do_work(c);
-                    break;
-            }
-
-            pa_asyncmsgq_done(p->asyncmsgq);
-        }
-
-        if (pa_asyncmsgq_before_poll(p->asyncmsgq) == 0)
-            break;
-    }
+        connection_drop(c);
 }
 
 pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
@@ -554,7 +561,6 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv
     p->core = core;
     p->server = server;
     p->connections = pa_idxset_new(NULL, NULL);
-    pa_assert_se(p->asyncmsgq = pa_asyncmsgq_new(0));
 
     p->sample_spec = core->default_sample_spec;
     if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
@@ -586,9 +592,6 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv
 
     pa_socket_server_set_callback(p->server, on_connection, p);
 
-    pa_assert_se(pa_asyncmsgq_before_poll(p->asyncmsgq) == 0);
-    pa_assert_se(p->asyncmsgq_event = core->mainloop->io_event_new(core->mainloop, pa_asyncmsgq_get_fd(p->asyncmsgq), PA_IO_EVENT_INPUT, p));
-
     return p;
 
 fail:
@@ -600,12 +603,12 @@ fail:
 
 
 void pa_protocol_simple_free(pa_protocol_simple *p) {
-    struct connection *c;
+    connection *c;
     pa_assert(p);
 
     if (p->connections) {
         while((c = pa_idxset_first(p->connections, NULL)))
-            connection_free(c);
+            connection_drop(c);
 
         pa_idxset_free(p->connections, NULL, NULL);
     }
@@ -613,12 +616,6 @@ void pa_protocol_simple_free(pa_protocol_simple *p) {
     if (p->server)
         pa_socket_server_unref(p->server);
 
-    if (p->asyncmsgq) {
-        c->mainloop->io_event_free(c->asyncmsgq_event);
-        pa_asyncmsgq_after_poll(c->asyncmsgq);
-        pa_asyncmsgq_free(p->asyncmsgq);
-    }
-
     pa_xfree(p);
 }
 
index 248d733747fbf0989b44babd0711136e5a9cb4d8..a43c7c7cf6cd9cf2fbf9f35fad5783a1ce6bae70 100644 (file)
@@ -48,6 +48,7 @@ struct pa_resampler {
 
     void (*impl_free)(pa_resampler *r);
     void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate);
+    void (*impl_update_output_rate)(pa_resampler *r, uint32_t rate);
     void (*impl_run)(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out);
     void *impl_data;
 };
@@ -165,6 +166,19 @@ void pa_resampler_set_input_rate(pa_resampler *r, uint32_t rate) {
         r->impl_update_input_rate(r, rate);
 }
 
+void pa_resampler_set_output_rate(pa_resampler *r, uint32_t rate) {
+    assert(r);
+    assert(rate > 0);
+
+    if (r->o_ss.rate == rate)
+        return;
+
+    r->o_ss.rate = rate;
+
+    if (r->impl_update_output_rate)
+        r->impl_update_output_rate(r, rate);
+}
+
 void pa_resampler_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out) {
     assert(r && in && out && r->impl_run);
 
@@ -512,6 +526,25 @@ static void libsamplerate_update_input_rate(pa_resampler *r, uint32_t rate) {
     }
 }
 
+
+static void libsamplerate_update_output_rate(pa_resampler *r, uint32_t rate) {
+    struct impl_libsamplerate *u;
+
+    assert(r);
+    assert(rate > 0);
+    assert(r->impl_data);
+    u = r->impl_data;
+
+    if (!u->src_state) {
+        int err;
+        u->src_state = src_new(r->resample_method, r->o_ss.channels, &err);
+        assert(u->src_state);
+    } else {
+        int ret = src_set_ratio(u->src_state, (double) rate / r->i_ss.rate);
+        assert(ret == 0);
+    }
+}
+
 static int libsamplerate_init(pa_resampler *r) {
     struct impl_libsamplerate *u = NULL;
     int err;
@@ -541,6 +574,7 @@ static int libsamplerate_init(pa_resampler *r) {
 
     r->impl_free = libsamplerate_free;
     r->impl_update_input_rate = libsamplerate_update_input_rate;
+    r->impl_update_output_rate = libsamplerate_update_output_rate;
     r->impl_run = libsamplerate_run;
 
     calc_map_table(r);
@@ -631,7 +665,7 @@ static void trivial_free(pa_resampler *r) {
     pa_xfree(r->impl_data);
 }
 
-static void trivial_update_input_rate(pa_resampler *r, uint32_t rate) {
+static void trivial_update_rate(pa_resampler *r, uint32_t rate) {
     struct impl_trivial *u;
 
     assert(r);
@@ -655,7 +689,8 @@ static int trivial_init(pa_resampler*r) {
 
     r->impl_run = trivial_run;
     r->impl_free = trivial_free;
-    r->impl_update_input_rate = trivial_update_input_rate;
+    r->impl_update_input_rate = trivial_update_rate;
+    r->impl_update_output_rate = trivial_update_rate;
 
     return 0;
 }
index c283593d679f0bfe934a9d703489d09a5c666904..ada293e5a580484ca246f15f28bd0b1915812bab 100644 (file)
@@ -63,6 +63,9 @@ void pa_resampler_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out);
 /* Change the input rate of the resampler object */
 void pa_resampler_set_input_rate(pa_resampler *r, uint32_t rate);
 
+/* Change the output rate of the resampler object */
+void pa_resampler_set_output_rate(pa_resampler *r, uint32_t rate);
+
 /* Return the resampling method of the resampler object */
 pa_resample_method_t pa_resampler_get_method(pa_resampler *r);
 
index 00b82d26479b10743cf3f290c03371ce0b858a2f..2c6b356c737eeb1da055041c34a2c95c4839de40 100644 (file)
@@ -45,7 +45,9 @@
 #define MOVE_BUFFER_LENGTH (1024*1024)
 #define SILENCE_BUFFER_LENGTH (64*1024)
 
-static void sink_input_free(pa_msgobject *o);
+static PA_DEFINE_CHECK_TYPE(pa_sink_input, sink_input_check_type, pa_msgobject_check_type);
+
+static void sink_input_free(pa_object *o);
 
 pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data *data) {
     pa_assert(data);
@@ -159,13 +161,12 @@ pa_sink_input* pa_sink_input_new(
         data->resample_method = pa_resampler_get_method(resampler);
     }
 
-    i = pa_msgobject_new(pa_sink_input);
-
+    i = pa_msgobject_new(pa_sink_input, sink_input_check_type);
     i->parent.parent.free = sink_input_free;
     i->parent.process_msg = pa_sink_input_process_msg;
 
     i->core = core;
-    pa_atomic_load(&i->state, PA_SINK_INPUT_DRAINED);
+    pa_atomic_store(&i->state, PA_SINK_INPUT_DRAINED);
     i->flags = flags;
     i->name = pa_xstrdup(data->name);
     i->driver = pa_xstrdup(data->driver);
@@ -189,11 +190,11 @@ pa_sink_input* pa_sink_input_new(
     i->userdata = NULL;
 
     i->thread_info.silence_memblock = NULL;
-    i->thread_info.move_silence = 0;
+/*     i->thread_info.move_silence = 0; */
     pa_memchunk_reset(&i->thread_info.resampled_chunk);
     i->thread_info.resampler = resampler;
-    i->thread_info.soft_volume = i->volume;
-    i->thread_info.soft_muted = i->muted;
+    i->thread_info.volume = i->volume;
+    i->thread_info.muted = i->muted;
 
     pa_assert_se(pa_idxset_put(core->sink_inputs, i, &i->index) == 0);
     pa_assert_se(pa_idxset_put(i->sink->inputs, i, NULL) == 0);
@@ -213,14 +214,16 @@ void pa_sink_input_disconnect(pa_sink_input *i) {
     pa_assert(i);
     pa_return_if_fail(pa_sink_input_get_state(i) != PA_SINK_INPUT_DISCONNECTED);
 
-    pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL);
+    pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL);
 
     pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL);
     pa_idxset_remove_by_data(i->sink->inputs, i, NULL);
 
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index);
-    i->sink = NULL;
 
+    pa_sink_update_status(i->sink);
+
+    i->sink = NULL;
     i->process_msg = NULL;
     i->peek = NULL;
     i->drop = NULL;
@@ -228,10 +231,10 @@ void pa_sink_input_disconnect(pa_sink_input *i) {
     i->get_latency = NULL;
     i->underrun = NULL;
 
-    pa_atomic_load(&i->state, PA_SINK_INPUT_DISCONNECTED);
+    pa_atomic_store(&i->state, PA_SINK_INPUT_DISCONNECTED);
 }
 
-static void sink_input_free(pa_msgobject *o) {
+static void sink_input_free(pa_object *o) {
     pa_sink_input* i = PA_SINK_INPUT(o);
 
     pa_assert(i);
@@ -241,8 +244,8 @@ static void sink_input_free(pa_msgobject *o) {
 
     pa_log_info("Freeing output %u \"%s\"", i->index, i->name);
 
-    if (i->resampled_chunk.memblock)
-        pa_memblock_unref(i->resampled_chunk.memblock);
+    if (i->thread_info.resampled_chunk.memblock)
+        pa_memblock_unref(i->thread_info.resampled_chunk.memblock);
 
     if (i->thread_info.resampler)
         pa_resampler_free(i->thread_info.resampler);
@@ -261,10 +264,10 @@ void pa_sink_input_put(pa_sink_input *i) {
     i->thread_info.volume = i->volume;
     i->thread_info.muted = i->muted;
 
-    pa_asyncmsgq_post(i->sink->asyncmsgq, i->sink, PA_SINK_MESSAGE_ADD_INPUT, i, NULL, pa_sink_unref, pa_sink_input_unref);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, pa_sink_input_ref(i), NULL, (pa_free_cb_t) pa_sink_input_unref);
     pa_sink_update_status(i->sink);
 
-    pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index);
+    pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index);
 }
 
 void pa_sink_input_kill(pa_sink_input*i) {
@@ -279,7 +282,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i) {
 
     pa_sink_input_assert_ref(i);
 
-    if (pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
         r = 0;
 
     if (i->get_latency)
@@ -327,14 +330,14 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
 /*         goto finish; */
 /*     } */
 
-    if (!i->resampler) {
+    if (!i->thread_info.resampler) {
         do_volume_adj_here = 0;
         ret = i->peek(i, chunk);
         goto finish;
     }
 
     do_volume_adj_here = !pa_channel_map_equal(&i->channel_map, &i->sink->channel_map);
-    volume_is_norm = pa_cvolume_is_norm(&i->thread_info.soft_volume) && !i->thread_info.soft_muted;
+    volume_is_norm = pa_cvolume_is_norm(&i->thread_info.volume) && !i->thread_info.muted;
 
     while (!i->thread_info.resampled_chunk.memblock) {
         pa_memchunk tchunk;
@@ -345,7 +348,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
 
         pa_assert(tchunk.length);
 
-        l = pa_resampler_request(i->resampler, CONVERT_BUFFER_LENGTH);
+        l = pa_resampler_request(i->thread_info.resampler, CONVERT_BUFFER_LENGTH);
 
         if (l > tchunk.length)
             l = tchunk.length;
@@ -356,10 +359,10 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
         /* It might be necessary to adjust the volume here */
         if (do_volume_adj_here && !volume_is_norm) {
             pa_memchunk_make_writable(&tchunk, 0);
-            pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.soft_volume);
+            pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.volume);
         }
 
-        pa_resampler_run(i->resampler, &tchunk, &i->thread_info.resampled_chunk);
+        pa_resampler_run(i->thread_info.resampler, &tchunk, &i->thread_info.resampled_chunk);
         pa_memblock_unref(tchunk.memblock);
     }
 
@@ -378,7 +381,7 @@ finish:
 
     if (ret >= 0)
         pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_RUNNING);
-    else if (ret < 0 && i->state == PA_SINK_INPUT_RUNNING)
+    else if (ret < 0 && state == PA_SINK_INPUT_RUNNING)
         pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED);
 
     if (ret >= 0) {
@@ -427,7 +430,7 @@ void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t lengt
 /*         return; */
 /*     } */
 
-    if (!i->resampler) {
+    if (!i->thread_info.resampler) {
         if (i->drop)
             i->drop(i, chunk, length);
         return;
@@ -454,7 +457,7 @@ void pa_sink_input_set_volume(pa_sink_input *i, const pa_cvolume *volume) {
 
     i->volume = *volume;
 
-    pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), pa_sink_input_unref, pa_xfree);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
 }
 
@@ -473,18 +476,17 @@ void pa_sink_input_set_mute(pa_sink_input *i, int mute) {
 
     i->muted = mute;
 
-    pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), pa_sink_input_unref, NULL);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
 }
 
 int pa_sink_input_get_mute(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
 
-    return !!i->mute;
+    return !!i->muted;
 }
 
 void pa_sink_input_cork(pa_sink_input *i, int b) {
-    int n;
     pa_sink_input_state_t state;
 
     pa_sink_input_assert_ref(i);
@@ -493,24 +495,24 @@ void pa_sink_input_cork(pa_sink_input *i, int b) {
     pa_assert(state != PA_SINK_INPUT_DISCONNECTED);
 
     if (b && state != PA_SINK_INPUT_CORKED)
-        pa_atomic_store(i->state, PA_SINK_INPUT_CORKED);
+        pa_atomic_store(&i->state, PA_SINK_INPUT_CORKED);
     else if (!b && state == PA_SINK_INPUT_CORKED)
-        pa_atomic_cmpxchg(i->state, state, PA_SINK_INPUT_DRAINED);
+        pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED);
 }
 
 int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) {
     pa_sink_input_assert_ref(i);
-    pa_return_val_if_fail(u->thread_info.resampler, -1);
+    pa_return_val_if_fail(i->thread_info.resampler, -1);
 
     if (i->sample_spec.rate == rate)
         return 0;
 
     i->sample_spec.rate = rate;
 
-    pa_asyncmsgq_post(s->asyncmsgq, pa_sink_input_ref(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, pa_sink_input_unref, NULL);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
 
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
-    return 0
+    return 0;
 }
 
 void pa_sink_input_set_name(pa_sink_input *i, const char *name) {
@@ -535,9 +537,9 @@ pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i) {
 }
 
 int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
-    pa_resampler *new_resampler = NULL;
-    pa_memblockq *buffer = NULL;
-    pa_sink *origin;
+/*     pa_resampler *new_resampler = NULL; */
+/*     pa_memblockq *buffer = NULL; */
+/*     pa_sink *origin; */
 
     pa_sink_input_assert_ref(i);
     pa_sink_assert_ref(dest);
@@ -702,18 +704,18 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc
 
     switch (code) {
         case PA_SINK_INPUT_MESSAGE_SET_VOLUME:
-            s->thread_info.soft_volume = *((pa_cvolume*) userdata);
+            i->thread_info.volume = *((pa_cvolume*) userdata);
             return 0;
 
         case PA_SINK_INPUT_MESSAGE_SET_MUTE:
-            s->thread_info.soft_muted = PA_PTR_TO_UINT(userdata);
+            i->thread_info.muted = PA_PTR_TO_UINT(userdata);
             return 0;
 
         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
             pa_usec_t *r = userdata;
 
             if (i->thread_info.resampled_chunk.memblock)
-                *r += pa_bytes_to_usec(i->resampled_chunk.length, &i->sink->sample_spec);
+                *r += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &i->sink->sample_spec);
 
 /*             if (i->move_silence) */
 /*                 r += pa_bytes_to_usec(i->move_silence, &i->sink->sample_spec); */
@@ -724,7 +726,7 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc
         case PA_SINK_INPUT_MESSAGE_SET_RATE: {
 
             i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata);
-            pa_resampler_set_input_rate(i->resampler, PA_PTR_TO_UINT(userdata));
+            pa_resampler_set_input_rate(i->thread_info.resampler, PA_PTR_TO_UINT(userdata));
 
             return 0;
         }
index 338d696277392ee3e6583a03e8a7393cb1eb783f..a8c05b853b949b15a59909664688eb88f7da5f20 100644 (file)
@@ -99,7 +99,7 @@ struct pa_sink_input {
 };
 
 PA_DECLARE_CLASS(pa_sink_input);
-#define PA_SINK_INPUT(o) ((pa_sink_input*) (o))
+#define PA_SINK_INPUT(o) pa_sink_input_cast(o)
 
 enum {
     PA_SINK_INPUT_MESSAGE_SET_VOLUME,
@@ -160,7 +160,7 @@ int pa_sink_input_get_mute(pa_sink_input *i);
 
 void pa_sink_input_cork(pa_sink_input *i, int b);
 
-void pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate);
+int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate);
 
 pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i);
 
index 0e022d922471cfc043012aff606410e6f1be8f71..7f009048525fddb7de630f8beda2ea8fb8a5dd9d 100644 (file)
@@ -47,6 +47,8 @@
 
 #define MAX_MIX_CHANNELS 32
 
+static PA_DEFINE_CHECK_TYPE(pa_sink, sink_check_type, pa_msgobject_check_type);
+
 static void sink_free(pa_object *s);
 
 pa_sink* pa_sink_new(
@@ -77,7 +79,7 @@ pa_sink* pa_sink_new(
     pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
     pa_return_null_if_fail(name && pa_utf8_valid(name) && *name);
 
-    s = pa_msgobject_new(pa_sink);
+    s = pa_msgobject_new(pa_sink, sink_check_type);
 
     if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fail))) {
         pa_xfree(s);
@@ -88,7 +90,7 @@ pa_sink* pa_sink_new(
     s->parent.process_msg = pa_sink_process_msg;
 
     s->core = core;
-    pa_atomic_store(&s->state, PA_SINK_IDLE);
+    s->state = PA_SINK_IDLE;
     s->name = pa_xstrdup(name);
     s->description = NULL;
     s->driver = pa_xstrdup(driver);
@@ -110,11 +112,10 @@ pa_sink* pa_sink_new(
     s->get_volume = NULL;
     s->set_mute = NULL;
     s->get_mute = NULL;
-    s->start = NULL;
-    s->stop = NULL;
+    s->set_state = NULL;
     s->userdata = NULL;
 
-    pa_assert_se(s->asyncmsgq = pa_asyncmsgq_new(0));
+    s->asyncmsgq = NULL;
 
     r = pa_idxset_put(core->sinks, s, &s->index);
     pa_assert(s->index != PA_IDXSET_INVALID && r >= 0);
@@ -139,56 +140,40 @@ pa_sink* pa_sink_new(
     s->thread_info.inputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
     s->thread_info.soft_volume = s->volume;
     s->thread_info.soft_muted = s->muted;
+    s->thread_info.state = s->state;
 
     pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index);
 
     return s;
 }
 
-static void sink_start(pa_sink *s) {
-    pa_sink_state_t state;
+static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
+    int ret;
+    
     pa_assert(s);
 
-    state = pa_sink_get_state(s);
-    pa_return_if_fail(state == PA_SINK_IDLE || state == PA_SINK_SUSPENDED);
-
-    pa_atomic_store(&s->state, PA_SINK_RUNNING);
-
-    if (s->start)
-        s->start(s);
-    else
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_START, NULL, NULL, NULL);
-}
-
-static void sink_stop(pa_sink *s) {
-    pa_sink_state_t state;
-    int stop;
+    if (s->state == state)
+        return 0;
 
-    pa_assert(s);
-    state = pa_sink_get_state(s);
-    pa_return_if_fail(state == PA_SINK_RUNNING || state == PA_SINK_SUSPENDED);
+    if (s->set_state)
+        if ((ret = s->set_state(s, state)) < 0)
+            return -1;
 
-    stop = state == PA_SINK_RUNNING;
-    pa_atomic_store(&s->state, PA_SINK_IDLE);
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+        return -1;
 
-    if (stop) {
-        if (s->stop)
-            s->stop(s);
-        else
-            pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_STOP, NULL, NULL, NULL);
-    }
+    s->state = state;
+    return 0;
 }
 
 void pa_sink_disconnect(pa_sink* s) {
     pa_sink_input *i, *j = NULL;
 
     pa_assert(s);
-    pa_return_if_fail(pa_sink_get_state(s) != PA_SINK_DISCONNECTED);
-
-    sink_stop(s);
+    pa_return_if_fail(s->state != PA_SINK_DISCONNECTED);
 
-    pa_atomic_store(&s->state, PA_SINK_DISCONNECTED);
     pa_namereg_unregister(s->core, s->name);
+    pa_idxset_remove_by_data(s->core->sinks, s, NULL);
 
     pa_hook_fire(&s->core->hook_sink_disconnect, s);
 
@@ -201,26 +186,27 @@ void pa_sink_disconnect(pa_sink* s) {
     if (s->monitor_source)
         pa_source_disconnect(s->monitor_source);
 
-    pa_idxset_remove_by_data(s->core->sinks, s, NULL);
+    sink_set_state(s, PA_SINK_DISCONNECTED);
 
     s->get_latency = NULL;
     s->get_volume = NULL;
     s->set_volume = NULL;
     s->set_mute = NULL;
     s->get_mute = NULL;
-    s->start = NULL;
-    s->stop = NULL;
+    s->set_state = NULL;
 
     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
 }
 
 static void sink_free(pa_object *o) {
     pa_sink *s = PA_SINK(o);
+    pa_sink_input *i;
 
     pa_assert(s);
     pa_assert(pa_sink_refcnt(s) == 0);
 
-    pa_sink_disconnect(s);
+    if (s->state != PA_SINK_DISCONNECTED)
+        pa_sink_disconnect(s);
 
     pa_log_info("Freeing sink %u \"%s\"", s->index, s->name);
 
@@ -231,9 +217,10 @@ static void sink_free(pa_object *o) {
 
     pa_idxset_free(s->inputs, NULL, NULL);
 
-    pa_hashmap_free(s->thread_info.inputs, (pa_free2_cb_t) pa_sink_input_unref, NULL);
-
-    pa_asyncmsgq_free(s->asyncmsgq);
+    while ((i = pa_hashmap_steal_first(s->thread_info.inputs)))
+        pa_sink_input_unref(i);
+    
+    pa_hashmap_free(s->thread_info.inputs, NULL, NULL);
 
     pa_xfree(s->name);
     pa_xfree(s->description);
@@ -241,44 +228,38 @@ static void sink_free(pa_object *o) {
     pa_xfree(s);
 }
 
-void pa_sink_update_status(pa_sink*s) {
+void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q) {
     pa_sink_assert_ref(s);
+    pa_assert(q);
 
-    if (pa_sink_get_state(s) == PA_SINK_SUSPENDED)
-        return;
+    s->asyncmsgq = q;
 
-    if (pa_sink_used_by(s) > 0)
-        sink_start(s);
-    else
-        sink_stop(s);
+    if (s->monitor_source)
+        pa_source_set_asyncmsgq(s->monitor_source, q);
 }
 
-void pa_sink_suspend(pa_sink *s, int suspend) {
-    pa_sink_state_t state;
-
+int pa_sink_update_status(pa_sink*s) {
     pa_sink_assert_ref(s);
 
-    state = pa_sink_get_state(s);
-    pa_return_if_fail(suspend && (state == PA_SINK_RUNNING || state == PA_SINK_IDLE));
-    pa_return_if_fail(!suspend && (state == PA_SINK_SUSPENDED));
+    if (s->state == PA_SINK_SUSPENDED)
+        return 0;
 
+    return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE);
+}
 
-    if (suspend) {
-        pa_atomic_store(&s->state, PA_SINK_SUSPENDED);
+int pa_sink_suspend(pa_sink *s, int suspend) {
+    pa_sink_assert_ref(s);
 
-        if (s->stop)
-            s->stop(s);
-        else
-            pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_STOP, NULL, NULL, NULL);
+    if (suspend)
+        return sink_set_state(s, PA_SINK_SUSPENDED);
+    else
+        return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE);
+}
 
-    } else {
-        pa_atomic_store(&s->state, PA_SINK_RUNNING);
+void pa_sink_ping(pa_sink *s) {
+    pa_sink_assert_ref(s);
 
-        if (s->start)
-            s->start(s);
-        else
-            pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_START, NULL, NULL, NULL);
-    }
+    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, NULL, NULL);
 }
 
 static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) {
@@ -652,7 +633,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *
     pa_sink *s = PA_SINK(o);
     pa_sink_assert_ref(s);
 
-    switch (code) {
+    switch ((pa_sink_message_t) code) {
         case PA_SINK_MESSAGE_ADD_INPUT: {
             pa_sink_input *i = userdata;
             pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i));
@@ -681,7 +662,17 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *
             *((int*) userdata) = s->thread_info.soft_muted;
             return 0;
 
-        default:
-            return -1;
+        case PA_SINK_MESSAGE_PING:
+            return 0;
+
+        case PA_SINK_MESSAGE_SET_STATE:
+            s->thread_info.state = PA_PTR_TO_UINT(userdata);
+            return 0;
+            
+        case PA_SINK_MESSAGE_GET_LATENCY:
+        case PA_SINK_MESSAGE_MAX:
+            ;
     }
+
+    return -1;
 }
index 2939cc47c6265cc1140df7c879fdaa1d31837e60..0b308e53df9b125db6baefd7a7e9a2a109d074e8 100644 (file)
@@ -55,7 +55,7 @@ struct pa_sink {
 
     uint32_t index;
     pa_core *core;
-    pa_atomic_t state;
+    pa_sink_state_t state;
 
     char *name;
     char *description, *driver;            /* may be NULL */
@@ -74,8 +74,7 @@ struct pa_sink {
     int refresh_volume;
     int refresh_mute;
 
-    int (*start)(pa_sink *s);
-    int (*stop)(pa_sink *s);
+    int (*set_state)(pa_sink *s, pa_sink_state_t state);
     int (*set_volume)(pa_sink *s);      /* dito */
     int (*get_volume)(pa_sink *s);      /* dito */
     int (*get_mute)(pa_sink *s);        /* dito */
@@ -87,6 +86,7 @@ struct pa_sink {
     /* Contains copies of the above data so that the real-time worker
      * thread can work without access locking */
     struct {
+        pa_sink_state_t state;
         pa_hashmap *inputs;
         pa_cvolume soft_volume;
         int soft_muted;
@@ -96,7 +96,7 @@ struct pa_sink {
 };
 
 PA_DECLARE_CLASS(pa_sink);
-#define PA_SINK(s) ((pa_sink*) (s))
+#define PA_SINK(s) (pa_sink_cast(s))
 
 typedef enum pa_sink_message {
     PA_SINK_MESSAGE_ADD_INPUT,
@@ -106,8 +106,8 @@ typedef enum pa_sink_message {
     PA_SINK_MESSAGE_GET_MUTE,
     PA_SINK_MESSAGE_SET_MUTE,
     PA_SINK_MESSAGE_GET_LATENCY,
-    PA_SINK_MESSAGE_START,
-    PA_SINK_MESSAGE_STOP,
+    PA_SINK_MESSAGE_SET_STATE,
+    PA_SINK_MESSAGE_PING,
     PA_SINK_MESSAGE_MAX
 } pa_sink_message_t;
 
@@ -125,13 +125,19 @@ void pa_sink_disconnect(pa_sink* s);
 
 void pa_sink_set_module(pa_sink *sink, pa_module *m);
 void pa_sink_set_description(pa_sink *s, const char *description);
+void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q);
 
 /* Usable by everyone */
 
 pa_usec_t pa_sink_get_latency(pa_sink *s);
 
-void pa_sink_update_status(pa_sink*s);
-void pa_sink_suspend(pa_sink *s, int suspend);
+int pa_sink_update_status(pa_sink*s);
+int pa_sink_suspend(pa_sink *s, int suspend);
+
+/* Sends a ping message to the sink thread, to make it wake up and
+ * check for data to process even if there is no real message is
+ * sent */
+void pa_sink_ping(pa_sink *s); 
 
 void pa_sink_set_volume(pa_sink *sink, const pa_cvolume *volume);
 const pa_cvolume *pa_sink_get_volume(pa_sink *sink);
@@ -139,7 +145,7 @@ void pa_sink_set_mute(pa_sink *sink, int mute);
 int pa_sink_get_mute(pa_sink *sink);
 
 unsigned pa_sink_used_by(pa_sink *s);
-#define pa_sink_get_state(s) ((pa_sink_state_t) pa_atomic_load(&(s)->state))
+#define pa_sink_get_state(s) ((s)->state)
 
 /* To be used exclusively by the sink driver thread */
 
@@ -149,5 +155,5 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target);
 void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target);
 
 int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
-
+    
 #endif
index a682ee6c381463d224850463a17b1429d8a9ccdd..974c053ac6755cb0ef9c8711a3484edfc22f7ad9 100644 (file)
@@ -200,7 +200,7 @@ int pa_play_file(
     u->sink_input->kill = sink_input_kill;
     u->sink_input->userdata = u;
 
-    pa_sink_notify(u->sink_input->sink);
+/*     pa_sink_notify(u->sink_input->sink); */
 
     return 0;
 
index 517c033dcc4c9fd2c3274a87768f4716e18a791d..2211f2518eb8b24f0b58747c829082909f1f6d78 100644 (file)
 
 #include "source-output.h"
 
+static PA_DEFINE_CHECK_TYPE(pa_source_output, source_output_check_type, pa_msgobject_check_type);
+
+static void source_output_free(pa_object* mo);
+
 pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_data *data) {
     pa_assert(data);
 
@@ -126,13 +130,12 @@ pa_source_output* pa_source_output_new(
         data->resample_method = pa_resampler_get_method(resampler);
     }
 
-    o = pa_source_output_new(pa_source_output);
-
+    o = pa_msgobject_new(pa_source_output, source_output_check_type);
     o->parent.parent.free = source_output_free;
     o->parent.process_msg = pa_source_output_process_msg;
 
     o->core = core;
-    pa_atomic_load(&o->state, PA_SOURCE_OUTPUT_RUNNING);
+    pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_RUNNING);
     o->flags = flags;
     o->name = pa_xstrdup(data->name);
     o->driver = pa_xstrdup(data->driver);
@@ -168,27 +171,29 @@ pa_source_output* pa_source_output_new(
 
 void pa_source_output_disconnect(pa_source_output*o) {
     pa_assert(o);
-    pa_return_if_fail(pa_source_output_get_state(i) != PA_SOURCE_OUTPUT_DISCONNECTED);
+    pa_return_if_fail(pa_source_output_get_state(o) != PA_SOURCE_OUTPUT_DISCONNECTED);
     pa_assert(o->source);
     pa_assert(o->source->core);
 
-    pa_asyncmsgq_send(i->sink->asyncmsgq, i->sink, PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
+    pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
 
     pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL);
     pa_idxset_remove_by_data(o->source->outputs, o, NULL);
 
     pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index);
-    o->source = NULL;
 
+    pa_source_update_status(o->source);
+
+    o->source = NULL;
     o->process_msg = NULL;
     o->push = NULL;
     o->kill = NULL;
     o->get_latency = NULL;
 
-    pa_atomic_load(&i->state, PA_SOURCE_OUTPUT_DISCONNECTED);
+    pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_DISCONNECTED);
 }
 
-static void source_output_free(pa_msgobject* mo) {
+static void source_output_free(pa_object* mo) {
     pa_source_output *o = PA_SOURCE_OUTPUT(mo);
 
     pa_assert(pa_source_output_refcnt(o) == 0);
@@ -208,10 +213,10 @@ static void source_output_free(pa_msgobject* mo) {
 void pa_source_output_put(pa_source_output *o) {
     pa_source_output_assert_ref(o);
 
-    pa_asyncmsgq_post(o->source->asyncmsgq, o->source, PA_SOURCE_MESSAGE_ADD_OUTPUT, o, NULL, pa_source_unref, pa_source_output_unref);
+    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), NULL, (pa_free_cb_t) pa_source_output_unref);
     pa_source_update_status(o->source);
 
-    pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
+    pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
 }
 
 void pa_source_output_kill(pa_source_output*o) {
@@ -226,7 +231,7 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) {
 
     pa_source_output_assert_ref(o);
 
-    if (pa_asyncmsgq_send(o->source->asyncmsgq, i->source, PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
         r = 0;
 
     if (o->get_latency)
@@ -250,12 +255,12 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
 
     pa_assert(state = PA_SOURCE_OUTPUT_RUNNING);
 
-    if (!o->resampler) {
+    if (!o->thread_info.resampler) {
         o->push(o, chunk);
         return;
     }
 
-    pa_resampler_run(o->resampler, chunk, &rchunk);
+    pa_resampler_run(o->thread_info.resampler, chunk, &rchunk);
     if (!rchunk.length)
         return;
 
@@ -265,7 +270,6 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
 }
 
 void pa_source_output_cork(pa_source_output *o, int b) {
-    int n;
     pa_source_output_state_t state;
 
     pa_source_output_assert_ref(o);
@@ -274,23 +278,23 @@ void pa_source_output_cork(pa_source_output *o, int b) {
     pa_assert(state != PA_SOURCE_OUTPUT_DISCONNECTED);
 
     if (b && state != PA_SOURCE_OUTPUT_CORKED)
-        pa_atomic_store(o->state, PA_SOURCE_OUTPUT_CORKED);
+        pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_CORKED);
     else if (!b && state == PA_SOURCE_OUTPUT_CORKED)
-        pa_atomic_cmpxchg(o->state, state, PA_SOURCE_OUTPUT_RUNNING);
+        pa_atomic_cmpxchg(&o->state, state, PA_SOURCE_OUTPUT_RUNNING);
 }
 
 int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) {
     pa_source_output_assert_ref(o);
     pa_return_val_if_fail(o->thread_info.resampler, -1);
 
-    if (i->sample_spec.rate == rate)
+    if (o->sample_spec.rate == rate)
         return 0;
 
-    i->sample_spec.rate = rate;
+    o->sample_spec.rate = rate;
 
-    pa_asyncmsgq_post(s->asyncmsgq, pa_source_output_ref(i), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, pa_source_output_unref, NULL);
+    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
 
-    pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT!|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
+    pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index);
     return 0;
 }
 
@@ -316,8 +320,8 @@ pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o) {
 }
 
 int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
-    pa_source *origin;
-    pa_resampler *new_resampler = NULL;
+/*     pa_source *origin; */
+/*     pa_resampler *new_resampler = NULL; */
 
     pa_source_output_assert_ref(o);
     pa_source_assert_ref(dest);
@@ -344,7 +348,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
 /*     else if (!pa_sample_spec_equal(&o->sample_spec, &dest->sample_spec) || */
 /*         !pa_channel_map_equal(&o->channel_map, &dest->channel_map)) { */
 
-/*         /\* Okey, we need a new resampler for the new sink *\/ */
+/*         /\* Okey, we need a new resampler for the new source *\/ */
 
 /*         if (!(new_resampler = pa_resampler_new( */
 /*                       dest->core->mempool, */
@@ -376,16 +380,16 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
 }
 
 int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk* chunk) {
-    pa_source_output *o = PA_SOURCE_OUTPUT(o);
+    pa_source_output *o = PA_SOURCE_OUTPUT(mo);
 
-    pa_source_output_assert_ref(i);
+    pa_source_output_assert_ref(o);
 
     switch (code) {
 
         case PA_SOURCE_OUTPUT_MESSAGE_SET_RATE: {
 
-            i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata);
-            pa_resampler_set_output_rate(i->resampler, PA_PTR_TO_UINT(userdata));
+            o->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata);
+            pa_resampler_set_output_rate(o->thread_info.resampler, PA_PTR_TO_UINT(userdata));
 
             return 0;
         }
index e7c2c131a456f170dca5928262758b9fdb9aeb6a..d3bc0bc467428be64fd0e39476f72a9a338de07a 100644 (file)
@@ -80,7 +80,7 @@ struct pa_source_output {
 };
 
 PA_DECLARE_CLASS(pa_source_output);
-#define PA_SOURCE_OUTPUT(o) ((pa_source_output*) (o))
+#define PA_SOURCE_OUTPUT(o) pa_source_output_cast(o)
 
 enum {
     PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY,
@@ -129,7 +129,7 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *i);
 
 void pa_source_output_cork(pa_source_output *i, int b);
 
-void pa_source_output_set_rate(pa_source_output *o, uint32_t rate);
+int pa_source_output_set_rate(pa_source_output *o, uint32_t rate);
 
 pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o);
 
index 7d0133875258e9b738e54ad9c4f4803861600aa9..f0a898f4ed1276b68ab0ad2ddf20e8d7e78b7a75 100644 (file)
 
 #include "source.h"
 
+static PA_DEFINE_CHECK_TYPE(pa_source, source_check_type, pa_msgobject_check_type);
+
+static void source_free(pa_object *o);
+
 pa_source* pa_source_new(
         pa_core *core,
         const char *driver,
@@ -69,7 +73,7 @@ pa_source* pa_source_new(
     pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
     pa_return_null_if_fail(pa_utf8_valid(name) && *name);
 
-    s = pa_msgobject_new(pa_source);
+    s = pa_msgobject_new(pa_source, source_check_type);
 
     if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SOURCE, s, fail))) {
         pa_xfree(s);
@@ -80,7 +84,7 @@ pa_source* pa_source_new(
     s->parent.process_msg = pa_source_process_msg;
 
     s->core = core;
-    pa_atomic_store(&s->state, PA_SOURCE_IDLE);
+    s->state = PA_SOURCE_IDLE;
     s->name = pa_xstrdup(name);
     s->description = NULL;
     s->driver = pa_xstrdup(driver);
@@ -94,7 +98,7 @@ pa_source* pa_source_new(
 
     pa_cvolume_reset(&s->volume, spec->channels);
     s->muted = 0;
-    s->refresh_volume = s->refresh_mute = 0;
+    s->refresh_volume = s->refresh_muted = 0;
 
     s->is_hardware = 0;
 
@@ -103,11 +107,10 @@ pa_source* pa_source_new(
     s->get_volume = NULL;
     s->set_mute = NULL;
     s->get_mute = NULL;
-    s->start = NULL;
-    s->stop = NULL;
+    s->set_state = NULL;
     s->userdata = NULL;
 
-    pa_assert_se(s->asyncmsgq = pa_asyncmsgq_new(0));
+    s->asyncmsgq = NULL;
 
     r = pa_idxset_put(core->sources, s, &s->index);
     assert(s->index != PA_IDXSET_INVALID && r >= 0);
@@ -118,56 +121,40 @@ pa_source* pa_source_new(
     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
     s->thread_info.soft_volume = s->volume;
     s->thread_info.soft_muted = s->muted;
+    s->thread_info.state = s->state;
 
     pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
 
     return s;
 }
 
-static void source_start(pa_source *s) {
-    pa_source_state_t state;
+static int source_set_state(pa_source *s, pa_source_state_t state) {
+    int ret;
+    
     pa_assert(s);
 
-    state = pa_source_get_state(s);
-    pa_return_if_fail(state == PA_SOURCE_IDLE || state == PA_SOURCE_SUSPENDED);
-
-    pa_atomic_store(&s->state, PA_SOURCE_RUNNING);
-
-    if (s->start)
-        s->start(s);
-    else
-        pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_START, NULL, NULL, pa_source_unref, NULL);
-}
-
-static void source_stop(pa_source *s) {
-    pa_source_state_t state;
-    int stop;
+    if (s->state == state)
+        return 0;
 
-    pa_assert(s);
-    state = pa_source_get_state(s);
-    pa_return_if_fail(state == PA_SOURCE_RUNNING || state == PA_SOURCE_SUSPENDED);
+    if (s->set_state)
+        if ((ret = s->set_state(s, state)) < 0)
+            return -1;
 
-    stop = state == PA_SOURCE_RUNNING;
-    pa_atomic_store(&s->state, PA_SOURCE_IDLE);
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+        return -1;
 
-    if (stop) {
-        if (s->stop)
-            s->stop(s);
-        else
-            pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_STOP, NULL, NULL, pa_source_unref, NULL);
-    }
+    s->state = state;
+    return 0;
 }
 
 void pa_source_disconnect(pa_source *s) {
     pa_source_output *o, *j = NULL;
 
     pa_assert(s);
-    pa_return_if_fail(pa_sink_get_state(s) != PA_SINK_DISCONNECT);
-
-    source_stop(s);
+    pa_return_if_fail(s->state != PA_SOURCE_DISCONNECTED);
 
-    pa_atomic_store(&s->state, PA_SOURCE_DISCONNECTED);
     pa_namereg_unregister(s->core, s->name);
+    pa_idxset_remove_by_data(s->core->sources, s, NULL);
 
     pa_hook_fire(&s->core->hook_source_disconnect, s);
 
@@ -177,33 +164,36 @@ void pa_source_disconnect(pa_source *s) {
         j = o;
     }
 
-    pa_idxset_remove_by_data(s->core->sources, s, NULL);
+    source_set_state(s, PA_SOURCE_DISCONNECTED);
 
     s->get_latency = NULL;
     s->get_volume = NULL;
     s->set_volume = NULL;
     s->set_mute = NULL;
     s->get_mute = NULL;
-    s->start = NULL;
-    s->stop = NULL;
+    s->set_state = NULL;
 
     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
 }
 
-static void source_free(pa_msgobject *o) {
+static void source_free(pa_object *o) {
+    pa_source_output *so;
     pa_source *s = PA_SOURCE(o);
 
     pa_assert(s);
     pa_assert(pa_source_refcnt(s) == 0);
 
-    pa_source_disconnect(s);
+    if (s->state != PA_SOURCE_DISCONNECTED)
+        pa_source_disconnect(s);
 
     pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
 
     pa_idxset_free(s->outputs, NULL, NULL);
-    pa_hashmap_free(s->thread_info.outputs, pa_sink_output_unref, NULL);
 
-    pa_asyncmsgq_free(s->asyncmsgq);
+    while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
+        pa_source_output_unref(so);
+    
+    pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
 
     pa_xfree(s->name);
     pa_xfree(s->description);
@@ -211,44 +201,28 @@ static void source_free(pa_msgobject *o) {
     pa_xfree(s);
 }
 
-void pa_source_update_status(pa_source*s) {
+int pa_source_update_status(pa_source*s) {
     pa_source_assert_ref(s);
 
-    if (pa_source_get_state(s) == PA_SOURCE_STATE_SUSPENDED)
-        return;
+    if (s->state == PA_SOURCE_SUSPENDED)
+        return 0;
 
-    if (pa_source_used_by(s) > 0)
-        source_start(s);
-    else
-        source_stop(s);
+    return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
 }
 
-void pa_source_suspend(pa_source *s, int suspend) {
-    pa_source_state_t state;
-
+int pa_source_suspend(pa_source *s, int suspend) {
     pa_source_assert_ref(s);
 
-    state = pa_source_get_state(s);
-    pa_return_if_fail(suspend && (s->state == PA_SOURCE_RUNNING || s->state == PA_SOURCE_IDLE));
-    pa_return_if_fail(!suspend && (s->state == PA_SOURCE_SUSPENDED));
-
-
-    if (suspend) {
-        pa_atomic_store(&s->state, PA_SOURCE_SUSPENDED);
-
-        if (s->stop)
-            s->stop(s);
-        else
-            pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_STOP, NULL, NULL, pa_source_unref, NULL);
+    if (suspend)
+        return source_set_state(s, PA_SOURCE_SUSPENDED);
+    else
+        return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
+}
 
-    } else {
-        pa_atomic_store(&s->state, PA_SOURCE_RUNNING);
+void pa_source_ping(pa_source *s) {
+    pa_source_assert_ref(s);
 
-        if (s->start)
-            s->start(s);
-        else
-            pa_asyncmsgq_post(s->asyncmsgq, s, PA_SOURCE_MESSAGE_START, NULL, NULL, pa_source_unref, NULL);
-    }
+    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, NULL, NULL);
 }
 
 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
@@ -258,16 +232,16 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
     pa_source_assert_ref(s);
     pa_assert(chunk);
 
-    if (s->sw_muted || !pa_cvolume_is_norm(&s->sw_volume)) {
+    if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
         pa_memchunk vchunk = *chunk;
 
         pa_memblock_ref(vchunk.memblock);
         pa_memchunk_make_writable(&vchunk, 0);
 
-        if (s->thread_info.muted || pa_cvolume_is_muted(s->thread_info.volume))
+        if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
             pa_silence_memchunk(&vchunk, &s->sample_spec);
         else
-            pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.volume);
+            pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
 
         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
             pa_source_output_push(o, &vchunk);
@@ -289,32 +263,33 @@ pa_usec_t pa_source_get_latency(pa_source *s) {
     if (s->get_latency)
         return s->get_latency(s);
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
         return 0;
 
     return usec;
 }
 
 void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
-    pa_cvolume *v;
+    int changed;
 
     pa_source_assert_ref(s);
     pa_assert(volume);
 
-    changed = !pa_cvolume_equal(volume, s->volume);
+    changed = !pa_cvolume_equal(volume, &s->volume);
     s->volume = *volume;
 
     if (s->set_volume && s->set_volume(s) < 0)
         s->set_volume = NULL;
 
     if (!s->set_volume)
-        pa_asyncmsgq_post(s->asyncmsgq, pa_source_ref(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), pa_source_unref, pa_xfree);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
 }
 
 const pa_cvolume *pa_source_get_volume(pa_source *s) {
+    pa_cvolume old_volume;
     pa_source_assert_ref(s);
 
     old_volume = s->volume;
@@ -323,7 +298,7 @@ const pa_cvolume *pa_source_get_volume(pa_source *s) {
         s->get_volume = NULL;
 
     if (!s->get_volume && s->refresh_volume)
-        pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, NULL);
 
     if (!pa_cvolume_equal(&old_volume, &s->volume))
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -331,7 +306,7 @@ const pa_cvolume *pa_source_get_volume(pa_source *s) {
     return &s->volume;
 }
 
-void pa_source_set_mute(pa_source *s, pa_mixer_t m, int mute) {
+void pa_source_set_mute(pa_source *s, int mute) {
     int changed;
 
     pa_source_assert_ref(s);
@@ -342,13 +317,13 @@ void pa_source_set_mute(pa_source *s, pa_mixer_t m, int mute) {
         s->set_mute = NULL;
 
     if (!s->set_mute)
-        pa_asyncmsgq_post(s->asyncmsgq, pa_source_ref(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), pa_source_unref, NULL);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
 }
 
-int pa_source_get_mute(pa_source *s, pa_mixer_t m) {
+int pa_source_get_mute(pa_source *s) {
     int old_muted;
 
     pa_source_assert_ref(s);
@@ -358,8 +333,8 @@ int pa_source_get_mute(pa_source *s, pa_mixer_t m) {
     if (s->get_mute && s->get_mute(s) < 0)
         s->get_mute = NULL;
 
-    if (!s->get_mute && s->refresh_mute)
-        pa_asyncmsgq_send(s->asyncmsgq, s, PA_SOURCE_MESSAGE_GET_MUTE, &s->muted);
+    if (!s->get_mute && s->refresh_muted)
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, NULL);
 
     if (old_muted != s->muted)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -393,26 +368,33 @@ void pa_source_set_description(pa_source *s, const char *description) {
     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
 }
 
+void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
+    pa_source_assert_ref(s);
+    pa_assert(q);
+
+    s->asyncmsgq = q;
+}
+
 unsigned pa_source_used_by(pa_source *s) {
     pa_source_assert_ref(s);
 
     return pa_idxset_size(s->outputs);
 }
 
-int pa_source_process_msg(pa_msgobject *o, void *object, int code, pa_memchunk *chunk, void *userdata) {
+int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
     pa_source *s = PA_SOURCE(o);
     pa_source_assert_ref(s);
 
-    switch (code) {
+    switch ((pa_source_message_t) code) {
         case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
             pa_source_output *i = userdata;
             pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index), pa_source_output_ref(i));
             return 0;
         }
 
-        case PA_SOURCE_MESSAGE_REMOVE_INPUT: {
-            pa_source_input *i = userdata;
-            pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index), pa_source_output_ref(i));
+        case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
+            pa_source_output *i = userdata;
+            pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(i->index));
             return 0;
         }
 
@@ -432,7 +414,17 @@ int pa_source_process_msg(pa_msgobject *o, void *object, int code, pa_memchunk *
             *((int*) userdata) = s->thread_info.soft_muted;
             return 0;
 
-        default:
-            return -1;
+        case PA_SOURCE_MESSAGE_PING:
+            return 0;
+
+        case PA_SOURCE_MESSAGE_SET_STATE:
+            s->thread_info.state = PA_PTR_TO_UINT(userdata);
+            return 0;
+            
+        case PA_SOURCE_MESSAGE_GET_LATENCY:
+        case PA_SOURCE_MESSAGE_MAX:
+            ;
     }
+
+    return -1;
 }
index b41b1bc3225dbc2b13fdddda49998e02331671c1..4db2dedf0826148f0d1f4ae628dc0a2f1ce281f7 100644 (file)
@@ -57,7 +57,7 @@ struct pa_source {
 
     uint32_t index;
     pa_core *core;
-    pa_atomic_t state;
+    pa_source_state_t state;
 
     char *name;
     char *description, *driver;              /* may be NULL */
@@ -74,10 +74,9 @@ struct pa_source {
     pa_cvolume volume;
     int muted;
     int refresh_volume;
-    int referesh_mute;
+    int refresh_muted;
 
-    void (*start)(pa_source*source);         /* may be NULL */
-    void (*stop)(pa_source*source);          /* may be NULL */
+    int (*set_state)(pa_source*source, pa_source_state_t state);          /* may be NULL */
     int (*set_volume)(pa_source *s);         /* dito */
     int (*get_volume)(pa_source *s);         /* dito */
     int (*set_mute)(pa_source *s);           /* dito */
@@ -87,6 +86,7 @@ struct pa_source {
     pa_asyncmsgq *asyncmsgq;
 
     struct {
+        pa_source_state_t state;
         pa_hashmap *outputs;
         pa_cvolume soft_volume;
         int soft_muted;
@@ -96,7 +96,7 @@ struct pa_source {
 };
 
 PA_DECLARE_CLASS(pa_source);
-#define PA_SOURCE(s) ((pa_source*) (s))
+#define PA_SOURCE(s) pa_source_cast(s)
 
 typedef enum pa_source_message {
     PA_SOURCE_MESSAGE_ADD_OUTPUT,
@@ -106,8 +106,8 @@ typedef enum pa_source_message {
     PA_SOURCE_MESSAGE_GET_MUTE,
     PA_SOURCE_MESSAGE_SET_MUTE,
     PA_SOURCE_MESSAGE_GET_LATENCY,
-    PA_SOURCE_MESSAGE_START,
-    PA_SOURCE_MESSAGE_STOP,
+    PA_SOURCE_MESSAGE_SET_STATE,
+    PA_SOURCE_MESSAGE_PING,
     PA_SOURCE_MESSAGE_MAX
 } pa_source_message_t;
 
@@ -125,13 +125,15 @@ void pa_source_disconnect(pa_source *s);
 
 void pa_source_set_module(pa_source *s, pa_module *m);
 void pa_source_set_description(pa_source *s, const char *description);
+void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q);
 
 /* Callable by everyone */
 
 pa_usec_t pa_source_get_latency(pa_source *s);
 
-void pa_source_update_status(pa_source*s);
-void pa_source_suspend(pa_source *s);
+int pa_source_update_status(pa_source*s);
+int pa_source_suspend(pa_source *s, int suspend);
+void pa_source_ping(pa_source *s);
 
 void pa_source_set_volume(pa_source *source, const pa_cvolume *volume);
 const pa_cvolume *pa_source_get_volume(pa_source *source);
@@ -139,11 +141,11 @@ void pa_source_set_mute(pa_source *source, int mute);
 int pa_source_get_mute(pa_source *source);
 
 unsigned pa_source_used_by(pa_source *s);
-#define pa_source_get_state(s) ((pa_source_state_t) pa_atomic_load(&(s)->state))
+#define pa_source_get_state(s) ((pa_source_state_t) (s)->state)
 
 /* To be used exclusively by the source driver thread */
 
 void pa_source_post(pa_source*s, const pa_memchunk *b);
-void pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
 
 #endif
index d10b512dd6a2165f95d3f4a943e0fab3c8692a13..847d5be1881c57d0e191d4deedc8ef2ccef2ca05 100644 (file)
@@ -49,7 +49,7 @@ static void the_thread(void *_q) {
     do {
         int code = 0;
 
-        pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, 1) == 0);
+        pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, 1) == 0);
 
         switch (code) {
 
@@ -71,7 +71,7 @@ static void the_thread(void *_q) {
                 break;
         }
 
-        pa_asyncmsgq_done(q);
+        pa_asyncmsgq_done(q, 0);
 
     } while (!quit);
 }
@@ -91,11 +91,11 @@ int main(int argc, char *argv[]) {
 
     printf("Operation B post\n");
     pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, NULL, NULL);
-
+    
     pa_thread_yield();
 
     printf("Operation C send\n");
-    pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL);
+    pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, NULL);
 
     pa_thread_yield();