2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
38 #include <pulse/rtclock.h>
39 #include <pulse/timeval.h>
40 #include <pulse/util.h>
41 #include <pulse/version.h>
42 #include <pulse/xmalloc.h>
44 #include <pulsecore/module.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/log.h>
48 #include <pulsecore/core-subscribe.h>
49 #include <pulsecore/pdispatch.h>
50 #include <pulsecore/pstream.h>
51 #include <pulsecore/pstream-util.h>
52 #include <pulsecore/socket-client.h>
53 #include <pulsecore/time-smoother.h>
54 #include <pulsecore/thread.h>
55 #include <pulsecore/thread-mq.h>
56 #include <pulsecore/core-rtclock.h>
57 #include <pulsecore/core-error.h>
58 #include <pulsecore/proplist-util.h>
59 #include <pulsecore/auth-cookie.h>
60 #include <pulsecore/mcalign.h>
61 #include <pulsecore/strlist.h>
64 #include <pulsecore/x11prop.h>
68 #include "module-tunnel-sink-symdef.h"
70 #include "module-tunnel-source-symdef.h"
73 #define ENV_DEFAULT_SINK "PULSE_SINK"
74 #define ENV_DEFAULT_SOURCE "PULSE_SOURCE"
75 #define ENV_DEFAULT_SERVER "PULSE_SERVER"
76 #define ENV_COOKIE_FILE "PULSE_COOKIE"
79 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
81 "sink_name=<name for the local sink> "
82 "sink_properties=<properties for the local sink> "
83 "auto=<determine server/sink/cookie automatically> "
85 "sink=<remote sink name> "
87 "format=<sample format> "
88 "channels=<number of channels> "
90 "channel_map=<channel map>");
92 PA_MODULE_DESCRIPTION("Tunnel module for sources");
94 "source_name=<name for the local source> "
95 "source_properties=<properties for the local source> "
96 "auto=<determine server/source/cookie automatically> "
98 "source=<remote source name> "
100 "format=<sample format> "
101 "channels=<number of channels> "
102 "rate=<sample rate> "
103 "channel_map=<channel map>");
106 PA_MODULE_AUTHOR("Lennart Poettering");
107 PA_MODULE_VERSION(PACKAGE_VERSION
);
108 PA_MODULE_LOAD_ONCE(false);
110 static const char* const valid_modargs
[] = {
130 #define DEFAULT_TIMEOUT 5
132 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
134 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
139 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
140 SINK_MESSAGE_REMOTE_SUSPEND
,
141 SINK_MESSAGE_UPDATE_LATENCY
,
145 #define DEFAULT_TLENGTH_MSEC 150
146 #define DEFAULT_MINREQ_MSEC 25
151 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
152 SOURCE_MESSAGE_REMOTE_SUSPEND
,
153 SOURCE_MESSAGE_UPDATE_LATENCY
156 #define DEFAULT_FRAGSIZE_MSEC 25
161 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
162 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
164 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
165 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
166 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
167 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
168 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
169 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
170 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
172 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
174 [PA_COMMAND_REQUEST
] = command_request
,
175 [PA_COMMAND_STARTED
] = command_started
,
177 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
178 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
179 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
180 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
181 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
182 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
183 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
184 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
185 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
186 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
187 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
188 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
189 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
190 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
197 pa_thread_mq thread_mq
;
201 pa_socket_client
*client
;
203 pa_pdispatch
*pdispatch
;
209 size_t requested_bytes
;
216 pa_auth_cookie
*auth_cookie
;
220 uint32_t device_index
;
223 int64_t counter
, counter_delta
;
225 bool remote_corked
:1;
226 bool remote_suspended
:1;
228 pa_usec_t transport_usec
; /* maintained in the main thread */
229 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
231 uint32_t ignore_latency_before
;
233 pa_time_event
*time_event
;
235 pa_smoother
*smoother
;
237 char *device_description
;
251 static void request_latency(struct userdata
*u
);
253 /* Called from main context */
254 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
255 pa_log_debug("Got stream or client event.");
258 /* Called from main context */
259 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
260 struct userdata
*u
= userdata
;
265 pa_assert(u
->pdispatch
== pd
);
267 pa_log_warn("Stream killed");
268 pa_module_unload_request(u
->module
, true);
271 /* Called from main context */
272 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
273 struct userdata
*u
= userdata
;
278 pa_assert(u
->pdispatch
== pd
);
280 pa_log_info("Server signalled buffer overrun/underrun.");
284 /* Called from main context */
285 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
286 struct userdata
*u
= userdata
;
293 pa_assert(u
->pdispatch
== pd
);
295 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
296 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
297 !pa_tagstruct_eof(t
)) {
299 pa_log("Invalid packet.");
300 pa_module_unload_request(u
->module
, true);
304 pa_log_debug("Server reports device suspend.");
307 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
309 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
315 /* Called from main context */
316 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
317 struct userdata
*u
= userdata
;
318 uint32_t channel
, di
;
325 pa_assert(u
->pdispatch
== pd
);
327 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
328 pa_tagstruct_getu32(t
, &di
) < 0 ||
329 pa_tagstruct_gets(t
, &dn
) < 0 ||
330 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
332 pa_log_error("Invalid packet.");
333 pa_module_unload_request(u
->module
, true);
337 pa_log_debug("Server reports a stream move.");
340 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
342 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
348 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
349 struct userdata
*u
= userdata
;
350 uint32_t channel
, maxlength
, tlength
= 0, fragsize
, prebuf
, minreq
;
356 pa_assert(u
->pdispatch
== pd
);
358 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
359 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
361 pa_log_error("Invalid packet.");
362 pa_module_unload_request(u
->module
, true);
366 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
367 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
368 pa_tagstruct_get_usec(t
, &usec
) < 0) {
370 pa_log_error("Invalid packet.");
371 pa_module_unload_request(u
->module
, true);
375 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
376 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
377 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
378 pa_tagstruct_get_usec(t
, &usec
) < 0) {
380 pa_log_error("Invalid packet.");
381 pa_module_unload_request(u
->module
, true);
387 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
395 /* Called from main context */
396 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
397 struct userdata
*u
= userdata
;
402 pa_assert(u
->pdispatch
== pd
);
404 pa_log_debug("Server reports playback started.");
410 /* Called from IO thread context */
411 static void check_smoother_status(struct userdata
*u
, bool past
) {
416 x
= pa_rtclock_now();
418 /* Correct by the time the requested issued needs to travel to the
419 * other side. This is a valid thread-safe access, because the
420 * main thread is waiting for us */
423 x
-= u
->thread_transport_usec
;
425 x
+= u
->thread_transport_usec
;
427 if (u
->remote_suspended
|| u
->remote_corked
)
428 pa_smoother_pause(u
->smoother
, x
);
430 pa_smoother_resume(u
->smoother
, x
, true);
433 /* Called from IO thread context */
434 static void stream_cork_within_thread(struct userdata
*u
, bool cork
) {
437 if (u
->remote_corked
== cork
)
440 u
->remote_corked
= cork
;
441 check_smoother_status(u
, false);
444 /* Called from main context */
445 static void stream_cork(struct userdata
*u
, bool cork
) {
452 t
= pa_tagstruct_new(NULL
, 0);
454 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
456 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
458 pa_tagstruct_putu32(t
, u
->ctag
++);
459 pa_tagstruct_putu32(t
, u
->channel
);
460 pa_tagstruct_put_boolean(t
, !!cork
);
461 pa_pstream_send_tagstruct(u
->pstream
, t
);
466 /* Called from IO thread context */
467 static void stream_suspend_within_thread(struct userdata
*u
, bool suspend
) {
470 if (u
->remote_suspended
== suspend
)
473 u
->remote_suspended
= suspend
;
474 check_smoother_status(u
, true);
479 /* Called from IO thread context */
480 static void send_data(struct userdata
*u
) {
483 while (u
->requested_bytes
> 0) {
484 pa_memchunk memchunk
;
486 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
487 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
488 pa_memblock_unref(memchunk
.memblock
);
490 u
->requested_bytes
-= memchunk
.length
;
492 u
->counter
+= (int64_t) memchunk
.length
;
496 /* This function is called from IO context -- except when it is not. */
497 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
498 struct userdata
*u
= PA_SINK(o
)->userdata
;
502 case PA_SINK_MESSAGE_SET_STATE
: {
505 /* First, change the state, because otherwise pa_sink_render() would fail */
506 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
508 stream_cork_within_thread(u
, u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
);
510 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
517 case PA_SINK_MESSAGE_GET_LATENCY
: {
518 pa_usec_t yl
, yr
, *usec
= data
;
520 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
521 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
523 *usec
= yl
> yr
? yl
- yr
: 0;
527 case SINK_MESSAGE_REQUEST
:
529 pa_assert(offset
> 0);
530 u
->requested_bytes
+= (size_t) offset
;
532 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
537 case SINK_MESSAGE_REMOTE_SUSPEND
:
539 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
542 case SINK_MESSAGE_UPDATE_LATENCY
: {
545 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
547 if (y
> (pa_usec_t
) offset
)
548 y
-= (pa_usec_t
) offset
;
552 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
554 /* We can access this freely here, since the main thread is waiting for us */
555 u
->thread_transport_usec
= u
->transport_usec
;
560 case SINK_MESSAGE_POST
:
562 /* OK, This might be a bit confusing. This message is
563 * delivered to us from the main context -- NOT from the
564 * IO thread context where the rest of the messages are
565 * dispatched. Yeah, ugly, but I am a lazy bastard. */
567 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
569 u
->counter_delta
+= (int64_t) chunk
->length
;
574 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
577 /* Called from main context */
578 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
580 pa_sink_assert_ref(s
);
583 switch ((pa_sink_state_t
) state
) {
585 case PA_SINK_SUSPENDED
:
586 pa_assert(PA_SINK_IS_OPENED(s
->state
));
587 stream_cork(u
, true);
591 case PA_SINK_RUNNING
:
592 if (s
->state
== PA_SINK_SUSPENDED
)
593 stream_cork(u
, false);
596 case PA_SINK_UNLINKED
:
598 case PA_SINK_INVALID_STATE
:
607 /* This function is called from IO context -- except when it is not. */
608 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
609 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
613 case PA_SOURCE_MESSAGE_SET_STATE
: {
616 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
617 stream_cork_within_thread(u
, u
->source
->thread_info
.state
== PA_SOURCE_SUSPENDED
);
622 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
623 pa_usec_t yr
, yl
, *usec
= data
;
625 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
626 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
628 *usec
= yr
> yl
? yr
- yl
: 0;
632 case SOURCE_MESSAGE_POST
: {
635 pa_mcalign_push(u
->mcalign
, chunk
);
637 while (pa_mcalign_pop(u
->mcalign
, &c
) >= 0) {
639 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
640 pa_source_post(u
->source
, &c
);
642 pa_memblock_unref(c
.memblock
);
644 u
->counter
+= (int64_t) c
.length
;
650 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
652 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
655 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
658 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
659 y
+= (pa_usec_t
) offset
;
661 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
663 /* We can access this freely here, since the main thread is waiting for us */
664 u
->thread_transport_usec
= u
->transport_usec
;
670 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
673 /* Called from main context */
674 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
676 pa_source_assert_ref(s
);
679 switch ((pa_source_state_t
) state
) {
681 case PA_SOURCE_SUSPENDED
:
682 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
683 stream_cork(u
, true);
687 case PA_SOURCE_RUNNING
:
688 if (s
->state
== PA_SOURCE_SUSPENDED
)
689 stream_cork(u
, false);
692 case PA_SOURCE_UNLINKED
:
694 case PA_SINK_INVALID_STATE
:
703 static void thread_func(void *userdata
) {
704 struct userdata
*u
= userdata
;
708 pa_log_debug("Thread starting up");
710 pa_thread_mq_install(&u
->thread_mq
);
716 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
717 pa_sink_process_rewind(u
->sink
, 0);
720 if ((ret
= pa_rtpoll_run(u
->rtpoll
, true)) < 0)
728 /* If this was no regular exit from the loop we have to continue
729 * processing messages until we received PA_MESSAGE_SHUTDOWN */
730 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
731 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
734 pa_log_debug("Thread shutting down");
738 /* Called from main context */
739 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
740 struct userdata
*u
= userdata
;
741 uint32_t bytes
, channel
;
744 pa_assert(command
== PA_COMMAND_REQUEST
);
747 pa_assert(u
->pdispatch
== pd
);
749 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
750 pa_tagstruct_getu32(t
, &bytes
) < 0) {
751 pa_log("Invalid protocol reply");
755 if (channel
!= u
->channel
) {
756 pa_log("Received data for invalid channel");
760 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
764 pa_module_unload_request(u
->module
, true);
769 /* Called from main context */
770 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
771 struct userdata
*u
= userdata
;
772 pa_usec_t sink_usec
, source_usec
;
774 int64_t write_index
, read_index
;
775 struct timeval local
, remote
, now
;
782 if (command
!= PA_COMMAND_REPLY
) {
783 if (command
== PA_COMMAND_ERROR
)
784 pa_log("Failed to get latency.");
786 pa_log("Protocol error.");
790 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
791 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
792 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
793 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
794 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
795 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
796 pa_tagstruct_gets64(t
, &read_index
) < 0) {
797 pa_log("Invalid reply.");
802 if (u
->version
>= 13) {
803 uint64_t underrun_for
= 0, playing_for
= 0;
805 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
806 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
807 pa_log("Invalid reply.");
813 if (!pa_tagstruct_eof(t
)) {
814 pa_log("Invalid reply.");
818 if (tag
< u
->ignore_latency_before
) {
822 pa_gettimeofday(&now
);
824 /* Calculate transport usec */
825 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
826 /* local and remote seem to have synchronized clocks */
828 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
830 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
833 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
835 /* First, take the device's delay */
837 delay
= (int64_t) sink_usec
;
838 ss
= &u
->sink
->sample_spec
;
840 delay
= (int64_t) source_usec
;
841 ss
= &u
->source
->sample_spec
;
844 /* Add the length of our server-side buffer */
845 if (write_index
>= read_index
)
846 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
848 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
850 /* Our measurements are already out of date, hence correct by the *
851 * transport latency */
853 delay
-= (int64_t) u
->transport_usec
;
855 delay
+= (int64_t) u
->transport_usec
;
858 /* Now correct by what we have have read/written since we requested the update */
860 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
862 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
866 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
868 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
875 pa_module_unload_request(u
->module
, true);
878 /* Called from main context */
879 static void request_latency(struct userdata
*u
) {
885 t
= pa_tagstruct_new(NULL
, 0);
887 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
889 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
891 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
892 pa_tagstruct_putu32(t
, u
->channel
);
894 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
896 pa_pstream_send_tagstruct(u
->pstream
, t
);
897 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
899 u
->ignore_latency_before
= tag
;
900 u
->counter_delta
= 0;
903 /* Called from main context */
904 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
905 struct userdata
*u
= userdata
;
913 pa_core_rttime_restart(u
->core
, e
, pa_rtclock_now() + LATENCY_INTERVAL
);
916 /* Called from main context */
917 static void update_description(struct userdata
*u
) {
919 char un
[128], hn
[128];
924 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
927 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
930 pa_sink_set_description(u
->sink
, d
);
931 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
932 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
933 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
935 pa_source_set_description(u
->source
, d
);
936 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
937 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
938 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
943 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
944 pa_get_user_name(un
, sizeof(un
)),
945 pa_get_host_name(hn
, sizeof(hn
)));
947 t
= pa_tagstruct_new(NULL
, 0);
949 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
951 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
953 pa_tagstruct_putu32(t
, u
->ctag
++);
954 pa_tagstruct_putu32(t
, u
->channel
);
955 pa_tagstruct_puts(t
, d
);
956 pa_pstream_send_tagstruct(u
->pstream
, t
);
961 /* Called from main context */
962 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
963 struct userdata
*u
= userdata
;
966 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
972 if (command
!= PA_COMMAND_REPLY
) {
973 if (command
== PA_COMMAND_ERROR
)
974 pa_log("Failed to get info.");
976 pa_log("Protocol error.");
980 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
981 pa_tagstruct_gets(t
, &server_version
) < 0 ||
982 pa_tagstruct_gets(t
, &user_name
) < 0 ||
983 pa_tagstruct_gets(t
, &host_name
) < 0 ||
984 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
985 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
986 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
987 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
988 (u
->version
>= 15 && pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
990 pa_log("Parse failure");
994 if (!pa_tagstruct_eof(t
)) {
995 pa_log("Packet too long");
999 pa_xfree(u
->server_fqdn
);
1000 u
->server_fqdn
= pa_xstrdup(host_name
);
1002 pa_xfree(u
->user_name
);
1003 u
->user_name
= pa_xstrdup(user_name
);
1005 update_description(u
);
1010 pa_module_unload_request(u
->module
, true);
1013 static int read_ports(struct userdata
*u
, pa_tagstruct
*t
) {
1014 if (u
->version
>= 16) {
1018 if (pa_tagstruct_getu32(t
, &n_ports
)) {
1019 pa_log("Parse failure");
1020 return -PA_ERR_PROTOCOL
;
1023 for (uint32_t j
= 0; j
< n_ports
; j
++) {
1026 if (pa_tagstruct_gets(t
, &s
) < 0 || /* name */
1027 pa_tagstruct_gets(t
, &s
) < 0 || /* description */
1028 pa_tagstruct_getu32(t
, &priority
) < 0) {
1030 pa_log("Parse failure");
1031 return -PA_ERR_PROTOCOL
;
1033 if (u
->version
>= 24 && pa_tagstruct_getu32(t
, &priority
) < 0) { /* available */
1034 pa_log("Parse failure");
1035 return -PA_ERR_PROTOCOL
;
1039 if (pa_tagstruct_gets(t
, &s
) < 0) { /* active port */
1040 pa_log("Parse failure");
1041 return -PA_ERR_PROTOCOL
;
1047 static int read_formats(struct userdata
*u
, pa_tagstruct
*t
) {
1049 pa_format_info
*format
;
1051 if (pa_tagstruct_getu8(t
, &n_formats
) < 0) { /* no. of formats */
1052 pa_log("Parse failure");
1053 return -PA_ERR_PROTOCOL
;
1056 for (uint8_t j
= 0; j
< n_formats
; j
++) {
1057 format
= pa_format_info_new();
1058 if (pa_tagstruct_get_format_info(t
, format
)) { /* format info */
1059 pa_format_info_free(format
);
1060 pa_log("Parse failure");
1061 return -PA_ERR_PROTOCOL
;
1063 pa_format_info_free(format
);
1070 /* Called from main context */
1071 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1072 struct userdata
*u
= userdata
;
1073 uint32_t idx
, owner_module
, monitor_source
, flags
;
1074 const char *name
, *description
, *monitor_source_name
, *driver
;
1084 if (command
!= PA_COMMAND_REPLY
) {
1085 if (command
== PA_COMMAND_ERROR
)
1086 pa_log("Failed to get info.");
1088 pa_log("Protocol error.");
1092 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1093 pa_tagstruct_gets(t
, &name
) < 0 ||
1094 pa_tagstruct_gets(t
, &description
) < 0 ||
1095 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1096 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1097 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1098 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1099 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1100 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1101 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1102 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1103 pa_tagstruct_gets(t
, &driver
) < 0 ||
1104 pa_tagstruct_getu32(t
, &flags
) < 0) {
1106 pa_log("Parse failure");
1110 if (u
->version
>= 13) {
1111 pa_usec_t configured_latency
;
1113 if (pa_tagstruct_get_proplist(t
, NULL
) < 0 ||
1114 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1116 pa_log("Parse failure");
1121 if (u
->version
>= 15) {
1122 pa_volume_t base_volume
;
1123 uint32_t state
, n_volume_steps
, card
;
1125 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1126 pa_tagstruct_getu32(t
, &state
) < 0 ||
1127 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1128 pa_tagstruct_getu32(t
, &card
) < 0) {
1130 pa_log("Parse failure");
1135 if (read_ports(u
, t
) < 0)
1138 if (u
->version
>= 21 && read_formats(u
, t
) < 0)
1141 if (!pa_tagstruct_eof(t
)) {
1142 pa_log("Packet too long");
1146 if (!u
->sink_name
|| !pa_streq(name
, u
->sink_name
))
1149 pa_xfree(u
->device_description
);
1150 u
->device_description
= pa_xstrdup(description
);
1152 update_description(u
);
1157 pa_module_unload_request(u
->module
, true);
1160 /* Called from main context */
1161 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1162 struct userdata
*u
= userdata
;
1163 uint32_t idx
, owner_module
, client
, sink
;
1164 pa_usec_t buffer_usec
, sink_usec
;
1165 const char *name
, *driver
, *resample_method
;
1167 pa_sample_spec sample_spec
;
1168 pa_channel_map channel_map
;
1175 if (command
!= PA_COMMAND_REPLY
) {
1176 if (command
== PA_COMMAND_ERROR
)
1177 pa_log("Failed to get info.");
1179 pa_log("Protocol error.");
1183 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1184 pa_tagstruct_gets(t
, &name
) < 0 ||
1185 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1186 pa_tagstruct_getu32(t
, &client
) < 0 ||
1187 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1188 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1189 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1190 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1191 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1192 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1193 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1194 pa_tagstruct_gets(t
, &driver
) < 0) {
1196 pa_log("Parse failure");
1200 if (u
->version
>= 11) {
1201 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1203 pa_log("Parse failure");
1208 if (u
->version
>= 13) {
1209 if (pa_tagstruct_get_proplist(t
, NULL
) < 0) {
1211 pa_log("Parse failure");
1216 if (u
->version
>= 19) {
1217 if (pa_tagstruct_get_boolean(t
, &b
) < 0) {
1219 pa_log("Parse failure");
1224 if (u
->version
>= 20) {
1225 if (pa_tagstruct_get_boolean(t
, &b
) < 0 ||
1226 pa_tagstruct_get_boolean(t
, &b
) < 0) {
1228 pa_log("Parse failure");
1233 if (u
->version
>= 21) {
1234 pa_format_info
*format
= pa_format_info_new();
1236 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1237 pa_format_info_free(format
);
1238 pa_log("Parse failure");
1241 pa_format_info_free(format
);
1244 if (!pa_tagstruct_eof(t
)) {
1245 pa_log("Packet too long");
1249 if (idx
!= u
->device_index
)
1254 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1255 pa_cvolume_equal(&volume
, &u
->sink
->real_volume
))
1258 pa_sink_volume_changed(u
->sink
, &volume
);
1260 if (u
->version
>= 11)
1261 pa_sink_mute_changed(u
->sink
, mute
);
1266 pa_module_unload_request(u
->module
, true);
1271 /* Called from main context */
1272 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1273 struct userdata
*u
= userdata
;
1274 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1275 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1280 pa_usec_t latency
, configured_latency
;
1285 if (command
!= PA_COMMAND_REPLY
) {
1286 if (command
== PA_COMMAND_ERROR
)
1287 pa_log("Failed to get info.");
1289 pa_log("Protocol error.");
1293 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1294 pa_tagstruct_gets(t
, &name
) < 0 ||
1295 pa_tagstruct_gets(t
, &description
) < 0 ||
1296 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1297 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1298 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1299 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1300 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1301 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1302 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1303 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1304 pa_tagstruct_gets(t
, &driver
) < 0 ||
1305 pa_tagstruct_getu32(t
, &flags
) < 0) {
1307 pa_log("Parse failure");
1311 if (u
->version
>= 13) {
1312 if (pa_tagstruct_get_proplist(t
, NULL
) < 0 ||
1313 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1315 pa_log("Parse failure");
1320 if (u
->version
>= 15) {
1321 pa_volume_t base_volume
;
1322 uint32_t state
, n_volume_steps
, card
;
1324 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1325 pa_tagstruct_getu32(t
, &state
) < 0 ||
1326 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1327 pa_tagstruct_getu32(t
, &card
) < 0) {
1329 pa_log("Parse failure");
1334 if (read_ports(u
, t
) < 0)
1337 if (u
->version
>= 22 && read_formats(u
, t
) < 0)
1340 if (!pa_tagstruct_eof(t
)) {
1341 pa_log("Packet too long");
1345 if (!u
->source_name
|| !pa_streq(name
, u
->source_name
))
1348 pa_xfree(u
->device_description
);
1349 u
->device_description
= pa_xstrdup(description
);
1351 update_description(u
);
1356 pa_module_unload_request(u
->module
, true);
1361 /* Called from main context */
1362 static void request_info(struct userdata
*u
) {
1367 t
= pa_tagstruct_new(NULL
, 0);
1368 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1369 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1370 pa_pstream_send_tagstruct(u
->pstream
, t
);
1371 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1374 t
= pa_tagstruct_new(NULL
, 0);
1375 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1376 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1377 pa_tagstruct_putu32(t
, u
->device_index
);
1378 pa_pstream_send_tagstruct(u
->pstream
, t
);
1379 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1382 t
= pa_tagstruct_new(NULL
, 0);
1383 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1384 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1385 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1386 pa_tagstruct_puts(t
, u
->sink_name
);
1387 pa_pstream_send_tagstruct(u
->pstream
, t
);
1388 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1391 if (u
->source_name
) {
1392 t
= pa_tagstruct_new(NULL
, 0);
1393 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1394 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1395 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1396 pa_tagstruct_puts(t
, u
->source_name
);
1397 pa_pstream_send_tagstruct(u
->pstream
, t
);
1398 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1403 /* Called from main context */
1404 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1405 struct userdata
*u
= userdata
;
1406 pa_subscription_event_type_t e
;
1412 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1414 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1415 pa_tagstruct_getu32(t
, &idx
) < 0) {
1416 pa_log("Invalid protocol reply");
1417 pa_module_unload_request(u
->module
, true);
1421 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1423 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1424 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1426 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1434 /* Called from main context */
1435 static void start_subscribe(struct userdata
*u
) {
1439 t
= pa_tagstruct_new(NULL
, 0);
1440 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1441 pa_tagstruct_putu32(t
, u
->ctag
++);
1442 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1444 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1446 PA_SUBSCRIPTION_MASK_SOURCE
1450 pa_pstream_send_tagstruct(u
->pstream
, t
);
1453 /* Called from main context */
1454 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1455 struct userdata
*u
= userdata
;
1462 pa_assert(u
->pdispatch
== pd
);
1464 if (command
!= PA_COMMAND_REPLY
) {
1465 if (command
== PA_COMMAND_ERROR
)
1466 pa_log("Failed to create stream.");
1468 pa_log("Protocol error.");
1472 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1473 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1475 || pa_tagstruct_getu32(t
, &bytes
) < 0
1480 if (u
->version
>= 9) {
1482 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1483 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1484 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1485 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1488 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1489 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1494 if (u
->version
>= 12) {
1497 uint32_t device_index
;
1501 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1502 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1503 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1504 pa_tagstruct_gets(t
, &dn
) < 0 ||
1505 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1509 pa_xfree(u
->sink_name
);
1510 u
->sink_name
= pa_xstrdup(dn
);
1512 pa_xfree(u
->source_name
);
1513 u
->source_name
= pa_xstrdup(dn
);
1517 if (u
->version
>= 13) {
1520 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1523 /* #ifdef TUNNEL_SINK */
1524 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1526 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1530 if (u
->version
>= 21) {
1531 pa_format_info
*format
= pa_format_info_new();
1533 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1534 pa_format_info_free(format
);
1538 pa_format_info_free(format
);
1541 if (!pa_tagstruct_eof(t
))
1547 pa_assert(!u
->time_event
);
1548 u
->time_event
= pa_core_rttime_new(u
->core
, pa_rtclock_now() + LATENCY_INTERVAL
, timeout_callback
, u
);
1552 pa_log_debug("Stream created.");
1555 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1561 pa_log("Invalid reply. (Create stream)");
1564 pa_module_unload_request(u
->module
, true);
1568 /* Called from main context */
1569 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1570 struct userdata
*u
= userdata
;
1571 pa_tagstruct
*reply
;
1572 char name
[256], un
[128], hn
[128];
1577 pa_assert(u
->pdispatch
== pd
);
1579 if (command
!= PA_COMMAND_REPLY
||
1580 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1581 !pa_tagstruct_eof(t
)) {
1583 if (command
== PA_COMMAND_ERROR
)
1584 pa_log("Failed to authenticate");
1586 pa_log("Protocol error.");
1591 /* Minimum supported protocol version */
1592 if (u
->version
< 8) {
1593 pa_log("Incompatible protocol version");
1597 /* Starting with protocol version 13 the MSB of the version tag
1598 reflects if shm is enabled for this connection or not. We don't
1599 support SHM here at all, so we just ignore this. */
1601 if (u
->version
>= 13)
1602 u
->version
&= 0x7FFFFFFFU
;
1604 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
1607 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1608 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1610 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1612 pa_get_user_name(un
, sizeof(un
)),
1613 pa_get_host_name(hn
, sizeof(hn
)));
1615 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1616 pa_source_update_proplist(u
->source
, 0, NULL
);
1618 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1620 pa_get_user_name(un
, sizeof(un
)),
1621 pa_get_host_name(hn
, sizeof(hn
)));
1624 reply
= pa_tagstruct_new(NULL
, 0);
1625 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1626 pa_tagstruct_putu32(reply
, u
->ctag
++);
1628 if (u
->version
>= 13) {
1630 pl
= pa_proplist_new();
1631 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1632 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1633 pa_init_proplist(pl
);
1634 pa_tagstruct_put_proplist(reply
, pl
);
1635 pa_proplist_free(pl
);
1637 pa_tagstruct_puts(reply
, "PulseAudio");
1639 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1640 /* We ignore the server's reply here */
1642 reply
= pa_tagstruct_new(NULL
, 0);
1644 if (u
->version
< 13)
1645 /* Only for older PA versions we need to fill in the maxlength */
1646 u
->maxlength
= 4*1024*1024;
1649 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1650 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1651 u
->prebuf
= u
->tlength
;
1653 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
1657 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1658 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1660 if (u
->version
< 13)
1661 pa_tagstruct_puts(reply
, name
);
1663 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1664 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1665 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1666 pa_tagstruct_puts(reply
, u
->sink_name
);
1667 pa_tagstruct_putu32(reply
, u
->maxlength
);
1668 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1669 pa_tagstruct_putu32(reply
, u
->tlength
);
1670 pa_tagstruct_putu32(reply
, u
->prebuf
);
1671 pa_tagstruct_putu32(reply
, u
->minreq
);
1672 pa_tagstruct_putu32(reply
, 0);
1673 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1674 pa_tagstruct_put_cvolume(reply
, &volume
);
1676 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1677 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1679 if (u
->version
< 13)
1680 pa_tagstruct_puts(reply
, name
);
1682 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1683 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1684 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1685 pa_tagstruct_puts(reply
, u
->source_name
);
1686 pa_tagstruct_putu32(reply
, u
->maxlength
);
1687 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1688 pa_tagstruct_putu32(reply
, u
->fragsize
);
1691 if (u
->version
>= 12) {
1692 pa_tagstruct_put_boolean(reply
, false); /* no_remap */
1693 pa_tagstruct_put_boolean(reply
, false); /* no_remix */
1694 pa_tagstruct_put_boolean(reply
, false); /* fix_format */
1695 pa_tagstruct_put_boolean(reply
, false); /* fix_rate */
1696 pa_tagstruct_put_boolean(reply
, false); /* fix_channels */
1697 pa_tagstruct_put_boolean(reply
, true); /* no_move */
1698 pa_tagstruct_put_boolean(reply
, false); /* variable_rate */
1701 if (u
->version
>= 13) {
1704 pa_tagstruct_put_boolean(reply
, false); /* start muted/peak detect*/
1705 pa_tagstruct_put_boolean(reply
, true); /* adjust_latency */
1707 pl
= pa_proplist_new();
1708 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1709 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1710 pa_tagstruct_put_proplist(reply
, pl
);
1711 pa_proplist_free(pl
);
1714 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1718 if (u
->version
>= 14) {
1720 pa_tagstruct_put_boolean(reply
, false); /* volume_set */
1722 pa_tagstruct_put_boolean(reply
, true); /* early rquests */
1725 if (u
->version
>= 15) {
1727 pa_tagstruct_put_boolean(reply
, false); /* muted_set */
1729 pa_tagstruct_put_boolean(reply
, false); /* don't inhibit auto suspend */
1730 pa_tagstruct_put_boolean(reply
, false); /* fail on suspend */
1734 if (u
->version
>= 17)
1735 pa_tagstruct_put_boolean(reply
, false); /* relative volume */
1737 if (u
->version
>= 18)
1738 pa_tagstruct_put_boolean(reply
, false); /* passthrough stream */
1742 if (u
->version
>= 21) {
1743 /* We're not using the extended API, so n_formats = 0 and that's that */
1744 pa_tagstruct_putu8(reply
, 0);
1747 if (u
->version
>= 22) {
1748 /* We're not using the extended API, so n_formats = 0 and that's that */
1749 pa_tagstruct_putu8(reply
, 0);
1750 pa_cvolume_reset(&volume
, u
->source
->sample_spec
.channels
);
1751 pa_tagstruct_put_cvolume(reply
, &volume
);
1752 pa_tagstruct_put_boolean(reply
, false); /* muted */
1753 pa_tagstruct_put_boolean(reply
, false); /* volume_set */
1754 pa_tagstruct_put_boolean(reply
, false); /* muted_set */
1755 pa_tagstruct_put_boolean(reply
, false); /* relative volume */
1756 pa_tagstruct_put_boolean(reply
, false); /* passthrough stream */
1760 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1761 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1763 pa_log_debug("Connection authenticated, creating stream ...");
1768 pa_module_unload_request(u
->module
, true);
1771 /* Called from main context */
1772 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1773 struct userdata
*u
= userdata
;
1778 pa_log_warn("Stream died.");
1779 pa_module_unload_request(u
->module
, true);
1782 /* Called from main context */
1783 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1784 struct userdata
*u
= userdata
;
1790 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1791 pa_log("Invalid packet");
1792 pa_module_unload_request(u
->module
, true);
1798 /* Called from main context */
1799 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1800 struct userdata
*u
= userdata
;
1806 if (channel
!= u
->channel
) {
1807 pa_log("Received memory block on bad channel.");
1808 pa_module_unload_request(u
->module
, true);
1812 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1814 u
->counter_delta
+= (int64_t) chunk
->length
;
1818 /* Called from main context */
1819 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1820 struct userdata
*u
= userdata
;
1826 pa_assert(u
->client
== sc
);
1828 pa_socket_client_unref(u
->client
);
1832 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1833 pa_module_unload_request(u
->module
, true);
1837 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1838 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, true, command_table
, PA_COMMAND_MAX
);
1840 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1841 pa_pstream_set_receive_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1843 pa_pstream_set_receive_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
1846 t
= pa_tagstruct_new(NULL
, 0);
1847 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1848 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1849 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1851 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
1857 if (pa_iochannel_creds_supported(io
))
1858 pa_iochannel_creds_enable(io
);
1860 ucred
.uid
= getuid();
1861 ucred
.gid
= getgid();
1863 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1866 pa_pstream_send_tagstruct(u
->pstream
, t
);
1869 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1871 pa_log_debug("Connection established, authenticating ...");
1876 /* Called from main context */
1877 static void sink_set_volume(pa_sink
*sink
) {
1885 t
= pa_tagstruct_new(NULL
, 0);
1886 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1887 pa_tagstruct_putu32(t
, u
->ctag
++);
1888 pa_tagstruct_putu32(t
, u
->device_index
);
1889 pa_tagstruct_put_cvolume(t
, &sink
->real_volume
);
1890 pa_pstream_send_tagstruct(u
->pstream
, t
);
1893 /* Called from main context */
1894 static void sink_set_mute(pa_sink
*sink
) {
1902 if (u
->version
< 11)
1905 t
= pa_tagstruct_new(NULL
, 0);
1906 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1907 pa_tagstruct_putu32(t
, u
->ctag
++);
1908 pa_tagstruct_putu32(t
, u
->device_index
);
1909 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1910 pa_pstream_send_tagstruct(u
->pstream
, t
);
1915 int pa__init(pa_module
*m
) {
1916 pa_modargs
*ma
= NULL
;
1917 struct userdata
*u
= NULL
;
1918 char *server
= NULL
;
1919 pa_strlist
*server_list
= NULL
;
1924 pa_sink_new_data data
;
1926 pa_source_new_data data
;
1930 xcb_connection_t
*xcb
= NULL
;
1932 const char *cookie_path
;
1936 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1937 pa_log("Failed to parse module arguments");
1941 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1945 u
->pdispatch
= NULL
;
1947 u
->server_name
= NULL
;
1949 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1951 u
->requested_bytes
= 0;
1953 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1956 u
->smoother
= pa_smoother_new(
1965 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1966 u
->time_event
= NULL
;
1967 u
->ignore_latency_before
= 0;
1968 u
->transport_usec
= u
->thread_transport_usec
= 0;
1969 u
->remote_suspended
= u
->remote_corked
= false;
1970 u
->counter
= u
->counter_delta
= 0;
1972 u
->rtpoll
= pa_rtpoll_new();
1973 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
1975 if (pa_modargs_get_value_boolean(ma
, "auto", &automatic
) < 0) {
1976 pa_log("Failed to parse argument \"auto\".");
1980 cookie_path
= pa_modargs_get_value(ma
, "cookie", NULL
);
1981 server
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
));
1985 /* Need an X11 connection to get root properties */
1986 if (getenv("DISPLAY") != NULL
) {
1987 if (!(xcb
= xcb_connect(getenv("DISPLAY"), NULL
)))
1988 pa_log("xcb_connect() failed");
1990 if (xcb_connection_has_error(xcb
)) {
1991 pa_log("xcb_connection_has_error() returned true");
1992 xcb_disconnect(xcb
);
1999 /* Figure out the cookie the same way a normal client would */
2001 cookie_path
= getenv(ENV_COOKIE_FILE
);
2004 if (!cookie_path
&& xcb
) {
2006 if (pa_x11_get_prop(xcb
, 0, "PULSE_COOKIE", t
, sizeof(t
))) {
2007 uint8_t cookie
[PA_NATIVE_COOKIE_LENGTH
];
2009 if (pa_parsehex(t
, cookie
, sizeof(cookie
)) != sizeof(cookie
))
2010 pa_log("Failed to parse cookie data");
2012 if (!(u
->auth_cookie
= pa_auth_cookie_create(u
->core
, cookie
, sizeof(cookie
))))
2019 /* Same thing for the server name */
2021 server
= pa_xstrdup(getenv(ENV_DEFAULT_SERVER
));
2024 if (!server
&& xcb
) {
2026 if (pa_x11_get_prop(xcb
, 0, "PULSE_SERVER", t
, sizeof(t
)))
2027 server
= pa_xstrdup(t
);
2031 /* Also determine the default sink/source on the other server */
2034 u
->sink_name
= pa_xstrdup(getenv(ENV_DEFAULT_SINK
));
2037 if (!u
->sink_name
&& xcb
) {
2039 if (pa_x11_get_prop(xcb
, 0, "PULSE_SINK", t
, sizeof(t
)))
2040 u
->sink_name
= pa_xstrdup(t
);
2044 if (!u
->source_name
)
2045 u
->source_name
= pa_xstrdup(getenv(ENV_DEFAULT_SOURCE
));
2048 if (!u
->source_name
&& xcb
) {
2050 if (pa_x11_get_prop(xcb
, 0, "PULSE_SOURCE", t
, sizeof(t
)))
2051 u
->source_name
= pa_xstrdup(t
);
2057 if (!cookie_path
&& !u
->auth_cookie
)
2058 cookie_path
= PA_NATIVE_COOKIE_FILE
;
2061 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, cookie_path
, true, PA_NATIVE_COOKIE_LENGTH
)))
2066 if (!(server_list
= pa_strlist_parse(server
))) {
2067 pa_log("Invalid server specified.");
2074 pa_log("No server specified.");
2078 pa_log("No server address found. Attempting default local sockets.");
2080 /* The system wide instance via PF_LOCAL */
2081 server_list
= pa_strlist_prepend(server_list
, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET
);
2083 /* The user instance via PF_LOCAL */
2084 if ((ufn
= pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET
))) {
2085 server_list
= pa_strlist_prepend(server_list
, ufn
);
2090 ss
= m
->core
->default_sample_spec
;
2091 map
= m
->core
->default_channel_map
;
2092 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
2093 pa_log("Invalid sample format specification");
2098 server_list
= pa_strlist_pop(server_list
, &u
->server_name
);
2100 if (!u
->server_name
) {
2101 pa_log("Failed to connect to server '%s'", server
);
2105 pa_log_debug("Trying to connect to %s...", u
->server_name
);
2107 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, true, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
2108 pa_xfree(u
->server_name
);
2109 u
->server_name
= NULL
;
2116 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
2120 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
2121 dn
= pa_sprintf_malloc("tunnel-sink.%s", u
->server_name
);
2123 pa_sink_new_data_init(&data
);
2124 data
.driver
= __FILE__
;
2126 data
.namereg_fail
= false;
2127 pa_sink_new_data_set_name(&data
, dn
);
2128 pa_sink_new_data_set_sample_spec(&data
, &ss
);
2129 pa_sink_new_data_set_channel_map(&data
, &map
);
2130 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
2131 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2133 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
2135 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2136 pa_log("Invalid properties");
2137 pa_sink_new_data_done(&data
);
2141 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
);
2142 pa_sink_new_data_done(&data
);
2145 pa_log("Failed to create sink.");
2149 u
->sink
->parent
.process_msg
= sink_process_msg
;
2150 u
->sink
->userdata
= u
;
2151 u
->sink
->set_state
= sink_set_state
;
2152 pa_sink_set_set_volume_callback(u
->sink
, sink_set_volume
);
2153 pa_sink_set_set_mute_callback(u
->sink
, sink_set_mute
);
2155 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= false;
2157 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2159 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
2160 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
2164 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
2165 dn
= pa_sprintf_malloc("tunnel-source.%s", u
->server_name
);
2167 pa_source_new_data_init(&data
);
2168 data
.driver
= __FILE__
;
2170 data
.namereg_fail
= false;
2171 pa_source_new_data_set_name(&data
, dn
);
2172 pa_source_new_data_set_sample_spec(&data
, &ss
);
2173 pa_source_new_data_set_channel_map(&data
, &map
);
2174 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
2175 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2177 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
2179 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2180 pa_log("Invalid properties");
2181 pa_source_new_data_done(&data
);
2185 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
2186 pa_source_new_data_done(&data
);
2189 pa_log("Failed to create source.");
2193 u
->source
->parent
.process_msg
= source_process_msg
;
2194 u
->source
->set_state
= source_set_state
;
2195 u
->source
->userdata
= u
;
2197 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2199 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
2200 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
2202 u
->mcalign
= pa_mcalign_new(pa_frame_size(&u
->source
->sample_spec
));
2207 u
->time_event
= NULL
;
2209 u
->maxlength
= (uint32_t) -1;
2211 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
2213 u
->fragsize
= (uint32_t) -1;
2216 if (!(u
->thread
= pa_thread_new("module-tunnel", thread_func
, u
))) {
2217 pa_log("Failed to create thread.");
2222 pa_sink_put(u
->sink
);
2224 pa_source_put(u
->source
);
2231 pa_strlist_free(server_list
);
2235 xcb_disconnect(xcb
);
2238 pa_modargs_free(ma
);
2249 pa_strlist_free(server_list
);
2253 xcb_disconnect(xcb
);
2257 pa_modargs_free(ma
);
2264 void pa__done(pa_module
*m
) {
2269 if (!(u
= m
->userdata
))
2274 pa_sink_unlink(u
->sink
);
2277 pa_source_unlink(u
->source
);
2281 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2282 pa_thread_free(u
->thread
);
2285 pa_thread_mq_done(&u
->thread_mq
);
2289 pa_sink_unref(u
->sink
);
2292 pa_source_unref(u
->source
);
2296 pa_rtpoll_free(u
->rtpoll
);
2299 pa_pstream_unlink(u
->pstream
);
2300 pa_pstream_unref(u
->pstream
);
2304 pa_pdispatch_unref(u
->pdispatch
);
2307 pa_socket_client_unref(u
->client
);
2310 pa_auth_cookie_unref(u
->auth_cookie
);
2313 pa_smoother_free(u
->smoother
);
2316 u
->core
->mainloop
->time_free(u
->time_event
);
2320 pa_mcalign_free(u
->mcalign
);
2324 pa_xfree(u
->sink_name
);
2326 pa_xfree(u
->source_name
);
2328 pa_xfree(u
->server_name
);
2330 pa_xfree(u
->device_description
);
2331 pa_xfree(u
->server_fqdn
);
2332 pa_xfree(u
->user_name
);