2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
61 #include "module-tunnel-sink-symdef.h"
63 #include "module-tunnel-source-symdef.h"
/* Module metadata handed to the PulseAudio module macros, followed by tuning
 * constants and module-private message codes. Both a sink and a source
 * variant of the description/usage text appear here.
 * NOTE(review): the preprocessor conditionals that presumably select one
 * variant per build are not visible in this extract -- confirm. */
67 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
69 "sink_name=<name for the local sink> "
70 "sink_properties=<properties for the local sink> "
72 "sink=<remote sink name> "
74 "format=<sample format> "
75 "channels=<number of channels> "
77 "channel_map=<channel map>");
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
81 "source_name=<name for the local source> "
82 "source_properties=<properties for the local source> "
84 "source=<remote source name> "
86 "format=<sample format> "
87 "channels=<number of channels> "
89 "channel_map=<channel map>");
92 PA_MODULE_AUTHOR("Lennart Poettering");
93 PA_MODULE_VERSION(PACKAGE_VERSION
);
94 PA_MODULE_LOAD_ONCE(FALSE
);
/* Table of accepted module argument names (entries not visible here). */
96 static const char* const valid_modargs
[] = {
/* Timeout passed to every pa_pdispatch_register_reply() call in this file. */
115 #define DEFAULT_TIMEOUT 5
/* Offset added to the current time when timeout_callback() re-arms itself. */
117 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
119 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
/* Sink-private message codes, numbered starting at PA_SINK_MESSAGE_MAX. */
124 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
125 SINK_MESSAGE_REMOTE_SUSPEND
,
126 SINK_MESSAGE_UPDATE_LATENCY
,
130 #define DEFAULT_TLENGTH_MSEC 150
131 #define DEFAULT_MINREQ_MSEC 25
/* Source-private message codes, numbered starting at PA_SOURCE_MESSAGE_MAX. */
136 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
137 SOURCE_MESSAGE_REMOTE_SUSPEND
,
138 SOURCE_MESSAGE_UPDATE_LATENCY
141 #define DEFAULT_FRAGSIZE_MSEC 25
/* Forward declarations of the pdispatch command handlers. They are needed
 * because command_table below refers to them before their definitions.
 * All share one callback signature: (pd, command, tag, t, userdata). */
146 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
154 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
155 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
/* Dispatch table indexed by protocol command code and sized PA_COMMAND_MAX.
 * Only the commands listed here get a handler. Several commands share one:
 * overflow/underflow, playback/record killed, suspended, moved,
 * stream/client events and buffer-attr changes. */
157 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
159 [PA_COMMAND_REQUEST
] = command_request
,
160 [PA_COMMAND_STARTED
] = command_started
,
162 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
163 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
164 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
165 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
166 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
167 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
168 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
169 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
170 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
171 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
173 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
174 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
175 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
/* Members of the per-module state that the functions below reach through
 * "struct userdata *u". NOTE(review): the struct header and a number of
 * members used below (sink, source, pstream, channel, ctag, ...) are not
 * visible in this extract. */
182 pa_thread_mq thread_mq
;
186 pa_socket_client
*client
;
188 pa_pdispatch
*pdispatch
;
/* Bytes requested by the remote side and not yet rendered by send_data(). */
194 size_t requested_bytes
;
201 pa_auth_cookie
*auth_cookie
;
/* Compared against the index in sink-input info replies. */
205 uint32_t device_index
;
/* counter: bytes rendered (sink) or posted (source) so far.
 * counter_delta: zeroed by request_latency(), advanced on SINK_MESSAGE_POST. */
208 int64_t counter
, counter_delta
;
210 pa_bool_t remote_corked
:1;
211 pa_bool_t remote_suspended
:1;
213 pa_usec_t transport_usec
; /* maintained in the main thread */
214 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
/* Tag of the most recent latency request; replies with a smaller tag take a
 * separate branch in stream_get_latency_callback(). */
216 uint32_t ignore_latency_before
;
218 pa_time_event
*time_event
;
220 pa_smoother
*smoother
;
/* Heap copy of the remote device description (pa_xstrdup/pa_xfree). */
222 char *device_description
;
/* Forward declaration; the definition appears further down in this file. */
236 static void request_latency(struct userdata
*u
);
238 /* Called from main context */
/* Handler for PA_COMMAND_{PLAYBACK,RECORD}_STREAM_EVENT and
 * PA_COMMAND_CLIENT_EVENT (see command_table). The visible body only logs
 * at debug level and does not read the payload in t. */
239 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
240 pa_log_debug("Got stream or client event.");
243 /* Called from main context */
/* Handler for PA_COMMAND_{PLAYBACK,RECORD}_STREAM_KILLED. Logs a warning
 * and requests that this module be unloaded.
 * userdata is the module's struct userdata. */
244 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
245 struct userdata
*u
= userdata
;
/* The callback must come from the dispatcher this module owns. */
250 pa_assert(u
->pdispatch
== pd
);
252 pa_log_warn("Stream killed");
253 pa_module_unload_request(u
->module
, TRUE
);
256 /* Called from main context */
/* Handler for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW. The visible
 * body only logs the condition at info level. */
257 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
258 struct userdata
*u
= userdata
;
263 pa_assert(u
->pdispatch
== pd
);
265 pa_log_info("Server signalled buffer overrun/underrun.");
269 /* Called from main context */
/* Handler for PA_COMMAND_{PLAYBACK,RECORD}_STREAM_SUSPENDED. Parses a u32
 * channel and a boolean suspended flag and requires the packet to end
 * there. A malformed packet is logged and triggers a module unload
 * request. Otherwise the flag is forwarded to the device's message queue
 * as a *_MESSAGE_REMOTE_SUSPEND message.
 * NOTE(review): both a sink and a source send are visible; presumably only
 * one is compiled per build -- the selecting conditional is not shown. */
270 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
271 struct userdata
*u
= userdata
;
278 pa_assert(u
->pdispatch
== pd
);
280 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
281 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
282 !pa_tagstruct_eof(t
)) {
284 pa_log("Invalid packet.");
285 pa_module_unload_request(u
->module
, TRUE
);
289 pa_log_debug("Server reports device suspend.");
/* The flag is normalised to 0/1 and carried in the message's data pointer. */
292 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
294 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
300 /* Called from main context */
/* Handler for PA_COMMAND_{PLAYBACK,RECORD}_STREAM_MOVED. Parses channel,
 * device index (di), device name (dn) and a suspended flag. A malformed
 * packet is logged and triggers a module unload request. Otherwise the
 * suspended flag is forwarded as a *_MESSAGE_REMOTE_SUSPEND message.
 * Unlike command_suspended(), the visible parse has no end-of-packet
 * check.
 * NOTE(review): both a sink and a source send are visible; presumably only
 * one is compiled per build -- the selecting conditional is not shown. */
301 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
302 struct userdata
*u
= userdata
;
303 uint32_t channel
, di
;
310 pa_assert(u
->pdispatch
== pd
);
312 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
313 pa_tagstruct_getu32(t
, &di
) < 0 ||
314 pa_tagstruct_gets(t
, &dn
) < 0 ||
315 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
317 pa_log_error("Invalid packet.");
318 pa_module_unload_request(u
->module
, TRUE
);
322 pa_log_debug("Server reports a stream move.");
325 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
327 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
/* Handler for PA_COMMAND_{PLAYBACK,RECORD}_BUFFER_ATTR_CHANGED. Always
 * reads channel and maxlength. For the record variant it then reads
 * fragsize and usec. The other visible parse reads tlength, prebuf, minreq
 * and usec (presumably the playback branch; the else is not visible).
 * Any parse failure is logged and triggers a module unload request.
 * On success the new tlength is logged next to u->tlength. */
333 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
334 struct userdata
*u
= userdata
;
/* tlength is pre-set to 0; the record branch never assigns it. */
335 uint32_t channel
, maxlength
, tlength
= 0, fragsize
, prebuf
, minreq
;
341 pa_assert(u
->pdispatch
== pd
);
343 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
344 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
346 pa_log_error("Invalid packet.");
347 pa_module_unload_request(u
->module
, TRUE
);
351 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
352 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
353 pa_tagstruct_get_usec(t
, &usec
) < 0) {
355 pa_log_error("Invalid packet.");
356 pa_module_unload_request(u
->module
, TRUE
);
360 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
361 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
362 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
363 pa_tagstruct_get_usec(t
, &usec
) < 0) {
365 pa_log_error("Invalid packet.");
366 pa_module_unload_request(u
->module
, TRUE
);
372 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
380 /* Called from main context */
/* Handler for PA_COMMAND_STARTED. The visible body only logs at debug
 * level that the server reported playback start. */
381 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
382 struct userdata
*u
= userdata
;
387 pa_assert(u
->pdispatch
== pd
);
389 pa_log_debug("Server reports playback started.");
395 /* Called from IO thread context */
/* Brings the latency smoother in line with the remote stream state. It is
 * paused while the remote side is suspended or corked; the resume call is
 * presumably the else branch. The timestamp x starts as the current
 * rtclock time and is adjusted by thread_transport_usec.
 * NOTE(review): both a "-=" and a "+=" adjustment are visible; the
 * conditions choosing between them (and how `past` is used) are not
 * shown -- confirm against the full file. */
396 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
401 x
= pa_rtclock_now();
403 /* Correct by the time the issued request needs to travel to the
404 * other side. This is a valid thread-safe access, because the
405 * main thread is waiting for us */
408 x
-= u
->thread_transport_usec
;
410 x
+= u
->thread_transport_usec
;
412 if (u
->remote_suspended
|| u
->remote_corked
)
413 pa_smoother_pause(u
->smoother
, x
);
415 pa_smoother_resume(u
->smoother
, x
, TRUE
);
418 /* Called from IO thread context */
/* Records the new remote cork state in u->remote_corked and re-evaluates
 * the smoother with past == FALSE. The equality test presumably returns
 * early when the state is unchanged (the return is not visible). */
419 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
422 if (u
->remote_corked
== cork
)
425 u
->remote_corked
= cork
;
426 check_smoother_status(u
, FALSE
);
429 /* Called from main context */
/* Sends a cork/uncork command for this module's stream to the remote
 * server. The packet holds the command code, a fresh tag (u->ctag++), the
 * channel and the cork flag normalised to 0/1. No reply handler is
 * registered in the visible code.
 * NOTE(review): both the playback and the record command code are put in
 * the visible text; presumably only one is compiled per build. */
430 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
437 t
= pa_tagstruct_new(NULL
, 0);
439 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
441 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
443 pa_tagstruct_putu32(t
, u
->ctag
++);
444 pa_tagstruct_putu32(t
, u
->channel
);
445 pa_tagstruct_put_boolean(t
, !!cork
);
446 pa_pstream_send_tagstruct(u
->pstream
, t
);
451 /* Called from IO thread context */
/* Records the new remote suspend state in u->remote_suspended and
 * re-evaluates the smoother with past == TRUE. The equality test
 * presumably returns early when the state is unchanged (the return is not
 * visible). */
452 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
455 if (u
->remote_suspended
== suspend
)
458 u
->remote_suspended
= suspend
;
459 check_smoother_status(u
, TRUE
);
464 /* Called from IO thread context */
/* Renders audio from the local sink until the outstanding remote request
 * (u->requested_bytes) is used up. Each rendered chunk is posted to the
 * main thread as SINK_MESSAGE_POST on thread_mq.outq, and sink_process_msg
 * sends it over the pstream from there. */
465 static void send_data(struct userdata
*u
) {
468 while (u
->requested_bytes
> 0) {
469 pa_memchunk memchunk
;
471 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
472 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
/* Drop our reference to the block once the chunk has been posted. */
473 pa_memblock_unref(memchunk
.memblock
);
475 u
->requested_bytes
-= memchunk
.length
;
/* Running total of rendered bytes; the latency code converts it to time. */
477 u
->counter
+= (int64_t) memchunk
.length
;
481 /* This function is called from IO context -- except when it is not. */
/* Message handler of the tunnel sink. Cases visible in this extract:
 *  - PA_SINK_MESSAGE_SET_STATE: runs the default handler first, then
 *    updates the cork state from the sink state.
 *  - PA_SINK_MESSAGE_GET_LATENCY: stores max(0, local - remote) in *data,
 *    where local is the time for u->counter bytes and remote is the
 *    smoother's current estimate.
 *  - SINK_MESSAGE_REQUEST: adds `offset` bytes to u->requested_bytes.
 *  - SINK_MESSAGE_REMOTE_SUSPEND: applies the suspend flag carried in data.
 *  - SINK_MESSAGE_UPDATE_LATENCY: feeds a new sample into the smoother.
 *  - SINK_MESSAGE_POST: sends the chunk over the pstream (main context).
 * Anything else falls through to pa_sink_process_msg().
 * NOTE(review): the switch header, per-case returns and some statements
 * are not visible here. */
482 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
483 struct userdata
*u
= PA_SINK(o
)->userdata
;
487 case PA_SINK_MESSAGE_SET_STATE
: {
490 /* First, change the state, because otherwise pa_sink_render() would fail */
491 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
493 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
495 if (PA_SINK_IS_OPENED(u
->sink
->state
))
502 case PA_SINK_MESSAGE_GET_LATENCY
: {
503 pa_usec_t yl
, yr
, *usec
= data
;
/* yl: time for everything rendered locally so far. */
505 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
/* yr: the smoother's estimate at the current time. */
506 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
/* Clamp at zero so the unsigned subtraction cannot wrap. */
508 *usec
= yl
> yr
? yl
- yr
: 0;
512 case SINK_MESSAGE_REQUEST
:
/* offset carries the byte count posted by command_request(). */
514 pa_assert(offset
> 0);
515 u
->requested_bytes
+= (size_t) offset
;
517 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
523 case SINK_MESSAGE_REMOTE_SUSPEND
:
525 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
529 case SINK_MESSAGE_UPDATE_LATENCY
: {
532 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
/* offset is the delay computed in stream_get_latency_callback(); it is
 * only subtracted when that cannot take y below zero. */
534 if (y
> (pa_usec_t
) offset
)
535 y
-= (pa_usec_t
) offset
;
539 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
541 /* We can access this freely here, since the main thread is waiting for us */
542 u
->thread_transport_usec
= u
->transport_usec
;
547 case SINK_MESSAGE_POST
:
549 /* OK, This might be a bit confusing. This message is
550 * delivered to us from the main context -- NOT from the
551 * IO thread context where the rest of the messages are
552 * dispatched. Yeah, ugly, but I am a lazy bastard. */
554 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
/* Bytes sent since request_latency() last zeroed this counter. */
556 u
->counter_delta
+= (int64_t) chunk
->length
;
561 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
564 /* Called from main context */
/* State-change hook of the tunnel sink. Going to SUSPENDED corks the
 * remote stream; going to RUNNING from SUSPENDED uncorks it. No action is
 * visible for the UNLINKED and INVALID_STATE cases.
 * NOTE(review): the assignment of u, further case labels and the return
 * value are not visible in this extract. */
565 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
567 pa_sink_assert_ref(s
);
570 switch ((pa_sink_state_t
) state
) {
572 case PA_SINK_SUSPENDED
:
/* Suspending is only expected from an opened state. */
573 pa_assert(PA_SINK_IS_OPENED(s
->state
));
574 stream_cork(u
, TRUE
);
578 case PA_SINK_RUNNING
:
579 if (s
->state
== PA_SINK_SUSPENDED
)
580 stream_cork(u
, FALSE
);
583 case PA_SINK_UNLINKED
:
585 case PA_SINK_INVALID_STATE
:
594 /* This function is called from IO context -- except when it is not. */
/* Message handler of the tunnel source. Cases visible in this extract:
 *  - PA_SOURCE_MESSAGE_SET_STATE: runs the default handler, then updates
 *    the cork state from the source state.
 *  - PA_SOURCE_MESSAGE_GET_LATENCY: stores max(0, remote - local) in
 *    *data, the mirror image of the sink computation.
 *  - SOURCE_MESSAGE_POST: re-aligns the incoming chunk through u->mcalign
 *    and posts each aligned piece to the source while it is opened.
 *  - SOURCE_MESSAGE_REMOTE_SUSPEND: applies the suspend flag in data.
 *  - SOURCE_MESSAGE_UPDATE_LATENCY: feeds a new sample into the smoother.
 * Anything else falls through to pa_source_process_msg().
 * NOTE(review): the switch header, per-case returns and some statements
 * are not visible here. */
595 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
596 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
600 case PA_SOURCE_MESSAGE_SET_STATE
: {
603 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
604 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
609 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
610 pa_usec_t yr
, yl
, *usec
= data
;
/* yl: time for everything posted locally so far. */
612 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
/* yr: the smoother's estimate at the current time. */
613 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
/* Clamp at zero so the unsigned subtraction cannot wrap. */
615 *usec
= yr
> yl
? yr
- yl
: 0;
619 case SOURCE_MESSAGE_POST
: {
622 pa_mcalign_push(u
->mcalign
, chunk
);
624 while (pa_mcalign_pop(u
->mcalign
, &c
) >= 0) {
626 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
627 pa_source_post(u
->source
, &c
);
629 pa_memblock_unref(c
.memblock
);
/* The counter is advanced even when the source is not opened. */
631 u
->counter
+= (int64_t) c
.length
;
637 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
639 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
642 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
645 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
/* offset is the delay computed in stream_get_latency_callback(). */
646 y
+= (pa_usec_t
) offset
;
648 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
650 /* We can access this freely here, since the main thread is waiting for us */
651 u
->thread_transport_usec
= u
->transport_usec
;
657 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
660 /* Called from main context */
/* State-change hook of the tunnel source. Going to SUSPENDED corks the
 * remote stream; going to RUNNING from SUSPENDED uncorks it. No action is
 * visible for the UNLINKED and INVALID_STATE cases.
 * The switch is over pa_source_state_t, so the invalid-state label uses
 * the source enumerator PA_SOURCE_INVALID_STATE. It previously used the
 * sink enumerator PA_SINK_INVALID_STATE, which mixes two enum types.
 * NOTE(review): the assignment of u, further case labels and the return
 * value are not visible in this extract. */
661 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
663 pa_source_assert_ref(s
);
666 switch ((pa_source_state_t
) state
) {
668 case PA_SOURCE_SUSPENDED
:
/* Suspending is only expected from an opened state. */
669 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
670 stream_cork(u
, TRUE
);
674 case PA_SOURCE_RUNNING
:
675 if (s
->state
== PA_SOURCE_SUSPENDED
)
676 stream_cork(u
, FALSE
);
679 case PA_SOURCE_UNLINKED
:
681 case PA_SOURCE_INVALID_STATE
:
/* Entry point of the module's IO thread. Installs the thread message
 * queue, then (presumably in a loop whose header is not visible) handles
 * pending sink rewinds and runs the rtpoll. On an abnormal exit it asks
 * the core to unload the module and keeps draining messages until
 * PA_MESSAGE_SHUTDOWN arrives.
 * userdata is the module's struct userdata. */
690 static void thread_func(void *userdata
) {
691 struct userdata
*u
= userdata
;
695 pa_log_debug("Thread starting up");
697 pa_thread_mq_install(&u
->thread_mq
);
703 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
704 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
/* The rewind is processed with a length of 0 bytes. */
705 pa_sink_process_rewind(u
->sink
, 0);
708 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
716 /* If this was no regular exit from the loop we have to continue
717 * processing messages until we received PA_MESSAGE_SHUTDOWN */
718 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
719 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
722 pa_log_debug("Thread shutting down");
726 /* Called from main context */
/* Handler for PA_COMMAND_REQUEST: the remote server asks for more data.
 * Parses channel and byte count, checks the channel against u->channel,
 * and posts the byte count to the sink as SINK_MESSAGE_REQUEST, carried in
 * the message's offset argument. The trailing unload request is presumably
 * the failure path (its label is not visible).
 * NOTE(review): `bytes` comes from the peer unvalidated, and
 * sink_process_msg() asserts offset > 0 for SINK_MESSAGE_REQUEST, so a
 * request for 0 bytes would trip that assertion -- confirm whether it
 * should be rejected here. */
727 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
728 struct userdata
*u
= userdata
;
729 uint32_t bytes
, channel
;
732 pa_assert(command
== PA_COMMAND_REQUEST
);
735 pa_assert(u
->pdispatch
== pd
);
737 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
738 pa_tagstruct_getu32(t
, &bytes
) < 0) {
739 pa_log("Invalid protocol reply");
743 if (channel
!= u
->channel
) {
744 pa_log("Received data for invalid channel");
748 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
752 pa_module_unload_request(u
->module
, TRUE
);
757 /* Called from main context */
/* Reply handler for the latency request sent by request_latency().
 * Parses sink_usec, source_usec, playing, the local and remote timestamps
 * and the write/read indexes (plus underrun_for and playing_for when the
 * protocol version is >= 13), and requires the packet to end there.
 * It then estimates the transport time, builds a delay from the device
 * delay, the server-side buffer fill, the transport time and the bytes
 * moved since the request, and sends it to the device as
 * *_MESSAGE_UPDATE_LATENCY (delay is carried in the offset argument).
 * Replies whose tag is below u->ignore_latency_before take a separate
 * branch whose body is not visible. The trailing unload request is
 * presumably the failure path (its label is not visible).
 * Fix: the clocks only count as synchronised when local < remote < now.
 * The second comparison used to lack "< 0", so it was true for any
 * remote != now, including a remote timestamp in the future.
 * NOTE(review): sink and source variants of several statements are both
 * visible; presumably only one of each is compiled per build. */
758 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
759 struct userdata
*u
= userdata
;
760 pa_usec_t sink_usec
, source_usec
;
762 int64_t write_index
, read_index
;
763 struct timeval local
, remote
, now
;
770 if (command
!= PA_COMMAND_REPLY
) {
771 if (command
== PA_COMMAND_ERROR
)
772 pa_log("Failed to get latency.");
774 pa_log("Protocol error.");
778 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
779 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
780 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
781 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
782 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
783 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
784 pa_tagstruct_gets64(t
, &read_index
) < 0) {
785 pa_log("Invalid reply.");
790 if (u
->version
>= 13) {
/* Read only to consume the fields; no visible code uses the values. */
791 uint64_t underrun_for
= 0, playing_for
= 0;
793 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
794 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
795 pa_log("Invalid reply.");
801 if (!pa_tagstruct_eof(t
)) {
802 pa_log("Invalid reply.");
806 if (tag
< u
->ignore_latency_before
) {
810 pa_gettimeofday(&now
);
812 /* Calculate transport usec */
813 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
) < 0) {
814 /* local and remote seem to have synchronized clocks */
816 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
818 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
/* Presumably the unsynchronised fallback: half the round-trip time. */
821 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
823 /* First, take the device's delay */
825 delay
= (int64_t) sink_usec
;
826 ss
= &u
->sink
->sample_spec
;
828 delay
= (int64_t) source_usec
;
829 ss
= &u
->source
->sample_spec
;
832 /* Add the length of our server-side buffer */
833 if (write_index
>= read_index
)
834 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
836 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
838 /* Our measurements are already out of date, hence correct by the
839 * transport latency */
841 delay
-= (int64_t) u
->transport_usec
;
843 delay
+= (int64_t) u
->transport_usec
;
846 /* Now correct by what we have read/written since we requested the update */
848 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
850 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
854 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
856 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
863 pa_module_unload_request(u
->module
, TRUE
);
866 /* Called from main context */
/* Sends a latency query for this module's stream and registers
 * stream_get_latency_callback() for the reply, with DEFAULT_TIMEOUT as the
 * timeout. The packet holds the command code, a fresh tag, the channel and
 * the current wall-clock time. Afterwards the tag is stored in
 * u->ignore_latency_before and u->counter_delta is reset to 0.
 * NOTE(review): both the playback and the record command code are put in
 * the visible text; presumably only one is compiled per build. */
867 static void request_latency(struct userdata
*u
) {
873 t
= pa_tagstruct_new(NULL
, 0);
875 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
877 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
879 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
880 pa_tagstruct_putu32(t
, u
->channel
);
/* The reply handler reads this timestamp back as `local`. */
882 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
884 pa_pstream_send_tagstruct(u
->pstream
, t
);
885 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
887 u
->ignore_latency_before
= tag
;
888 u
->counter_delta
= 0;
891 /* Called from main context */
/* Timer callback. The visible body re-arms the time event e to fire
 * LATENCY_INTERVAL after the current rtclock time.
 * NOTE(review): any work done before re-arming is not visible here.
 * userdata is the module's struct userdata. */
892 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
893 struct userdata
*u
= userdata
;
901 pa_core_rttime_restart(u
->core
, e
, pa_rtclock_now() + LATENCY_INTERVAL
);
904 /* Called from main context */
/* Refreshes the human-readable descriptions once server_fqdn, user_name
 * and device_description are all known. It sets the local device
 * description to "<device> on <user>@<host>" and stores the three remote
 * values as tunnel.remote.* properties. It then renames the remote stream
 * to "<device> for <local user>@<local host>" with a set-stream-name
 * command.
 * NOTE(review): sink and source variants of the property and command
 * lines are both visible; presumably only one set is compiled per build.
 * The early return after the NULL check and the freeing of d are not
 * visible. */
905 static void update_description(struct userdata
*u
) {
/* Scratch buffers for the local user and host name. */
907 char un
[128], hn
[128];
912 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
915 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
918 pa_sink_set_description(u
->sink
, d
);
919 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
920 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
921 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
923 pa_source_set_description(u
->source
, d
);
924 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
925 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
926 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
931 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
932 pa_get_user_name(un
, sizeof(un
)),
933 pa_get_host_name(hn
, sizeof(hn
)));
935 t
= pa_tagstruct_new(NULL
, 0);
937 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
939 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
941 pa_tagstruct_putu32(t
, u
->ctag
++);
942 pa_tagstruct_putu32(t
, u
->channel
);
943 pa_tagstruct_puts(t
, d
);
944 pa_pstream_send_tagstruct(u
->pstream
, t
);
949 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SERVER_INFO (registered in
 * request_info()). Parses the server name and version, user name, host
 * name, sample spec, default sink and source names and a cookie, plus a
 * channel map when the protocol version is >= 15, and requires the packet
 * to end there. On success it replaces u->server_fqdn and u->user_name
 * with copies of the host and user name and refreshes the descriptions.
 * The trailing unload request is presumably the failure path (its label
 * is not visible). */
950 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
951 struct userdata
*u
= userdata
;
954 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
960 if (command
!= PA_COMMAND_REPLY
) {
961 if (command
== PA_COMMAND_ERROR
)
962 pa_log("Failed to get info.");
964 pa_log("Protocol error.");
968 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
969 pa_tagstruct_gets(t
, &server_version
) < 0 ||
970 pa_tagstruct_gets(t
, &user_name
) < 0 ||
971 pa_tagstruct_gets(t
, &host_name
) < 0 ||
972 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
973 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
974 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
975 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
976 (u
->version
>= 15 && pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
978 pa_log("Parse failure");
982 if (!pa_tagstruct_eof(t
)) {
983 pa_log("Packet too long");
/* Free the previous copy before storing the new one. */
987 pa_xfree(u
->server_fqdn
);
988 u
->server_fqdn
= pa_xstrdup(host_name
);
990 pa_xfree(u
->user_name
);
991 u
->user_name
= pa_xstrdup(user_name
);
993 update_description(u
);
998 pa_module_unload_request(u
->module
, TRUE
);
1003 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SINK_INFO (registered in
 * request_info()). It walks the whole reply, reading the extra fields
 * added at protocol versions 13, 15, 16 and 21, and requires the packet
 * to end there. Only name and description are used afterwards: the name
 * is compared with u->sink_name, and u->device_description is replaced
 * with a copy of description before the descriptions are refreshed.
 * The proplist pl is freed on the success path and again after the
 * trailing unload request, which is presumably the failure path (its
 * label is not visible). */
1004 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1005 struct userdata
*u
= userdata
;
1006 uint32_t idx
, owner_module
, monitor_source
, flags
;
1007 const char *name
, *description
, *monitor_source_name
, *driver
;
1018 pl
= pa_proplist_new();
1020 if (command
!= PA_COMMAND_REPLY
) {
1021 if (command
== PA_COMMAND_ERROR
)
1022 pa_log("Failed to get info.");
1024 pa_log("Protocol error.");
1028 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1029 pa_tagstruct_gets(t
, &name
) < 0 ||
1030 pa_tagstruct_gets(t
, &description
) < 0 ||
1031 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1032 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1033 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1034 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1035 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1036 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1037 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1038 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1039 pa_tagstruct_gets(t
, &driver
) < 0 ||
1040 pa_tagstruct_getu32(t
, &flags
) < 0) {
1042 pa_log("Parse failure");
1046 if (u
->version
>= 13) {
1047 pa_usec_t configured_latency
;
1049 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1050 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1052 pa_log("Parse failure");
1057 if (u
->version
>= 15) {
1058 pa_volume_t base_volume
;
1059 uint32_t state
, n_volume_steps
, card
;
1061 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1062 pa_tagstruct_getu32(t
, &state
) < 0 ||
1063 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1064 pa_tagstruct_getu32(t
, &card
) < 0) {
1066 pa_log("Parse failure");
1071 if (u
->version
>= 16) {
/* NOTE(review): unlike the neighbouring checks, this tests for non-zero
 * rather than "< 0". */
1075 if (pa_tagstruct_getu32(t
, &n_ports
)) {
1076 pa_log("Parse failure");
/* Each port entry is read and discarded. */
1080 for (uint32_t j
= 0; j
< n_ports
; j
++) {
1083 if (pa_tagstruct_gets(t
, &s
) < 0 || /* name */
1084 pa_tagstruct_gets(t
, &s
) < 0 || /* description */
1085 pa_tagstruct_getu32(t
, &priority
) < 0) {
1087 pa_log("Parse failure");
1092 if (pa_tagstruct_gets(t
, &s
) < 0) { /* active port */
1093 pa_log("Parse failure");
1098 if (u
->version
>= 21) {
/* NOTE(review): format is an automatic object passed by address with no
 * visible initialisation; confirm pa_tagstruct_get_format_info() does not
 * expect a pre-allocated pa_format_info. */
1100 pa_format_info format
;
1102 if (pa_tagstruct_getu8(t
, &n_formats
) < 0) { /* no. of formats */
1103 pa_log("Parse failure");
1107 for (uint8_t j
= 0; j
< n_formats
; j
++) {
1108 if (pa_tagstruct_get_format_info(t
, &format
)) { /* format info */
1109 pa_log("Parse failure");
1115 if (!pa_tagstruct_eof(t
)) {
1116 pa_log("Packet too long");
1120 pa_proplist_free(pl
);
/* Presumably skips replies about a different sink (return not visible). */
1122 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
1125 pa_xfree(u
->device_description
);
1126 u
->device_description
= pa_xstrdup(description
);
1128 update_description(u
);
1133 pa_module_unload_request(u
->module
, TRUE
);
1134 pa_proplist_free(pl
);
1137 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SINK_INPUT_INFO (registered in
 * request_info()). It walks the whole reply, reading the extra fields
 * added at protocol versions 11, 13, 19, 20 and 21, and requires the
 * packet to end there. The index is compared with u->device_index. If the
 * reported volume or (version >= 11) mute state differs from the local
 * sink's, the local sink is updated with pa_sink_volume_changed() and
 * pa_sink_mute_changed().
 * The proplist pl is freed on the success path and again after the
 * trailing unload request, which is presumably the failure path (its
 * label is not visible). */
1138 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1139 struct userdata
*u
= userdata
;
1140 uint32_t idx
, owner_module
, client
, sink
;
1141 pa_usec_t buffer_usec
, sink_usec
;
1142 const char *name
, *driver
, *resample_method
;
/* Defaults to FALSE; only read from the reply when version >= 11. */
1143 pa_bool_t mute
= FALSE
;
1144 pa_sample_spec sample_spec
;
1145 pa_channel_map channel_map
;
1153 pl
= pa_proplist_new();
1155 if (command
!= PA_COMMAND_REPLY
) {
1156 if (command
== PA_COMMAND_ERROR
)
1157 pa_log("Failed to get info.");
1159 pa_log("Protocol error.");
1163 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1164 pa_tagstruct_gets(t
, &name
) < 0 ||
1165 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1166 pa_tagstruct_getu32(t
, &client
) < 0 ||
1167 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1168 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1169 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1170 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1171 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1172 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1173 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1174 pa_tagstruct_gets(t
, &driver
) < 0) {
1176 pa_log("Parse failure");
1180 if (u
->version
>= 11) {
1181 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1183 pa_log("Parse failure");
1188 if (u
->version
>= 13) {
1189 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1191 pa_log("Parse failure");
/* The booleans read for versions 19 and 20 all land in the same scratch
 * variable b. */
1196 if (u
->version
>= 19) {
1197 if (pa_tagstruct_get_boolean(t
, &b
) < 0) {
1199 pa_log("Parse failure");
1204 if (u
->version
>= 20) {
1205 if (pa_tagstruct_get_boolean(t
, &b
) < 0 ||
1206 pa_tagstruct_get_boolean(t
, &b
) < 0) {
1208 pa_log("Parse failure");
1213 if (u
->version
>= 21) {
/* NOTE(review): format is an automatic object passed by address with no
 * visible initialisation; confirm pa_tagstruct_get_format_info() does not
 * expect a pre-allocated pa_format_info. */
1214 pa_format_info format
;
1216 if (pa_tagstruct_get_format_info(t
, &format
) < 0) {
1218 pa_log("Parse failure");
1223 if (!pa_tagstruct_eof(t
)) {
1224 pa_log("Packet too long");
1228 pa_proplist_free(pl
);
/* Presumably skips replies about another sink input (return not visible). */
1230 if (idx
!= u
->device_index
)
/* Presumably nothing to do when mute and volume already match. */
1235 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1236 pa_cvolume_equal(&volume
, &u
->sink
->real_volume
))
1239 pa_sink_volume_changed(u
->sink
, &volume
);
1241 if (u
->version
>= 11)
1242 pa_sink_mute_changed(u
->sink
, mute
);
1247 pa_module_unload_request(u
->module
, TRUE
);
1248 pa_proplist_free(pl
);
1253 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SOURCE_INFO (registered in
 * request_info()). It walks the whole reply, reading the extra fields
 * added at protocol versions 13, 15 and 16, and requires the packet to
 * end there. Only name and description are used afterwards: the name is
 * compared with u->source_name, and u->device_description is replaced
 * with a copy of description before the descriptions are refreshed.
 * Unlike sink_info_cb(), no version >= 21 format list is parsed in the
 * visible code.
 * The proplist pl is freed on the success path and again after the
 * trailing unload request, which is presumably the failure path (its
 * label is not visible). */
1254 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1255 struct userdata
*u
= userdata
;
1256 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1257 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1262 pa_usec_t latency
, configured_latency
;
1268 pl
= pa_proplist_new();
1270 if (command
!= PA_COMMAND_REPLY
) {
1271 if (command
== PA_COMMAND_ERROR
)
1272 pa_log("Failed to get info.");
1274 pa_log("Protocol error.");
1278 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1279 pa_tagstruct_gets(t
, &name
) < 0 ||
1280 pa_tagstruct_gets(t
, &description
) < 0 ||
1281 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1282 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1283 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1284 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1285 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1286 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1287 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1288 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1289 pa_tagstruct_gets(t
, &driver
) < 0 ||
1290 pa_tagstruct_getu32(t
, &flags
) < 0) {
1292 pa_log("Parse failure");
1296 if (u
->version
>= 13) {
1297 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1298 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1300 pa_log("Parse failure");
1305 if (u
->version
>= 15) {
1306 pa_volume_t base_volume
;
1307 uint32_t state
, n_volume_steps
, card
;
1309 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1310 pa_tagstruct_getu32(t
, &state
) < 0 ||
1311 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1312 pa_tagstruct_getu32(t
, &card
) < 0) {
1314 pa_log("Parse failure");
1319 if (u
->version
>= 16) {
/* NOTE(review): unlike the neighbouring checks, this tests for non-zero
 * rather than "< 0". */
1323 if (pa_tagstruct_getu32(t
, &n_ports
)) {
1324 pa_log("Parse failure");
/* Each port entry is read and discarded. */
1328 for (uint32_t j
= 0; j
< n_ports
; j
++) {
1331 if (pa_tagstruct_gets(t
, &s
) < 0 || /* name */
1332 pa_tagstruct_gets(t
, &s
) < 0 || /* description */
1333 pa_tagstruct_getu32(t
, &priority
) < 0) {
1335 pa_log("Parse failure");
1340 if (pa_tagstruct_gets(t
, &s
) < 0) { /* active port */
1341 pa_log("Parse failure");
1346 if (!pa_tagstruct_eof(t
)) {
1347 pa_log("Packet too long");
1351 pa_proplist_free(pl
);
/* Presumably skips replies about a different source (return not visible). */
1353 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1356 pa_xfree(u
->device_description
);
1357 u
->device_description
= pa_xstrdup(description
);
1359 update_description(u
);
1364 pa_module_unload_request(u
->module
, TRUE
);
1365 pa_proplist_free(pl
);
1370 /* Called from main context */
/* Sends the info queries whose replies feed the description and
 * volume/mute handling: server info, sink-input info (by
 * u->device_index), sink info (by u->sink_name) and, when u->source_name
 * is set, source info. Each query gets a fresh tag, and its reply handler
 * is registered with DEFAULT_TIMEOUT as the timeout.
 * NOTE(review): the sink and source queries are presumably compiled
 * conditionally per build; any guards around the sink queries are not
 * visible in this extract. */
1371 static void request_info(struct userdata
*u
) {
1376 t
= pa_tagstruct_new(NULL
, 0);
1377 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1378 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1379 pa_pstream_send_tagstruct(u
->pstream
, t
);
1380 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1383 t
= pa_tagstruct_new(NULL
, 0);
1384 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1385 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1386 pa_tagstruct_putu32(t
, u
->device_index
);
1387 pa_pstream_send_tagstruct(u
->pstream
, t
);
1388 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1391 t
= pa_tagstruct_new(NULL
, 0);
1392 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1393 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
/* The device is looked up by name, so the index is PA_INVALID_INDEX. */
1394 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1395 pa_tagstruct_puts(t
, u
->sink_name
);
1396 pa_pstream_send_tagstruct(u
->pstream
, t
);
1397 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1400 if (u
->source_name
) {
1401 t
= pa_tagstruct_new(NULL
, 0);
1402 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1403 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1404 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1405 pa_tagstruct_puts(t
, u
->source_name
);
1406 pa_pstream_send_tagstruct(u
->pstream
, t
);
1407 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1412 /* Called from main context */
/* Dispatcher handler for PA_COMMAND_SUBSCRIBE_EVENT packets.
 *
 * Reads the event type (e) and the object index (idx) from the tagstruct t.
 * If either read fails, the error is logged and a module unload is
 * requested. The event type is then compared against the CHANGE events of
 * the server, sink input, sink and source facilities.
 *
 * NOTE(review): the action taken after the comparison is not visible in
 * this excerpt, nor is the declaration of idx. */
1413 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1414 struct userdata
*u
= userdata
;
1415 pa_subscription_event_type_t e
;
1421 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
/* Malformed event packet: give up on this tunnel. */
1423 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1424 pa_tagstruct_getu32(t
, &idx
) < 0) {
1425 pa_log("Invalid protocol reply");
1426 pa_module_unload_request(u
->module
, TRUE
);
/* Exact match on facility|CHANGE; idx is not part of this test. */
1430 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1432 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1433 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1435 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1443 /* Called from main context */
/* Send a PA_COMMAND_SUBSCRIBE request on u->pstream.
 *
 * The request carries a fresh tag from u->ctag and a subscription mask
 * built from the SERVER, SINK_INPUT, SINK and SOURCE mask bits. No reply
 * handler is registered in the visible code.
 *
 * NOTE(review): the operators joining the last mask terms and the
 * declaration of t are not visible in this excerpt; the sink and source
 * terms are presumably selected by conditional compilation. */
1444 static void start_subscribe(struct userdata
*u
) {
1448 t
= pa_tagstruct_new(NULL
, 0);
1449 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1450 pa_tagstruct_putu32(t
, u
->ctag
++);
1451 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1453 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1455 PA_SUBSCRIPTION_MASK_SOURCE
1459 pa_pstream_send_tagstruct(u
->pstream
, t
);
1462 /* Called from main context */
/* Reply handler for the create-stream request.
 *
 * Parses the reply in t field by field. Which fields are read depends on
 * the negotiated protocol version in u->version (thresholds 9, 12, 13 and
 * 21 below). On success it arms the periodic timer u->time_event and logs
 * that the stream was created. The final statements log an invalid reply
 * and request a module unload.
 *
 * NOTE(review): the goto/return statements, the failure label and the
 * conditional compilation separating sink and source variants are not
 * visible in this excerpt. */
1463 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1464 struct userdata
*u
= userdata
;
1471 pa_assert(u
->pdispatch
== pd
);
/* Anything other than a REPLY is logged; an explicit ERROR gets its own
 * message. */
1473 if (command
!= PA_COMMAND_REPLY
) {
1474 if (command
== PA_COMMAND_ERROR
)
1475 pa_log("Failed to create stream.");
1477 pa_log("Protocol error.");
/* Mandatory fields: stream channel, remote stream index, and a byte count
 * (bytes) that is later posted to the sink thread. */
1481 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1482 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1484 || pa_tagstruct_getu32(t
, &bytes
) < 0
/* From version 9 the reply carries the buffer metrics: either
 * maxlength/tlength/prebuf/minreq or maxlength/fragsize. */
1489 if (u
->version
>= 9) {
1491 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1492 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1493 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1494 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1497 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1498 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
/* From version 12 the reply also names the remote device: sample spec,
 * channel map, device index, device name (dn) and suspended flag. */
1503 if (u
->version
>= 12) {
1506 uint32_t device_index
;
1508 pa_bool_t suspended
;
1510 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1511 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1512 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1513 pa_tagstruct_gets(t
, &dn
) < 0 ||
1514 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
/* Replace the stored remote device name with a copy of the reported one. */
1518 pa_xfree(u
->sink_name
);
1519 u
->sink_name
= pa_xstrdup(dn
);
1521 pa_xfree(u
->source_name
);
1522 u
->source_name
= pa_xstrdup(dn
);
/* From version 13 a usec value follows. It is parsed, but the only uses
 * visible here are commented out. */
1526 if (u
->version
>= 13) {
1529 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1532 /* #ifdef TUNNEL_SINK */
1533 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1535 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
/* From version 21 a format info follows; it is read into a local. */
1539 if (u
->version
>= 21) {
1540 pa_format_info format
;
1542 if (pa_tagstruct_get_format_info(t
, &format
) < 0)
/* The reply must be fully consumed at this point. */
1546 if (!pa_tagstruct_eof(t
))
/* Arm the timer: first expiry is LATENCY_INTERVAL from now and it runs
 * timeout_callback. */
1552 pa_assert(!u
->time_event
);
1553 u
->time_event
= pa_core_rttime_new(u
->core
, pa_rtclock_now() + LATENCY_INTERVAL
, timeout_callback
, u
);
1557 pa_log_debug("Stream created.");
/* Hand the byte count parsed above to the sink via its message queue. */
1560 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1566 pa_log("Invalid reply. (Create stream)");
1569 pa_module_unload_request(u
->module
, TRUE
);
1573 /* Called from main context */
/* Reply handler for the authentication request.
 *
 * t is the incoming reply. It must be a PA_COMMAND_REPLY holding exactly
 * the protocol version of the server, which is stored in u->version.
 * Versions below 8 are rejected. From version 13 on the top bit of the
 * version word is masked off.
 *
 * Two requests are then built in the outgoing tagstruct reply and sent on
 * u->pstream:
 *   1. PA_COMMAND_SET_CLIENT_NAME, whose answer is ignored.
 *   2. PA_COMMAND_CREATE_PLAYBACK_STREAM or PA_COMMAND_CREATE_RECORD_STREAM,
 *      whose answer is handled by create_stream_callback.
 * The optional trailing fields of the second request depend on u->version.
 * The last visible statement requests a module unload.
 *
 * NOTE(review): the goto/return statements, the failure label, the
 * declarations of tag, volume and pl, and the conditional compilation
 * separating the sink and source variants are not visible in this excerpt. */
1574 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1575 struct userdata
*u
= userdata
;
1576 pa_tagstruct
*reply
;
1577 char name
[256], un
[128], hn
[128];
1584 pa_assert(u
->pdispatch
== pd
);
/* The reply is consumed completely here: one u32 and then EOF. */
1586 if (command
!= PA_COMMAND_REPLY
||
1587 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1588 !pa_tagstruct_eof(t
)) {
1590 if (command
== PA_COMMAND_ERROR
)
1591 pa_log("Failed to authenticate");
1593 pa_log("Protocol error.");
1598 /* Minimum supported protocol version */
1599 if (u
->version
< 8) {
1600 pa_log("Incompatible protocol version");
1604 /* Starting with protocol version 13 the MSB of the version tag
1605 reflects if shm is enabled for this connection or not. We don't
1606 support SHM here at all, so we just ignore this. */
1608 if (u
->version
>= 13)
1609 u
->version
&= 0x7FFFFFFFU
;
1611 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Publish the remote version on the local device and build the stream
 * name used further below. */
1614 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1615 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1617 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1619 pa_get_user_name(un
, sizeof(un
)),
1620 pa_get_host_name(hn
, sizeof(hn
)));
1622 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1623 pa_source_update_proplist(u
->source
, 0, NULL
);
1625 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1627 pa_get_user_name(un
, sizeof(un
)),
1628 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: set the client name. From version 13 this is a property
 * list, otherwise a plain string. */
1631 reply
= pa_tagstruct_new(NULL
, 0);
1632 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1633 pa_tagstruct_putu32(reply
, u
->ctag
++);
1635 if (u
->version
>= 13) {
1637 pl
= pa_proplist_new();
1638 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1639 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1640 pa_init_proplist(pl
);
1641 pa_tagstruct_put_proplist(reply
, pl
);
1642 pa_proplist_free(pl
);
1644 pa_tagstruct_puts(reply
, "PulseAudio");
1646 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1647 /* We ignore the server's reply here */
/* Second request: create the stream. A new outgoing tagstruct is started. */
1649 reply
= pa_tagstruct_new(NULL
, 0);
1651 if (u
->version
< 13)
1652 /* Only for older PA versions we need to fill in the maxlength */
1653 u
->maxlength
= 4*1024*1024;
/* Requested buffer metrics, converted from milliseconds to bytes using the
 * sample spec of the local device. */
1656 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1657 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1658 u
->prebuf
= u
->tlength
;
1660 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback variant of the request. */
1664 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1665 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1667 if (u
->version
< 13)
1668 pa_tagstruct_puts(reply
, name
);
1670 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1671 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1672 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1673 pa_tagstruct_puts(reply
, u
->sink_name
);
1674 pa_tagstruct_putu32(reply
, u
->maxlength
);
1675 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1676 pa_tagstruct_putu32(reply
, u
->tlength
);
1677 pa_tagstruct_putu32(reply
, u
->prebuf
);
1678 pa_tagstruct_putu32(reply
, u
->minreq
);
1679 pa_tagstruct_putu32(reply
, 0);
1680 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1681 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record variant of the request. */
1683 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1684 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1686 if (u
->version
< 13)
1687 pa_tagstruct_puts(reply
, name
);
1689 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1690 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1691 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1692 pa_tagstruct_puts(reply
, u
->source_name
);
1693 pa_tagstruct_putu32(reply
, u
->maxlength
);
1694 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1695 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Version dependent trailing fields of the create-stream request. */
1698 if (u
->version
>= 12) {
1699 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1700 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1701 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1702 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1703 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1704 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1705 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1708 if (u
->version
>= 13) {
1711 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1712 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1714 pl
= pa_proplist_new();
1715 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1716 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1717 pa_tagstruct_put_proplist(reply
, pl
);
1718 pa_proplist_free(pl
);
1721 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1725 if (u
->version
>= 14) {
1727 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1729 pa_tagstruct_put_boolean(reply
, TRUE
); /* early requests */
1732 if (u
->version
>= 15) {
1734 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1736 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1737 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
1741 if (u
->version
>= 17)
1742 pa_tagstruct_put_boolean(reply
, FALSE
); /* relative volume */
1744 if (u
->version
>= 18)
1745 pa_tagstruct_put_boolean(reply
, FALSE
); /* passthrough stream */
1749 if (u
->version
>= 21) {
1750 /* We're not using the extended API, so n_formats = 0 and that's that */
/* The field belongs to the outgoing request, so it is written to reply.
 * t is the incoming auth reply that was already read to EOF above. */
1751 pa_tagstruct_putu8(reply
, 0);
1755 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1756 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1758 pa_log_debug("Connection authenticated, creating stream ...");
1763 pa_module_unload_request(u
->module
, TRUE
);
1766 /* Called from main context */
/* Die callback installed on the pstream by on_connection. Logs a warning
 * and requests that the module owning this userdata be unloaded. The
 * pstream argument p is not used in the visible code. */
1767 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1768 struct userdata
*u
= userdata
;
1773 pa_log_warn("Stream died.");
1774 pa_module_unload_request(u
->module
, TRUE
);
1777 /* Called from main context */
/* Packet callback installed on the pstream by on_connection. Passes the
 * received packet and credentials to u->pdispatch. If the dispatcher
 * returns a negative value, the packet is logged as invalid and a module
 * unload is requested. */
1778 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1779 struct userdata
*u
= userdata
;
1785 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1786 pa_log("Invalid packet");
1787 pa_module_unload_request(u
->module
, TRUE
);
1793 /* Called from main context */
/* Memblock callback installed on the pstream by on_connection.
 *
 * A block arriving on a channel other than u->channel is logged and leads
 * to a module unload request. Otherwise the chunk is sent to the source
 * through its message queue as SOURCE_MESSAGE_POST, with the seek mode
 * packed into the pointer argument and offset as the integer argument.
 * The chunk length is then added to u->counter_delta.
 *
 * NOTE(review): the statement ending the bad-channel branch is not visible
 * in this excerpt. */
1794 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1795 struct userdata
*u
= userdata
;
1801 if (channel
!= u
->channel
) {
1802 pa_log("Received memory block on bad channel.");
1803 pa_module_unload_request(u
->module
, TRUE
);
1807 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1809 u
->counter_delta
+= (int64_t) chunk
->length
;
1813 /* Called from main context */
/* Callback registered on the socket client by pa__init.
 *
 * Drops the reference on u->client, then sets up the protocol layer on the
 * new io channel: a pstream and a pdispatch, both on the core mainloop,
 * with the die, packet and memblock callbacks of this file. It finally
 * sends the PA_COMMAND_AUTH request and registers setup_complete_callback
 * for its reply.
 *
 * NOTE(review): the condition guarding the failure log (presumably a
 * missing io channel), the declarations of t, tag and ucred, and the
 * conditional compilation around the credential code are not visible in
 * this excerpt. */
1814 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1815 struct userdata
*u
= userdata
;
1821 pa_assert(u
->client
== sc
);
1823 pa_socket_client_unref(u
->client
);
1827 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1828 pa_module_unload_request(u
->module
, TRUE
);
1832 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1833 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, TRUE
, command_table
, PA_COMMAND_MAX
);
1835 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1836 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1838 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Auth request: command, tag, local protocol version, then the cookie as
 * an arbitrary blob of PA_NATIVE_COOKIE_LENGTH bytes. */
1841 t
= pa_tagstruct_new(NULL
, 0);
1842 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1843 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1844 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1846 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* Credential passing is enabled when the channel supports it, and the
 * request is sent together with the uid and gid of this process. */
1852 if (pa_iochannel_creds_supported(io
))
1853 pa_iochannel_creds_enable(io
);
1855 ucred
.uid
= getuid();
1856 ucred
.gid
= getgid();
1858 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
/* Plain send without credentials. */
1861 pa_pstream_send_tagstruct(u
->pstream
, t
);
1864 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1866 pa_log_debug("Connection established, authenticating ...");
1871 /* Called from main context */
/* set_volume callback installed on the local sink by pa__init. Sends a
 * PA_COMMAND_SET_SINK_INPUT_VOLUME request for the remote stream
 * u->device_index, carrying sink->real_volume. No reply handler is
 * registered in the visible code.
 *
 * NOTE(review): the declarations of u and t, and how u is obtained from
 * the sink, are not visible in this excerpt. */
1872 static void sink_set_volume(pa_sink
*sink
) {
1880 t
= pa_tagstruct_new(NULL
, 0);
1881 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1882 pa_tagstruct_putu32(t
, u
->ctag
++);
1883 pa_tagstruct_putu32(t
, u
->device_index
);
1884 pa_tagstruct_put_cvolume(t
, &sink
->real_volume
);
1885 pa_pstream_send_tagstruct(u
->pstream
, t
);
1888 /* Called from main context */
/* set_mute callback installed on the local sink by pa__init. Sends a
 * PA_COMMAND_SET_SINK_INPUT_MUTE request for the remote stream
 * u->device_index, carrying the mute flag of the sink normalised to a
 * boolean. No reply handler is registered in the visible code.
 *
 * NOTE(review): the statement guarded by the version check (presumably an
 * early return for protocol versions below 11) and the declarations of u
 * and t are not visible in this excerpt. */
1889 static void sink_set_mute(pa_sink
*sink
) {
1897 if (u
->version
< 11)
1900 t
= pa_tagstruct_new(NULL
, 0);
1901 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1902 pa_tagstruct_putu32(t
, u
->ctag
++);
1903 pa_tagstruct_putu32(t
, u
->device_index
);
1904 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1905 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Module entry point.
 *
 * Visible steps, in order:
 *   - parse the module arguments against valid_modargs;
 *   - allocate a zeroed userdata, store it in m->userdata and initialise
 *     its fields;
 *   - create the rtpoll and the thread message queue;
 *   - load the auth cookie and read the mandatory server argument;
 *   - determine sample spec and channel map, starting from the core
 *     defaults;
 *   - start the socket client for the server and register on_connection;
 *   - create and configure the local sink or source;
 *   - start the IO thread running thread_func and put the device.
 *
 * NOTE(review): the return statements, the failure path, several local
 * declarations and the conditional compilation separating the sink and
 * source variants are not visible in this excerpt. */
1910 int pa__init(pa_module
*m
) {
1911 pa_modargs
*ma
= NULL
;
1912 struct userdata
*u
= NULL
;
1917 pa_sink_new_data data
;
1919 pa_source_new_data data
;
1924 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1925 pa_log("Failed to parse module arguments");
1929 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1933 u
->pdispatch
= NULL
;
1935 u
->server_name
= NULL
;
/* Remote device name from the sink or source argument; the default passed
 * to the lookup is NULL. */
1937 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1939 u
->requested_bytes
= 0;
1941 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1944 u
->smoother
= pa_smoother_new(
1953 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1954 u
->time_event
= NULL
;
1955 u
->ignore_latency_before
= 0;
1956 u
->transport_usec
= u
->thread_transport_usec
= 0;
1957 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1958 u
->counter
= u
->counter_delta
= 0;
1960 u
->rtpoll
= pa_rtpoll_new();
1961 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* Cookie file comes from the cookie argument and defaults to
 * PA_NATIVE_COOKIE_FILE. */
1963 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The server argument has no default; a missing value is logged. */
1966 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1967 pa_log("No server specified.");
/* Start from the core defaults and let the module arguments adjust them. */
1971 ss
= m
->core
->default_sample_spec
;
1972 map
= m
->core
->default_channel_map
;
1973 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1974 pa_log("Invalid sample format specification");
/* Start connecting; PA_NATIVE_DEFAULT_PORT is passed as the default port.
 * on_connection is called with the outcome. */
1978 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, TRUE
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1979 pa_log("Failed to connect to server '%s'", u
->server_name
);
1983 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Local sink: the name comes from the sink_name argument, or is derived
 * from the server name when that argument is absent. */
1987 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1988 dn
= pa_sprintf_malloc("tunnel-sink.%s", u
->server_name
);
1990 pa_sink_new_data_init(&data
);
1991 data
.driver
= __FILE__
;
1993 data
.namereg_fail
= TRUE
;
1994 pa_sink_new_data_set_name(&data
, dn
);
1995 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1996 pa_sink_new_data_set_channel_map(&data
, &map
);
/* The description combines the remote sink name, when one is set, with the
 * server name. */
1997 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1998 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2000 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
/* Properties from the sink_properties argument are merged with
 * PA_UPDATE_REPLACE. */
2002 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2003 pa_log("Invalid properties");
2004 pa_sink_new_data_done(&data
);
2008 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
|PA_SINK_HW_MUTE_CTRL
);
2009 pa_sink_new_data_done(&data
);
2012 pa_log("Failed to create sink.");
/* Install the message handler and the callbacks of this module on the
 * sink. */
2016 u
->sink
->parent
.process_msg
= sink_process_msg
;
2017 u
->sink
->userdata
= u
;
2018 u
->sink
->set_state
= sink_set_state
;
2019 u
->sink
->set_volume
= sink_set_volume
;
2020 u
->sink
->set_mute
= sink_set_mute
;
2022 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
2024 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2026 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
2027 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Local source: same sequence as for the sink, using the source_name and
 * source_properties arguments. */
2031 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
2032 dn
= pa_sprintf_malloc("tunnel-source.%s", u
->server_name
);
2034 pa_source_new_data_init(&data
);
2035 data
.driver
= __FILE__
;
2037 data
.namereg_fail
= TRUE
;
2038 pa_source_new_data_set_name(&data
, dn
);
2039 pa_source_new_data_set_sample_spec(&data
, &ss
);
2040 pa_source_new_data_set_channel_map(&data
, &map
);
2041 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
2042 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2044 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
2046 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2047 pa_log("Invalid properties");
2048 pa_source_new_data_done(&data
);
2052 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
2053 pa_source_new_data_done(&data
);
2056 pa_log("Failed to create source.");
2060 u
->source
->parent
.process_msg
= source_process_msg
;
2061 u
->source
->set_state
= source_set_state
;
2062 u
->source
->userdata
= u
;
2064 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2066 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
2067 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
/* The mcalign is created with the frame size of the source sample spec. */
2069 u
->mcalign
= pa_mcalign_new(pa_frame_size(&u
->source
->sample_spec
));
2074 u
->time_event
= NULL
;
/* Buffer metrics start as (uint32_t) -1. setup_complete_callback and
 * create_stream_callback assign them later. */
2076 u
->maxlength
= (uint32_t) -1;
2078 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
2080 u
->fragsize
= (uint32_t) -1;
2083 if (!(u
->thread
= pa_thread_new("module-tunnel", thread_func
, u
))) {
2084 pa_log("Failed to create thread.");
2089 pa_sink_put(u
->sink
);
2091 pa_source_put(u
->source
);
/* NOTE(review): the module arguments are freed at two places, presumably
 * once on the success exit and once on the failure exit. */
2094 pa_modargs_free(ma
);
2102 pa_modargs_free(ma
);
/* Module teardown: releases what hangs off m->userdata.
 *
 * Visible order: unlink the sink and source, shut down and free the IO
 * thread, tear down the thread message queue, drop the sink and source
 * references, free the rtpoll, unlink and unref the pstream, unref the
 * pdispatch, socket client and auth cookie, free the smoother, the timer
 * event and the mcalign, then free the owned strings.
 *
 * NOTE(review): the guards around the individual steps, the conditional
 * compilation separating the sink and source variants, and the end of the
 * function are not visible in this excerpt. */
2109 void pa__done(pa_module
*m
) {
/* The condition is true when no userdata was stored on the module. */
2114 if (!(u
= m
->userdata
))
2119 pa_sink_unlink(u
->sink
);
2122 pa_source_unlink(u
->source
);
/* PA_MESSAGE_SHUTDOWN is sent on the thread input queue before the thread
 * object is freed. */
2126 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2127 pa_thread_free(u
->thread
);
2130 pa_thread_mq_done(&u
->thread_mq
);
2134 pa_sink_unref(u
->sink
);
2137 pa_source_unref(u
->source
);
2141 pa_rtpoll_free(u
->rtpoll
);
2144 pa_pstream_unlink(u
->pstream
);
2145 pa_pstream_unref(u
->pstream
);
2149 pa_pdispatch_unref(u
->pdispatch
);
2152 pa_socket_client_unref(u
->client
);
2155 pa_auth_cookie_unref(u
->auth_cookie
);
2158 pa_smoother_free(u
->smoother
);
/* The timer is released through the time_free function of the mainloop. */
2161 u
->core
->mainloop
->time_free(u
->time_event
);
2165 pa_mcalign_free(u
->mcalign
);
/* Strings owned by the userdata. */
2169 pa_xfree(u
->sink_name
);
2171 pa_xfree(u
->source_name
);
2173 pa_xfree(u
->server_name
);
2175 pa_xfree(u
->device_description
);
2176 pa_xfree(u
->server_fqdn
);
2177 pa_xfree(u
->user_name
);