2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
60 #include "module-tunnel-sink-symdef.h"
62 #include "module-tunnel-source-symdef.h"
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 "sink_name=<name for the local sink> "
69 "sink_properties=<properties for the local sink> "
71 "sink=<remote sink name> "
73 "format=<sample format> "
74 "channels=<number of channels> "
76 "channel_map=<channel map>");
78 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 "source_name=<name for the local source> "
81 "source_properties=<properties for the local source> "
83 "source=<remote source name> "
85 "format=<sample format> "
86 "channels=<number of channels> "
88 "channel_map=<channel map>");
91 PA_MODULE_AUTHOR("Lennart Poettering");
92 PA_MODULE_VERSION(PACKAGE_VERSION
);
93 PA_MODULE_LOAD_ONCE(FALSE
);
95 static const char* const valid_modargs
[] = {
114 #define DEFAULT_TIMEOUT 5
116 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
118 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
123 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
124 SINK_MESSAGE_REMOTE_SUSPEND
,
125 SINK_MESSAGE_UPDATE_LATENCY
,
129 #define DEFAULT_TLENGTH_MSEC 150
130 #define DEFAULT_MINREQ_MSEC 25
135 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
136 SOURCE_MESSAGE_REMOTE_SUSPEND
,
137 SOURCE_MESSAGE_UPDATE_LATENCY
140 #define DEFAULT_FRAGSIZE_MSEC 25
/* Forward declarations of the protocol command handlers. All of them share
 * the same parameter list (dispatcher, command code, tag, tagstruct payload,
 * opaque user pointer) and are referenced by command_table below. */
145 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
154 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
/* Dispatch table indexed by protocol command code. Designated initialisers
 * are used, so every command not named here has no handler entry. The
 * playback and record variants of each notification share one handler.
 * NOTE(review): the embedded line numbers skip 157 and 160, so some lines of
 * the original initialiser (and its closing brace) are missing from this
 * listing -- confirm against the real file. */
156 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
158 [PA_COMMAND_REQUEST
] = command_request
,
159 [PA_COMMAND_STARTED
] = command_started
,
161 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
162 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
163 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
164 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
165 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
166 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
167 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
168 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
169 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
170 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
171 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
173 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
174 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
181 pa_thread_mq thread_mq
;
185 pa_socket_client
*client
;
187 pa_pdispatch
*pdispatch
;
193 size_t requested_bytes
;
199 pa_auth_cookie
*auth_cookie
;
203 uint32_t device_index
;
206 int64_t counter
, counter_delta
;
208 pa_bool_t remote_corked
:1;
209 pa_bool_t remote_suspended
:1;
211 pa_usec_t transport_usec
; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
214 uint32_t ignore_latency_before
;
216 pa_time_event
*time_event
;
218 pa_smoother
*smoother
;
220 char *device_description
;
234 static void request_latency(struct userdata
*u
);
236 /* Called from main context */
/* Handler for the stream-event and client-event commands (see
 * command_table). The visible body only writes a debug log line; the
 * payload in t is not parsed here. */
237 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
238 pa_log_debug("Got stream or client event.");
241 /* Called from main context */
/* Handler for the playback/record "stream killed" commands. Checks that the
 * notification arrived on this module's dispatcher, logs a warning and asks
 * the core to unload this module. userdata is the module's struct userdata. */
242 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
243 struct userdata
*u
= userdata
;
248 pa_assert(u
->pdispatch
== pd
);
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u
->module
, TRUE
);
254 /* Called from main context */
/* Shared handler for the OVERFLOW and UNDERFLOW commands. Checks that the
 * notification arrived on this module's dispatcher and logs it at info
 * level; no recovery action is visible in this listing. */
255 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
256 struct userdata
*u
= userdata
;
261 pa_assert(u
->pdispatch
== pd
);
263 pa_log_info("Server signalled buffer overrun/underrun.");
267 /* Called from main context */
/* Handler for the playback/record "stream suspended" commands.
 *
 * Reads a channel index and a suspended flag from t and requires the packet
 * to end there; on a malformed packet it logs an error and requests a module
 * unload. Otherwise the flag is forwarded synchronously (pa_asyncmsgq_send)
 * as a *_MESSAGE_REMOTE_SUSPEND message.
 *
 * NOTE(review): the declarations of channel/suspended, the early return after
 * the unload request, and whatever selects between the sink and the source
 * send (presumably a compile-time guard) are not present in this listing. */
268 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
269 struct userdata
*u
= userdata
;
276 pa_assert(u
->pdispatch
== pd
);
278 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
279 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
280 !pa_tagstruct_eof(t
)) {
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u
->module
, TRUE
);
287 pa_log_debug("Server reports device suspend.");
290 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
292 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
298 /* Called from main context */
/* Handler for the playback/record "stream moved" commands.
 *
 * Reads, in order, a channel index, a device index (di), a device name (dn)
 * and a suspended flag. A malformed packet is logged and triggers a module
 * unload request. On success the suspended flag is forwarded synchronously
 * as a *_MESSAGE_REMOTE_SUSPEND message, the same way command_suspended does.
 *
 * NOTE(review): in the visible lines di and dn are parsed but not used, and
 * the declarations of dn/suspended plus the sink-vs-source selection are not
 * shown in this listing. */
299 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
300 struct userdata
*u
= userdata
;
301 uint32_t channel
, di
;
308 pa_assert(u
->pdispatch
== pd
);
310 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
311 pa_tagstruct_getu32(t
, &di
) < 0 ||
312 pa_tagstruct_gets(t
, &dn
) < 0 ||
313 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u
->module
, TRUE
);
320 pa_log_debug("Server reports a stream move.");
323 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
325 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
/* Handler for the playback/record "buffer attributes changed" commands.
 *
 * Every packet starts with a channel index and maxlength. A record packet
 * then carries fragsize and a usec value; the other layout carries tlength,
 * prebuf, minreq and a usec value. Any parse failure is logged and triggers
 * a module unload request. Finally the new tlength is logged next to the
 * value currently stored in u->tlength.
 *
 * NOTE(review): the declaration of usec, the else between the two layouts
 * and the returns after each unload request are not present in this
 * listing. */
331 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
332 struct userdata
*u
= userdata
;
333 uint32_t channel
, maxlength
, tlength
, fragsize
, prebuf
, minreq
;
339 pa_assert(u
->pdispatch
== pd
);
341 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
342 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u
->module
, TRUE
);
349 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
350 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
351 pa_tagstruct_get_usec(t
, &usec
) < 0) {
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u
->module
, TRUE
);
358 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
359 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
360 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
361 pa_tagstruct_get_usec(t
, &usec
) < 0) {
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u
->module
, TRUE
);
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
378 /* Called from main context */
/* Handler for PA_COMMAND_STARTED. Checks that the notification arrived on
 * this module's dispatcher and logs it at debug level; the payload is not
 * parsed in the visible lines. */
379 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
380 struct userdata
*u
= userdata
;
385 pa_assert(u
->pdispatch
== pd
);
387 pa_log_debug("Server reports playback started.");
393 /* Called from IO thread context */
/* Pauses or resumes the latency smoother to match the remote stream state.
 *
 * Starts from the current rtclock time in x, shifts it by
 * u->thread_transport_usec, then pauses the smoother at x when the remote
 * side is suspended or corked and resumes it at x otherwise.
 *
 * NOTE(review): both "x -=" and "x +=" appear below; the condition choosing
 * between them (presumably the past argument), the declaration of x and the
 * else before the resume call are not present in this listing. */
394 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
399 x
= pa_rtclock_now();
401 /* Correct by the time the issued request needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
406 x
-= u
->thread_transport_usec
;
408 x
+= u
->thread_transport_usec
;
410 if (u
->remote_suspended
|| u
->remote_corked
)
411 pa_smoother_pause(u
->smoother
, x
);
413 pa_smoother_resume(u
->smoother
, x
, TRUE
);
416 /* Called from IO thread context */
/* Records the remote cork state and re-evaluates the smoother.
 * Compares the requested state with u->remote_corked first; the statement
 * guarded by that comparison (presumably an early return when nothing
 * changes) is not present in this listing. check_smoother_status is called
 * with past == FALSE. */
417 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
420 if (u
->remote_corked
== cork
)
423 u
->remote_corked
= cork
;
424 check_smoother_status(u
, FALSE
);
427 /* Called from main context */
/* Sends a cork/uncork request for this module's stream to the remote server.
 *
 * Builds a tagstruct holding the command code, a fresh tag taken from
 * u->ctag (post-incremented), the stream channel and the cork flag
 * normalised to 0/1, then queues it on u->pstream. No reply handler is
 * registered in the visible lines.
 *
 * NOTE(review): both the PLAYBACK and the RECORD command codes are written
 * below; the selection between them (presumably a compile-time guard) and
 * the declaration of t are not present in this listing. */
428 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
435 t
= pa_tagstruct_new(NULL
, 0);
437 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
439 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
441 pa_tagstruct_putu32(t
, u
->ctag
++);
442 pa_tagstruct_putu32(t
, u
->channel
);
443 pa_tagstruct_put_boolean(t
, !!cork
);
444 pa_pstream_send_tagstruct(u
->pstream
, t
);
449 /* Called from IO thread context */
/* Records the remote suspend state and re-evaluates the smoother.
 * Mirrors stream_cork_within_thread, but updates u->remote_suspended and
 * calls check_smoother_status with past == TRUE. The statement guarded by
 * the equality test (presumably an early return) is not present in this
 * listing. */
450 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
453 if (u
->remote_suspended
== suspend
)
456 u
->remote_suspended
= suspend
;
457 check_smoother_status(u
, TRUE
);
462 /* Called from IO thread context */
/* Drains the outstanding remote request by rendering audio from the sink.
 *
 * While u->requested_bytes is non-zero: render up to that many bytes into a
 * memchunk, post the chunk to the thread's outgoing queue as
 * SINK_MESSAGE_POST (handled in sink_process_msg), drop the local memblock
 * reference, then subtract the rendered length from the outstanding request
 * and add it to the running byte counter. */
463 static void send_data(struct userdata
*u
) {
466 while (u
->requested_bytes
> 0) {
467 pa_memchunk memchunk
;
469 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
470 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
471 pa_memblock_unref(memchunk
.memblock
);
473 u
->requested_bytes
-= memchunk
.length
;
475 u
->counter
+= (int64_t) memchunk
.length
;
479 /* This function is called from IO context -- except when it is not. */
/* Message handler of the tunnel sink.
 *
 * Cases visible in this listing:
 *  - PA_SINK_MESSAGE_SET_STATE: runs the generic handler first and, if that
 *    succeeds, mirrors the suspended state into the remote cork flag.
 *  - PA_SINK_MESSAGE_GET_LATENCY: bytes sent so far (as time) minus the
 *    smoothed remote playback time, clamped at zero, stored through data.
 *  - SINK_MESSAGE_REQUEST: adds offset to the outstanding byte request.
 *  - SINK_MESSAGE_REMOTE_SUSPEND: forwards the flag carried in data.
 *  - SINK_MESSAGE_UPDATE_LATENCY: feeds a new sample into the smoother and
 *    copies the main thread's transport_usec.
 *  - SINK_MESSAGE_POST: writes the chunk to the protocol stream and adds its
 *    length to counter_delta.
 * Anything that reaches the end goes to pa_sink_process_msg.
 *
 * NOTE(review): the switch statement itself, the statements guarded by the
 * PA_SINK_IS_OPENED tests, the per-case returns and the declarations of r
 * and y are not present in this listing. */
480 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
481 struct userdata
*u
= PA_SINK(o
)->userdata
;
485 case PA_SINK_MESSAGE_SET_STATE
: {
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
491 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
493 if (PA_SINK_IS_OPENED(u
->sink
->state
))
500 case PA_SINK_MESSAGE_GET_LATENCY
: {
501 pa_usec_t yl
, yr
, *usec
= data
;
503 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
504 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
506 *usec
= yl
> yr
? yl
- yr
: 0;
510 case SINK_MESSAGE_REQUEST
:
512 pa_assert(offset
> 0);
513 u
->requested_bytes
+= (size_t) offset
;
515 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
521 case SINK_MESSAGE_REMOTE_SUSPEND
:
523 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
527 case SINK_MESSAGE_UPDATE_LATENCY
: {
530 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
532 if (y
> (pa_usec_t
) offset
)
533 y
-= (pa_usec_t
) offset
;
537 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
539 /* We can access this freely here, since the main thread is waiting for us */
540 u
->thread_transport_usec
= u
->transport_usec
;
545 case SINK_MESSAGE_POST
:
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
552 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
554 u
->counter_delta
+= (int64_t) chunk
->length
;
559 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
562 /* Called from main context */
/* State-change callback of the tunnel sink.
 *
 * Going to SUSPENDED asserts the sink is currently opened and corks the
 * remote stream; going to RUNNING from SUSPENDED uncorks it. The UNLINKED
 * and INVALID_STATE labels carry no visible action.
 *
 * NOTE(review): the assignment of u, further case labels, the break
 * statements and the return value are not present in this listing. */
563 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
565 pa_sink_assert_ref(s
);
568 switch ((pa_sink_state_t
) state
) {
570 case PA_SINK_SUSPENDED
:
571 pa_assert(PA_SINK_IS_OPENED(s
->state
));
572 stream_cork(u
, TRUE
);
576 case PA_SINK_RUNNING
:
577 if (s
->state
== PA_SINK_SUSPENDED
)
578 stream_cork(u
, FALSE
);
581 case PA_SINK_UNLINKED
:
583 case PA_SINK_INVALID_STATE
:
592 /* This function is called from IO context -- except when it is not. */
/* Message handler of the tunnel source.
 *
 * Cases visible in this listing:
 *  - PA_SOURCE_MESSAGE_SET_STATE: runs the generic handler first and, if
 *    that succeeds, mirrors the suspended state into the remote cork flag.
 *  - PA_SOURCE_MESSAGE_GET_LATENCY: smoothed remote time minus the bytes
 *    received so far (as time), clamped at zero, stored through data.
 *  - SOURCE_MESSAGE_POST: posts the chunk to the source when it is opened
 *    and adds the chunk length to the byte counter.
 *  - SOURCE_MESSAGE_REMOTE_SUSPEND: forwards the flag carried in data.
 *  - SOURCE_MESSAGE_UPDATE_LATENCY: feeds counter time plus offset into the
 *    smoother and copies the main thread's transport_usec.
 * Anything that reaches the end goes to pa_source_process_msg.
 *
 * NOTE(review): the switch statement itself, the per-case returns and the
 * declarations of r and y are not present in this listing. */
593 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
594 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
598 case PA_SOURCE_MESSAGE_SET_STATE
: {
601 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
602 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
607 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
608 pa_usec_t yr
, yl
, *usec
= data
;
610 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
611 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
613 *usec
= yr
> yl
? yr
- yl
: 0;
617 case SOURCE_MESSAGE_POST
:
619 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
620 pa_source_post(u
->source
, chunk
);
622 u
->counter
+= (int64_t) chunk
->length
;
626 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
628 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
631 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
634 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
635 y
+= (pa_usec_t
) offset
;
637 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
639 /* We can access this freely here, since the main thread is waiting for us */
640 u
->thread_transport_usec
= u
->transport_usec
;
646 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
649 /* Called from main context */
/* State-change callback of the tunnel source.
 *
 * Going to SUSPENDED asserts the source is currently opened and corks the
 * remote stream; going to RUNNING from SUSPENDED uncorks it. The UNLINKED
 * and INVALID_STATE labels carry no visible action.
 *
 * The last label uses PA_SOURCE_INVALID_STATE: the switch is over
 * pa_source_state_t, so the label must be a source enumerator rather than
 * the sink one the original had.
 *
 * NOTE(review): the assignment of u, further case labels, the break
 * statements and the return value are not present in this listing. */
650 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
652 pa_source_assert_ref(s
);
655 switch ((pa_source_state_t
) state
) {
657 case PA_SOURCE_SUSPENDED
:
658 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
659 stream_cork(u
, TRUE
);
663 case PA_SOURCE_RUNNING
:
664 if (s
->state
== PA_SOURCE_SUSPENDED
)
665 stream_cork(u
, FALSE
);
668 case PA_SOURCE_UNLINKED
:
670 case PA_SOURCE_INVALID_STATE
:
/* Entry point of the module's IO thread.
 *
 * Installs the thread message queues, then (per visible lines) handles a
 * pending rewind request on an opened sink by processing a zero-byte rewind
 * and runs the rtpoll. On an irregular exit it asks the core to unload the
 * module and keeps consuming messages until PA_MESSAGE_SHUTDOWN arrives.
 *
 * NOTE(review): the enclosing loop, the declaration of ret and the goto
 * labels are not present in this listing. */
679 static void thread_func(void *userdata
) {
680 struct userdata
*u
= userdata
;
684 pa_log_debug("Thread starting up");
686 pa_thread_mq_install(&u
->thread_mq
);
692 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
693 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
694 pa_sink_process_rewind(u
->sink
, 0);
697 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
705 /* If this was no regular exit from the loop we have to continue
706 * processing messages until we received PA_MESSAGE_SHUTDOWN */
707 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
708 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
711 pa_log_debug("Thread shutting down");
715 /* Called from main context */
/* Handler for PA_COMMAND_REQUEST: the server asks for more playback data.
 *
 * Reads the channel index and the requested byte count, rejects packets
 * that fail to parse or that name a channel other than u->channel, and
 * otherwise posts SINK_MESSAGE_REQUEST to the sink with the byte count
 * passed as the message offset.
 *
 * NOTE(review): the control flow connecting the two error logs to the
 * module unload request at the bottom (presumably goto fail / a fail label)
 * and the normal return are not present in this listing. */
716 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
717 struct userdata
*u
= userdata
;
718 uint32_t bytes
, channel
;
721 pa_assert(command
== PA_COMMAND_REQUEST
);
724 pa_assert(u
->pdispatch
== pd
);
726 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
727 pa_tagstruct_getu32(t
, &bytes
) < 0) {
728 pa_log("Invalid protocol reply");
732 if (channel
!= u
->channel
) {
733 pa_log("Received data for invalid channel");
737 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
741 pa_module_unload_request(u
->module
, TRUE
);
746 /* Called from main context */
/* Reply handler for the latency query issued by request_latency().
 *
 * Parses the reply (sink/source delay, playing flag, the local and remote
 * timestamps, write/read indexes, plus underrun_for/playing_for when the
 * protocol version is >= 13) and requires the packet to end there. It then
 * estimates the one-way transport time, derives the overall stream delay
 * and hands it synchronously to the IO thread as a *_MESSAGE_UPDATE_LATENCY
 * message.
 *
 * Transport estimate: when local < remote < now the two clocks are treated
 * as synchronised and a directly measured one-way time is used; otherwise
 * half of the round trip (now - local) is used. The second comparison is
 * now explicit ("< 0"). The original tested the bare pa_timeval_cmp()
 * result, which is non-zero for any unequal pair, so a remote timestamp
 * later than now was wrongly accepted as synchronised.
 *
 * NOTE(review): declarations of playing, delay and ss, the statement run for
 * tags older than ignore_latency_before, the sink-vs-source selections
 * (presumably compile-time guards) and the fail label are not present in
 * this listing. */
747 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
748 struct userdata
*u
= userdata
;
749 pa_usec_t sink_usec
, source_usec
;
751 int64_t write_index
, read_index
;
752 struct timeval local
, remote
, now
;
759 if (command
!= PA_COMMAND_REPLY
) {
760 if (command
== PA_COMMAND_ERROR
)
761 pa_log("Failed to get latency.");
763 pa_log("Protocol error.");
767 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
768 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
769 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
770 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
771 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
772 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
773 pa_tagstruct_gets64(t
, &read_index
) < 0) {
774 pa_log("Invalid reply.");
779 if (u
->version
>= 13) {
780 uint64_t underrun_for
= 0, playing_for
= 0;
782 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
783 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
784 pa_log("Invalid reply.");
790 if (!pa_tagstruct_eof(t
)) {
791 pa_log("Invalid reply.");
795 if (tag
< u
->ignore_latency_before
) {
799 pa_gettimeofday(&now
);
801 /* Calculate transport usec */
802 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
) < 0) {
803 /* local and remote seem to have synchronized clocks */
805 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
807 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
810 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
812 /* First, take the device's delay */
814 delay
= (int64_t) sink_usec
;
815 ss
= &u
->sink
->sample_spec
;
817 delay
= (int64_t) source_usec
;
818 ss
= &u
->source
->sample_spec
;
821 /* Add the length of our server-side buffer */
822 if (write_index
>= read_index
)
823 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
825 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
827 /* Our measurements are already out of date, hence correct by the *
828 * transport latency */
830 delay
-= (int64_t) u
->transport_usec
;
832 delay
+= (int64_t) u
->transport_usec
;
835 /* Now correct by what we have read/written since we requested the update */
837 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
839 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
843 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
845 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
852 pa_module_unload_request(u
->module
, TRUE
);
855 /* Called from main context */
/* Sends a latency query for this module's stream to the remote server.
 *
 * The request carries the command code, a fresh tag, the stream channel and
 * the current wall-clock time. stream_get_latency_callback is registered
 * for the reply with a DEFAULT_TIMEOUT timeout. Afterwards the tag is stored
 * in ignore_latency_before and counter_delta is reset, so the reply can be
 * corrected for data transferred in the meantime.
 *
 * NOTE(review): both the PLAYBACK and the RECORD command codes are written
 * below; the selection between them and the declarations of t, tag and now
 * are not present in this listing. */
856 static void request_latency(struct userdata
*u
) {
862 t
= pa_tagstruct_new(NULL
, 0);
864 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
866 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
868 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
869 pa_tagstruct_putu32(t
, u
->channel
);
871 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
873 pa_pstream_send_tagstruct(u
->pstream
, t
);
874 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
876 u
->ignore_latency_before
= tag
;
877 u
->counter_delta
= 0;
880 /* Called from main context */
/* Timer callback. The only call visible in this listing re-arms the time
 * event e to fire LATENCY_INTERVAL after the current rtclock time.
 * NOTE(review): lines between the declaration of u and the re-arm are
 * missing from this listing (presumably a request_latency() call). */
881 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
882 struct userdata
*u
= userdata
;
890 pa_core_rttime_restart(u
->core
, e
, pa_rtclock_now() + LATENCY_INTERVAL
);
893 /* Called from main context */
/* Refreshes the local device description and the remote stream name.
 *
 * Requires server_fqdn, user_name and device_description to all be known.
 * It then:
 *  1. formats "<device> on <user>@<host>", sets it as the local device
 *     description and stores the three parts as "tunnel.remote.*" properties;
 *  2. formats "<device> for <local user>@<local host>" and sends it to the
 *     server as the new stream name.
 *
 * NOTE(review): the statement guarded by the NULL check (presumably an early
 * return), the sink-vs-source and playback-vs-record selections, the
 * declarations of d and t, and any release of d are not present in this
 * listing. */
894 static void update_description(struct userdata
*u
) {
896 char un
[128], hn
[128];
901 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
904 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
907 pa_sink_set_description(u
->sink
, d
);
908 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
909 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
910 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
912 pa_source_set_description(u
->source
, d
);
913 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
914 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
915 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
920 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
921 pa_get_user_name(un
, sizeof(un
)),
922 pa_get_host_name(hn
, sizeof(hn
)));
924 t
= pa_tagstruct_new(NULL
, 0);
926 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
928 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
930 pa_tagstruct_putu32(t
, u
->ctag
++);
931 pa_tagstruct_putu32(t
, u
->channel
);
932 pa_tagstruct_puts(t
, d
);
933 pa_pstream_send_tagstruct(u
->pstream
, t
);
938 /* Called from main context */
/* Reply handler for the server-info query sent by request_info().
 *
 * Parses the reply, requires the packet to end there, then replaces the
 * cached server_fqdn and user_name with copies of the received host and user
 * names and calls update_description(). Error paths log a message; the
 * module unload request at the bottom is presumably the shared failure exit.
 *
 * NOTE(review): the declarations of ss, cm and cookie, the first half of the
 * channel-map condition (note the extra closing parenthesis), the gotos and
 * the fail label are not present in this listing. */
939 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
940 struct userdata
*u
= userdata
;
943 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
949 if (command
!= PA_COMMAND_REPLY
) {
950 if (command
== PA_COMMAND_ERROR
)
951 pa_log("Failed to get info.");
953 pa_log("Protocol error.");
957 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
958 pa_tagstruct_gets(t
, &server_version
) < 0 ||
959 pa_tagstruct_gets(t
, &user_name
) < 0 ||
960 pa_tagstruct_gets(t
, &host_name
) < 0 ||
961 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
962 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
963 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
964 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
966 pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
968 pa_log("Parse failure");
972 if (!pa_tagstruct_eof(t
)) {
973 pa_log("Packet too long");
977 pa_xfree(u
->server_fqdn
);
978 u
->server_fqdn
= pa_xstrdup(host_name
);
980 pa_xfree(u
->user_name
);
981 u
->user_name
= pa_xstrdup(user_name
);
983 update_description(u
);
988 pa_module_unload_request(u
->module
, TRUE
);
993 /* Called from main context */
/* Reply handler for the sink-info query sent by request_info().
 *
 * Allocates a scratch proplist, parses the base record, then the extra
 * fields added at protocol versions 13 (proplist, configured latency) and
 * 15 (base volume, state, volume steps, card), and requires the packet to
 * end there. The proplist is freed on both the success path and the failure
 * path. If the reply names the sink this module is attached to, the cached
 * device_description is replaced and update_description() is called.
 *
 * NOTE(review): the statement guarded by the name comparison (presumably an
 * early return when the names differ), the declarations of ss, cm, volume,
 * mute, latency and pl, the gotos and the fail label are not present in this
 * listing. name is passed to strcmp() unchecked -- confirm the server can
 * never send a NULL string here. */
994 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
995 struct userdata
*u
= userdata
;
996 uint32_t idx
, owner_module
, monitor_source
, flags
;
997 const char *name
, *description
, *monitor_source_name
, *driver
;
1008 pl
= pa_proplist_new();
1010 if (command
!= PA_COMMAND_REPLY
) {
1011 if (command
== PA_COMMAND_ERROR
)
1012 pa_log("Failed to get info.");
1014 pa_log("Protocol error.");
1018 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1019 pa_tagstruct_gets(t
, &name
) < 0 ||
1020 pa_tagstruct_gets(t
, &description
) < 0 ||
1021 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1022 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1023 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1024 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1025 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1026 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1027 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1028 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1029 pa_tagstruct_gets(t
, &driver
) < 0 ||
1030 pa_tagstruct_getu32(t
, &flags
) < 0) {
1032 pa_log("Parse failure");
1036 if (u
->version
>= 13) {
1037 pa_usec_t configured_latency
;
1039 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1040 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1042 pa_log("Parse failure");
1047 if (u
->version
>= 15) {
1048 pa_volume_t base_volume
;
1049 uint32_t state
, n_volume_steps
, card
;
1051 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1052 pa_tagstruct_getu32(t
, &state
) < 0 ||
1053 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1054 pa_tagstruct_getu32(t
, &card
) < 0) {
1056 pa_log("Parse failure");
1061 if (!pa_tagstruct_eof(t
)) {
1062 pa_log("Packet too long");
1066 pa_proplist_free(pl
);
1068 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
1071 pa_xfree(u
->device_description
);
1072 u
->device_description
= pa_xstrdup(description
);
1074 update_description(u
);
1079 pa_module_unload_request(u
->module
, TRUE
);
1080 pa_proplist_free(pl
);
1083 /* Called from main context */
/* Reply handler for the sink-input-info query sent by request_info().
 *
 * Allocates a scratch proplist, parses the base record, then the mute flag
 * (protocol version >= 11) and the proplist (version >= 13), and requires
 * the packet to end there. The proplist is freed on both the success path
 * and the failure path. For the stream identified by u->device_index the
 * received volume and mute state are compared with the local sink's values
 * and pushed to the sink via pa_sink_volume_changed() and, for version
 * >= 11, pa_sink_mute_changed().
 *
 * NOTE(review): the statements guarded by the index test and by the
 * "nothing changed" test (presumably early returns), the declarations of
 * volume, mute and pl, the gotos and the fail label are not present in this
 * listing. */
1084 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1085 struct userdata
*u
= userdata
;
1086 uint32_t idx
, owner_module
, client
, sink
;
1087 pa_usec_t buffer_usec
, sink_usec
;
1088 const char *name
, *driver
, *resample_method
;
1090 pa_sample_spec sample_spec
;
1091 pa_channel_map channel_map
;
1098 pl
= pa_proplist_new();
1100 if (command
!= PA_COMMAND_REPLY
) {
1101 if (command
== PA_COMMAND_ERROR
)
1102 pa_log("Failed to get info.");
1104 pa_log("Protocol error.");
1108 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1109 pa_tagstruct_gets(t
, &name
) < 0 ||
1110 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1111 pa_tagstruct_getu32(t
, &client
) < 0 ||
1112 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1113 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1114 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1115 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1116 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1117 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1118 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1119 pa_tagstruct_gets(t
, &driver
) < 0) {
1121 pa_log("Parse failure");
1125 if (u
->version
>= 11) {
1126 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1128 pa_log("Parse failure");
1133 if (u
->version
>= 13) {
1134 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1136 pa_log("Parse failure");
1141 if (!pa_tagstruct_eof(t
)) {
1142 pa_log("Packet too long");
1146 pa_proplist_free(pl
);
1148 if (idx
!= u
->device_index
)
1153 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1154 pa_cvolume_equal(&volume
, &u
->sink
->virtual_volume
))
1157 pa_sink_volume_changed(u
->sink
, &volume
, FALSE
);
1159 if (u
->version
>= 11)
1160 pa_sink_mute_changed(u
->sink
, mute
, FALSE
);
1165 pa_module_unload_request(u
->module
, TRUE
);
1166 pa_proplist_free(pl
);
1171 /* Called from main context */
/* Reply handler for the source-info query sent by request_info().
 *
 * Source-side counterpart of sink_info_cb: allocates a scratch proplist,
 * parses the base record, then the extra fields added at protocol versions
 * 13 and 15, and requires the packet to end there. The proplist is freed on
 * both the success path and the failure path. If the reply names the source
 * this module is attached to, the cached device_description is replaced and
 * update_description() is called.
 *
 * NOTE(review): the statement guarded by the name comparison (presumably an
 * early return), the declarations of ss, cm, volume, mute and pl, the gotos
 * and the fail label are not present in this listing. name is passed to
 * strcmp() unchecked -- confirm the server can never send a NULL string
 * here. */
1172 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1173 struct userdata
*u
= userdata
;
1174 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1175 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1180 pa_usec_t latency
, configured_latency
;
1186 pl
= pa_proplist_new();
1188 if (command
!= PA_COMMAND_REPLY
) {
1189 if (command
== PA_COMMAND_ERROR
)
1190 pa_log("Failed to get info.");
1192 pa_log("Protocol error.");
1196 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1197 pa_tagstruct_gets(t
, &name
) < 0 ||
1198 pa_tagstruct_gets(t
, &description
) < 0 ||
1199 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1200 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1201 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1202 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1203 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1204 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1205 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1206 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1207 pa_tagstruct_gets(t
, &driver
) < 0 ||
1208 pa_tagstruct_getu32(t
, &flags
) < 0) {
1210 pa_log("Parse failure");
1214 if (u
->version
>= 13) {
1215 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1216 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1218 pa_log("Parse failure");
1223 if (u
->version
>= 15) {
1224 pa_volume_t base_volume
;
1225 uint32_t state
, n_volume_steps
, card
;
1227 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1228 pa_tagstruct_getu32(t
, &state
) < 0 ||
1229 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1230 pa_tagstruct_getu32(t
, &card
) < 0) {
1232 pa_log("Parse failure");
1237 if (!pa_tagstruct_eof(t
)) {
1238 pa_log("Packet too long");
1242 pa_proplist_free(pl
);
1244 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1247 pa_xfree(u
->device_description
);
1248 u
->device_description
= pa_xstrdup(description
);
1250 update_description(u
);
1255 pa_module_unload_request(u
->module
, TRUE
);
1256 pa_proplist_free(pl
);
1261 /* Called from main context */
/* Queries the remote server for the information this module mirrors.
 *
 * Each query is a tagstruct with a command code and a fresh tag, sent on
 * u->pstream, with a reply handler registered under DEFAULT_TIMEOUT:
 *  - server info                          -> server_info_cb
 *  - sink-input info for u->device_index  -> sink_input_info_cb
 *  - sink info, looked up by u->sink_name -> sink_info_cb
 *  - source info, looked up by u->source_name and only when that name is
 *    set                                  -> source_info_cb
 *
 * NOTE(review): the declarations of t and tag, and the guards around the
 * sink-side queries (presumably compile-time selection plus a sink_name
 * check matching the visible source_name check) are not present in this
 * listing. */
1262 static void request_info(struct userdata
*u
) {
1267 t
= pa_tagstruct_new(NULL
, 0);
1268 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1269 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1270 pa_pstream_send_tagstruct(u
->pstream
, t
);
1271 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1274 t
= pa_tagstruct_new(NULL
, 0);
1275 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1276 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1277 pa_tagstruct_putu32(t
, u
->device_index
);
1278 pa_pstream_send_tagstruct(u
->pstream
, t
);
1279 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1282 t
= pa_tagstruct_new(NULL
, 0);
1283 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1284 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1285 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1286 pa_tagstruct_puts(t
, u
->sink_name
);
1287 pa_pstream_send_tagstruct(u
->pstream
, t
);
1288 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1291 if (u
->source_name
) {
1292 t
= pa_tagstruct_new(NULL
, 0);
1293 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1294 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1295 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1296 pa_tagstruct_puts(t
, u
->source_name
);
1297 pa_pstream_send_tagstruct(u
->pstream
, t
);
1298 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1303 /* Called from main context */
/* Handler for PA_COMMAND_SUBSCRIBE_EVENT.
 *
 * Reads the event type and the object index; a malformed packet is logged
 * and triggers a module unload request. The event is then compared against
 * the CHANGE events for the server, sink inputs, sinks and sources.
 *
 * NOTE(review): the declaration of idx, the operators joining the last two
 * comparisons, the end of the condition and the action taken for a matching
 * event (presumably request_info()) are not present in this listing. */
1304 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1305 struct userdata
*u
= userdata
;
1306 pa_subscription_event_type_t e
;
1312 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1314 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1315 pa_tagstruct_getu32(t
, &idx
) < 0) {
1316 pa_log("Invalid protocol reply");
1317 pa_module_unload_request(u
->module
, TRUE
);
1321 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1323 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1324 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1326 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1334 /* Called from main context */
/* Sends a PA_COMMAND_SUBSCRIBE request over u->pstream.
 *
 * The request carries a fresh tag taken from u->ctag and an event mask built
 * from the SERVER, SINK_INPUT, SINK and SOURCE subscription masks.
 *
 * NOTE(review): the operators joining the last mask terms and the
 * declarations of `t` and `tag` are not visible in this listing; presumably
 * the SINK_* and SOURCE terms belong to different build variants -- confirm
 * against the full file. */
1335 static void start_subscribe(struct userdata
*u
) {
1340 t
= pa_tagstruct_new(NULL
, 0);
1341 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1342 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1343 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1345 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1347 PA_SUBSCRIPTION_MASK_SOURCE
1351 pa_pstream_send_tagstruct(u
->pstream
, t
);
1354 /* Called from main context */
/* Reply handler for the stream creation request (it is registered by
 * setup_complete_callback()).
 *
 * pd:       dispatcher; asserted to be u->pdispatch
 * command:  expected to be PA_COMMAND_REPLY
 * t:        reply payload, parsed field by field depending on u->version
 * userdata: the module's struct userdata
 *
 * NOTE(review): this listing is missing lines (declarations of `bytes`,
 * `ss`, `cm`, `dn`, `usec`, the jumps to the failure path, and any
 * preprocessor guards), so sink-variant and source-variant statements appear
 * back to back. */
1355 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1356 struct userdata
*u
= userdata
;
1363 pa_assert(u
->pdispatch
== pd
);
/* Anything other than a reply is reported; PA_COMMAND_ERROR gets its own
 * message. */
1365 if (command
!= PA_COMMAND_REPLY
) {
1366 if (command
== PA_COMMAND_ERROR
)
1367 pa_log("Failed to create stream.");
1369 pa_log("Protocol error.");
/* Leading fields: stream channel, remote device index and a byte count. */
1373 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1374 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1376 || pa_tagstruct_getu32(t
, &bytes
) < 0
/* Protocol version >= 9: buffer metrics follow. Two layouts are visible:
 * maxlength/tlength/prebuf/minreq and maxlength/fragsize. */
1381 if (u
->version
>= 9) {
1383 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1384 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1385 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1386 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1389 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1390 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
/* Protocol version >= 12: sample spec, channel map, device index, device
 * name and suspended flag follow. */
1395 if (u
->version
>= 12) {
1398 uint32_t device_index
;
1400 pa_bool_t suspended
;
1402 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1403 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1404 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1405 pa_tagstruct_gets(t
, &dn
) < 0 ||
1406 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
/* The stored remote device name is replaced by a copy of the name the
 * server reported. */
1410 pa_xfree(u
->sink_name
);
1411 u
->sink_name
= pa_xstrdup(dn
);
1413 pa_xfree(u
->source_name
);
1414 u
->source_name
= pa_xstrdup(dn
);
/* Protocol version >= 13: one usec value follows; the code that would have
 * used it is commented out below. */
1418 if (u
->version
>= 13) {
1421 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1424 /* #ifdef TUNNEL_SINK */
1425 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1427 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
/* The payload is checked for end-of-data after the known fields. */
1431 if (!pa_tagstruct_eof(t
))
/* Arm the timer: first expiry at now + LATENCY_INTERVAL, handled by
 * timeout_callback. No timer may exist yet. */
1437 pa_assert(!u
->time_event
);
1438 u
->time_event
= pa_core_rttime_new(u
->core
, pa_rtclock_now() + LATENCY_INTERVAL
, timeout_callback
, u
);
1442 pa_log_debug("Stream created.");
/* Hand the byte count read above to the sink object as a
 * SINK_MESSAGE_REQUEST posted on its message queue. */
1445 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
/* Failure handling: log and ask for the module to be unloaded. */
1451 pa_log("Invalid reply. (Create stream)");
1454 pa_module_unload_request(u
->module
, TRUE
);
1458 /* Called from main context */
/* Reply handler for the PA_COMMAND_AUTH request sent by on_connection().
 *
 * Visible steps:
 *   1. validate the reply and read the remote protocol version into
 *      u->version;
 *   2. publish that version as the "tunnel.remote_version" property;
 *   3. send PA_COMMAND_SET_CLIENT_NAME (its reply is ignored);
 *   4. send a create-stream request and register create_stream_callback()
 *      for its reply.
 * The last visible statement is a module unload request, presumably the
 * failure path.
 *
 * pd:       dispatcher; asserted to be u->pdispatch
 * command:  expected to be PA_COMMAND_REPLY
 * t:        reply payload: one u32 version, then end of data
 * userdata: the module's struct userdata
 *
 * NOTE(review): this listing is missing lines (declarations of `pl`,
 * `volume` and `tag`, else-branches, failure jumps and any preprocessor
 * guards), so sink-variant and source-variant statements appear back to
 * back. */
1459 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1460 struct userdata
*u
= userdata
;
1461 pa_tagstruct
*reply
;
1462 char name
[256], un
[128], hn
[128];
1469 pa_assert(u
->pdispatch
== pd
);
1471 if (command
!= PA_COMMAND_REPLY
||
1472 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1473 !pa_tagstruct_eof(t
)) {
1475 if (command
== PA_COMMAND_ERROR
)
1476 pa_log("Failed to authenticate");
1478 pa_log("Protocol error.");
1483 /* Minimum supported protocol version */
1484 if (u
->version
< 8) {
1485 pa_log("Incompatible protocol version");
1489 /* Starting with protocol version 13 the MSB of the version tag
1490 reflects if shm is enabled for this connection or not. We don't
1491 support SHM here at all, so we just ignore this. */
1493 if (u
->version
>= 13)
1494 u
->version
&= 0x7FFFFFFFU
;
1496 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Record the remote version on the local device and build the stream name
 * in `name`. NOTE(review): the format has three %s conversions but only the
 * user-name and host-name arguments are visible here; the first argument is
 * on a line missing from this listing. */
1499 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1500 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1502 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1504 pa_get_user_name(un
, sizeof(un
)),
1505 pa_get_host_name(hn
, sizeof(hn
)));
1507 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1508 pa_source_update_proplist(u
->source
, 0, NULL
);
1510 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1512 pa_get_user_name(un
, sizeof(un
)),
1513 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: SET_CLIENT_NAME. For version >= 13 a property list with
 * application id and version is sent; the plain string "PulseAudio" below
 * is presumably the alternative for older servers. */
1516 reply
= pa_tagstruct_new(NULL
, 0);
1517 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1518 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1520 if (u
->version
>= 13) {
1522 pl
= pa_proplist_new();
1523 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1524 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1525 pa_init_proplist(pl
);
1526 pa_tagstruct_put_proplist(reply
, pl
);
1527 pa_proplist_free(pl
);
1529 pa_tagstruct_puts(reply
, "PulseAudio");
1531 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1532 /* We ignore the server's reply here */
/* Second request: stream creation. Buffer metrics are chosen first. */
1534 reply
= pa_tagstruct_new(NULL
, 0);
1536 if (u
->version
< 13)
1537 /* Only for older PA versions we need to fill in the maxlength */
1538 u
->maxlength
= 4*1024*1024;
/* Playback metrics are derived from DEFAULT_TLENGTH_MSEC and
 * DEFAULT_MINREQ_MSEC using the sink's sample spec; prebuf equals tlength.
 * The record fragment size is derived from DEFAULT_FRAGSIZE_MSEC using the
 * source's sample spec. */
1541 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1542 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1543 u
->prebuf
= u
->tlength
;
1545 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback stream request. The stream name is only sent as a plain string
 * to servers older than version 13. */
1549 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1550 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1552 if (u
->version
< 13)
1553 pa_tagstruct_puts(reply
, name
);
1555 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1556 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1557 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1558 pa_tagstruct_puts(reply
, u
->sink_name
);
1559 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* This boolean is true when the local sink is not in an opened state. */
1560 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1561 pa_tagstruct_putu32(reply
, u
->tlength
);
1562 pa_tagstruct_putu32(reply
, u
->prebuf
);
1563 pa_tagstruct_putu32(reply
, u
->minreq
);
1564 pa_tagstruct_putu32(reply
, 0);
/* The volume sent is a reset cvolume with the sink's channel count. */
1565 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1566 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record stream request, with the same name rule as above. */
1568 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1569 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1571 if (u
->version
< 13)
1572 pa_tagstruct_puts(reply
, name
);
1574 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1575 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1576 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1577 pa_tagstruct_puts(reply
, u
->source_name
);
1578 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* This boolean is true when the local source is not in an opened state. */
1579 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1580 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Fields appended only for newer servers, gated on the protocol version. */
1583 if (u
->version
>= 12) {
1584 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1585 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1586 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1587 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1588 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1589 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1590 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1593 if (u
->version
>= 13) {
1596 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1597 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1599 pl
= pa_proplist_new();
1600 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1601 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1602 pa_tagstruct_put_proplist(reply
, pl
);
1603 pa_proplist_free(pl
);
1606 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1610 if (u
->version
>= 14) {
1612 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1614 pa_tagstruct_put_boolean(reply
, TRUE
); /* early requests */
1617 if (u
->version
>= 15) {
1619 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1621 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1622 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
/* Send the request and wait up to DEFAULT_TIMEOUT for the reply. */
1625 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1626 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1628 pa_log_debug("Connection authenticated, creating stream ...");
1633 pa_module_unload_request(u
->module
, TRUE
);
1636 /* Called from main context */
/* Die callback installed on u->pstream by on_connection(): logs a warning
 * and requests that the module be unloaded.
 *
 * p:        the packet stream (not used in the visible statements)
 * userdata: the module's struct userdata */
1637 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1638 struct userdata
*u
= userdata
;
1643 pa_log_warn("Stream died.");
1644 pa_module_unload_request(u
->module
, TRUE
);
1647 /* Called from main context */
/* Packet callback installed on u->pstream by on_connection(): passes each
 * received packet to the command dispatcher.
 *
 * p:        the packet stream (not used in the visible statements)
 * packet:   the received packet
 * creds:    credentials passed along with the packet
 * userdata: the module's struct userdata
 *
 * If the dispatcher returns a negative value the packet is reported as
 * invalid and the module is asked to unload. */
1648 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1649 struct userdata
*u
= userdata
;
1655 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1656 pa_log("Invalid packet");
1657 pa_module_unload_request(u
->module
, TRUE
);
1663 /* Called from main context */
/* Memblock callback installed on u->pstream by on_connection(): forwards
 * received audio data to the local source.
 *
 * p:        the packet stream (not used in the visible statements)
 * channel:  channel the data arrived on; must equal u->channel
 * offset:   seek offset, forwarded with the message
 * seek:     seek mode, forwarded packed into the message's pointer argument
 * chunk:    the received memory chunk
 * userdata: the module's struct userdata */
1664 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1665 struct userdata
*u
= userdata
;
/* Data on any other channel is treated as an error: log and request
 * unload. */
1671 if (channel
!= u
->channel
) {
1672 pa_log("Received memory block on bad channel.");
1673 pa_module_unload_request(u
->module
, TRUE
);
/* Deliver the chunk to the source object as a SOURCE_MESSAGE_POST. */
1677 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
/* Add the chunk length to the running counter delta. */
1679 u
->counter_delta
+= (int64_t) chunk
->length
;
1683 /* Called from main context */
/* Socket client callback registered by pa__init(): invoked with the outcome
 * of the connection attempt.
 *
 * sc:       the socket client; asserted to be u->client
 * io:       I/O channel for the connection
 * userdata: the module's struct userdata
 *
 * The callback drops its reference on the socket client, wraps `io` in a
 * packet stream plus command dispatcher, and sends PA_COMMAND_AUTH, whose
 * reply is handled by setup_complete_callback().
 *
 * NOTE(review): this listing is missing lines (declarations of `t`, `tag`
 * and `ucred`, the condition guarding the failure branch, and any
 * preprocessor guards around the credentials code). */
1684 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1685 struct userdata
*u
= userdata
;
1691 pa_assert(u
->client
== sc
);
1693 pa_socket_client_unref(u
->client
);
/* Failure branch: report errno and request unload. */
1697 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1698 pa_module_unload_request(u
->module
, TRUE
);
/* Build the protocol objects on top of the channel and install the
 * callbacks defined earlier in this file. */
1702 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1703 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, TRUE
, command_table
, PA_COMMAND_MAX
);
1705 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1706 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1708 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Authentication request: local protocol version followed by
 * PA_NATIVE_COOKIE_LENGTH bytes read from the auth cookie. */
1711 t
= pa_tagstruct_new(NULL
, 0);
1712 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1713 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1714 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1716 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* Two send variants are visible: one that attaches the process uid/gid as
 * credentials (after enabling them on the channel when supported) and a
 * plain one. Presumably only one of them is compiled or taken -- confirm. */
1722 if (pa_iochannel_creds_supported(io
))
1723 pa_iochannel_creds_enable(io
);
1725 ucred
.uid
= getuid();
1726 ucred
.gid
= getgid();
1728 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1731 pa_pstream_send_tagstruct(u
->pstream
, t
);
1734 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1736 pa_log_debug("Connection established, authenticating ...");
1741 /* Called from main context */
/* set_volume hook installed on the local sink by pa__init(): sends
 * PA_COMMAND_SET_SINK_INPUT_VOLUME for the remote stream identified by
 * u->device_index, carrying the sink's virtual_volume.
 *
 * sink: the local tunnel sink
 *
 * No reply handler is registered in the visible statements.
 * NOTE(review): the declarations of `u`, `t` and `tag`, and the statement
 * that obtains `u`, are not visible in this listing. */
1742 static void sink_set_volume(pa_sink
*sink
) {
1751 t
= pa_tagstruct_new(NULL
, 0);
1752 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1753 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1754 pa_tagstruct_putu32(t
, u
->device_index
);
1755 pa_tagstruct_put_cvolume(t
, &sink
->virtual_volume
);
1756 pa_pstream_send_tagstruct(u
->pstream
, t
);
1759 /* Called from main context */
/* set_mute hook installed on the local sink by pa__init(): sends
 * PA_COMMAND_SET_SINK_INPUT_MUTE for the remote stream identified by
 * u->device_index, carrying the sink's muted flag normalised to 0/1.
 *
 * sink: the local tunnel sink
 *
 * NOTE(review): the declarations of `u`, `t` and `tag`, and the statement
 * that obtains `u`, are not visible in this listing. */
1760 static void sink_set_mute(pa_sink
*sink
) {
/* Servers with protocol version below 11 are special-cased; the statement
 * guarded by this test is not visible (presumably an early return). */
1769 if (u
->version
< 11)
1772 t
= pa_tagstruct_new(NULL
, 0);
1773 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1774 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1775 pa_tagstruct_putu32(t
, u
->device_index
);
1776 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1777 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Module entry point: parses the module arguments, allocates and initialises
 * the per-module struct userdata, starts the connection to the remote
 * server, creates the local sink or source, and starts the I/O thread.
 *
 * m: the module being loaded; m->userdata is set to the new state.
 *
 * NOTE(review): this listing is missing lines (declarations of `ss`, `map`
 * and `dn`, failure jumps, return statements and any preprocessor guards),
 * so sink-variant and source-variant statements appear back to back and the
 * return values are not visible. */
1782 int pa__init(pa_module
*m
) {
1783 pa_modargs
*ma
= NULL
;
1784 struct userdata
*u
= NULL
;
1789 pa_sink_new_data data
;
1791 pa_source_new_data data
;
1796 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1797 pa_log("Failed to parse module arguments");
/* Allocate zeroed state and set the initial field values. */
1801 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1805 u
->pdispatch
= NULL
;
1807 u
->server_name
= NULL
;
/* Remote device name, taken from the "sink" / "source" module argument;
 * NULL when the argument is absent. */
1809 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1811 u
->requested_bytes
= 0;
1813 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1816 u
->smoother
= pa_smoother_new(
1825 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1826 u
->time_event
= NULL
;
1827 u
->ignore_latency_before
= 0;
1828 u
->transport_usec
= u
->thread_transport_usec
= 0;
1829 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1830 u
->counter
= u
->counter_delta
= 0;
1832 u
->rtpoll
= pa_rtpoll_new();
1833 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* Authentication cookie: path from the "cookie" argument, defaulting to
 * PA_NATIVE_COOKIE_FILE. */
1835 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The "server" argument is mandatory. */
1838 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1839 pa_log("No server specified.");
/* Start from the core's default sample spec and channel map, then let the
 * module arguments override them. */
1843 ss
= m
->core
->default_sample_spec
;
1844 map
= m
->core
->default_channel_map
;
1845 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1846 pa_log("Invalid sample format specification");
/* Create the socket client for the server; on_connection() is called with
 * the result. */
1850 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, TRUE
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1851 pa_log("Failed to connect to server '%s'", u
->server_name
);
1855 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Local sink: the name comes from "sink_name", falling back to
 * "tunnel.<server>". */
1859 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1860 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1862 pa_sink_new_data_init(&data
);
1863 data
.driver
= __FILE__
;
1865 data
.namereg_fail
= TRUE
;
1866 pa_sink_new_data_set_name(&data
, dn
);
1867 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1868 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description is "<remote sink> on <server>", or just the server name when
 * no remote sink was given. */
1869 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1870 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1872 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
/* Properties given in "sink_properties" replace the ones set above. */
1874 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1875 pa_log("Invalid properties");
1876 pa_sink_new_data_done(&data
);
1880 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
|PA_SINK_HW_MUTE_CTRL
);
1881 pa_sink_new_data_done(&data
);
1884 pa_log("Failed to create sink.");
/* Install the sink's message handler and state/volume/mute hooks. */
1888 u
->sink
->parent
.process_msg
= sink_process_msg
;
1889 u
->sink
->userdata
= u
;
1890 u
->sink
->set_state
= sink_set_state
;
1891 u
->sink
->set_volume
= sink_set_volume
;
1892 u
->sink
->set_mute
= sink_set_mute
;
1894 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1896 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1898 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1899 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Local source: same pattern, using "source_name" and
 * "source_properties". */
1903 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1904 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1906 pa_source_new_data_init(&data
);
1907 data
.driver
= __FILE__
;
1909 data
.namereg_fail
= TRUE
;
1910 pa_source_new_data_set_name(&data
, dn
);
1911 pa_source_new_data_set_sample_spec(&data
, &ss
);
1912 pa_source_new_data_set_channel_map(&data
, &map
);
1913 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1914 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1916 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1918 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1919 pa_log("Invalid properties");
1920 pa_source_new_data_done(&data
);
1924 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1925 pa_source_new_data_done(&data
);
1928 pa_log("Failed to create source.");
1932 u
->source
->parent
.process_msg
= source_process_msg
;
1933 u
->source
->set_state
= source_set_state
;
1934 u
->source
->userdata
= u
;
1936 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1938 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1939 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
/* Buffer metrics start out as (uint32_t) -1; setup_complete_callback()
 * assigns real values before the stream is created. */
1944 u
->time_event
= NULL
;
1946 u
->maxlength
= (uint32_t) -1;
1948 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
1950 u
->fragsize
= (uint32_t) -1;
/* Start the I/O thread, then publish the device. */
1953 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1954 pa_log("Failed to create thread.");
1959 pa_sink_put(u
->sink
);
1961 pa_source_put(u
->source
);
/* The parsed arguments are freed at two points; presumably one is the
 * success path and the other the failure path -- confirm. */
1964 pa_modargs_free(ma
);
1972 pa_modargs_free(ma
);
1979 void pa__done(pa_module
*m
) {
1984 if (!(u
= m
->userdata
))
1989 pa_sink_unlink(u
->sink
);
1992 pa_source_unlink(u
->source
);
1996 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1997 pa_thread_free(u
->thread
);
2000 pa_thread_mq_done(&u
->thread_mq
);
2004 pa_sink_unref(u
->sink
);
2007 pa_source_unref(u
->source
);
2011 pa_rtpoll_free(u
->rtpoll
);
2014 pa_pstream_unlink(u
->pstream
);
2015 pa_pstream_unref(u
->pstream
);
2019 pa_pdispatch_unref(u
->pdispatch
);
2022 pa_socket_client_unref(u
->client
);
2025 pa_auth_cookie_unref(u
->auth_cookie
);
2028 pa_smoother_free(u
->smoother
);
2031 u
->core
->mainloop
->time_free(u
->time_event
);
2034 pa_xfree(u
->sink_name
);
2036 pa_xfree(u
->source_name
);
2038 pa_xfree(u
->server_name
);
2040 pa_xfree(u
->device_description
);
2041 pa_xfree(u
->server_fqdn
);
2042 pa_xfree(u
->user_name
);