2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
70 "sink=<remote sink name> "
72 "format=<sample format> "
73 "channels=<number of channels> "
75 "channel_map=<channel map>");
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
82 "source=<remote source name> "
84 "format=<sample format> "
85 "channels=<number of channels> "
87 "channel_map=<channel map>");
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION
);
92 PA_MODULE_LOAD_ONCE(FALSE
);
94 static const char* const valid_modargs
[] = {
113 #define DEFAULT_TIMEOUT 5
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
122 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
123 SINK_MESSAGE_REMOTE_SUSPEND
,
124 SINK_MESSAGE_UPDATE_LATENCY
,
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
134 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
135 SOURCE_MESSAGE_REMOTE_SUSPEND
,
136 SOURCE_MESSAGE_UPDATE_LATENCY
139 #define DEFAULT_FRAGSIZE_MSEC 25
144 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
155 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
157 [PA_COMMAND_REQUEST
] = command_request
,
158 [PA_COMMAND_STARTED
] = command_started
,
160 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
161 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
162 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
164 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
168 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
170 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
171 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
180 pa_thread_mq thread_mq
;
184 pa_socket_client
*client
;
186 pa_pdispatch
*pdispatch
;
192 size_t requested_bytes
;
199 pa_auth_cookie
*auth_cookie
;
203 uint32_t device_index
;
206 int64_t counter
, counter_delta
;
208 pa_bool_t remote_corked
:1;
209 pa_bool_t remote_suspended
:1;
211 pa_usec_t transport_usec
; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
214 uint32_t ignore_latency_before
;
216 pa_time_event
*time_event
;
218 pa_smoother
*smoother
;
220 char *device_description
;
234 static void request_latency(struct userdata
*u
);
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
238 pa_log_debug("Got stream or client event.");
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
243 struct userdata
*u
= userdata
;
248 pa_assert(u
->pdispatch
== pd
);
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u
->module
, TRUE
);
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
256 struct userdata
*u
= userdata
;
261 pa_assert(u
->pdispatch
== pd
);
263 pa_log_info("Server signalled buffer overrun/underrun.");
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
269 struct userdata
*u
= userdata
;
276 pa_assert(u
->pdispatch
== pd
);
278 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
279 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
280 !pa_tagstruct_eof(t
)) {
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u
->module
, TRUE
);
287 pa_log_debug("Server reports device suspend.");
290 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
292 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
298 /* Called from main context */
299 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
300 struct userdata
*u
= userdata
;
301 uint32_t channel
, di
;
308 pa_assert(u
->pdispatch
== pd
);
310 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
311 pa_tagstruct_getu32(t
, &di
) < 0 ||
312 pa_tagstruct_gets(t
, &dn
) < 0 ||
313 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u
->module
, TRUE
);
320 pa_log_debug("Server reports a stream move.");
323 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
325 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
331 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
332 struct userdata
*u
= userdata
;
333 uint32_t channel
, maxlength
, tlength
= 0, fragsize
, prebuf
, minreq
;
339 pa_assert(u
->pdispatch
== pd
);
341 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
342 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u
->module
, TRUE
);
349 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
350 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
351 pa_tagstruct_get_usec(t
, &usec
) < 0) {
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u
->module
, TRUE
);
358 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
359 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
360 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
361 pa_tagstruct_get_usec(t
, &usec
) < 0) {
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u
->module
, TRUE
);
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
378 /* Called from main context */
379 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
380 struct userdata
*u
= userdata
;
385 pa_assert(u
->pdispatch
== pd
);
387 pa_log_debug("Server reports playback started.");
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
399 x
= pa_rtclock_now();
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
406 x
-= u
->thread_transport_usec
;
408 x
+= u
->thread_transport_usec
;
410 if (u
->remote_suspended
|| u
->remote_corked
)
411 pa_smoother_pause(u
->smoother
, x
);
413 pa_smoother_resume(u
->smoother
, x
, TRUE
);
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
420 if (u
->remote_corked
== cork
)
423 u
->remote_corked
= cork
;
424 check_smoother_status(u
, FALSE
);
427 /* Called from main context */
428 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
435 t
= pa_tagstruct_new(NULL
, 0);
437 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
439 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
441 pa_tagstruct_putu32(t
, u
->ctag
++);
442 pa_tagstruct_putu32(t
, u
->channel
);
443 pa_tagstruct_put_boolean(t
, !!cork
);
444 pa_pstream_send_tagstruct(u
->pstream
, t
);
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
453 if (u
->remote_suspended
== suspend
)
456 u
->remote_suspended
= suspend
;
457 check_smoother_status(u
, TRUE
);
462 /* Called from IO thread context */
463 static void send_data(struct userdata
*u
) {
466 while (u
->requested_bytes
> 0) {
467 pa_memchunk memchunk
;
469 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
470 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
471 pa_memblock_unref(memchunk
.memblock
);
473 u
->requested_bytes
-= memchunk
.length
;
475 u
->counter
+= (int64_t) memchunk
.length
;
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
481 struct userdata
*u
= PA_SINK(o
)->userdata
;
485 case PA_SINK_MESSAGE_SET_STATE
: {
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
491 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
493 if (PA_SINK_IS_OPENED(u
->sink
->state
))
500 case PA_SINK_MESSAGE_GET_LATENCY
: {
501 pa_usec_t yl
, yr
, *usec
= data
;
503 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
504 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
506 *usec
= yl
> yr
? yl
- yr
: 0;
510 case SINK_MESSAGE_REQUEST
:
512 pa_assert(offset
> 0);
513 u
->requested_bytes
+= (size_t) offset
;
515 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
520 case SINK_MESSAGE_REMOTE_SUSPEND
:
522 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
525 case SINK_MESSAGE_UPDATE_LATENCY
: {
528 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
530 if (y
> (pa_usec_t
) offset
)
531 y
-= (pa_usec_t
) offset
;
535 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
537 /* We can access this freely here, since the main thread is waiting for us */
538 u
->thread_transport_usec
= u
->transport_usec
;
543 case SINK_MESSAGE_POST
:
545 /* OK, This might be a bit confusing. This message is
546 * delivered to us from the main context -- NOT from the
547 * IO thread context where the rest of the messages are
548 * dispatched. Yeah, ugly, but I am a lazy bastard. */
550 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
552 u
->counter_delta
+= (int64_t) chunk
->length
;
557 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
560 /* Called from main context */
561 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
563 pa_sink_assert_ref(s
);
566 switch ((pa_sink_state_t
) state
) {
568 case PA_SINK_SUSPENDED
:
569 pa_assert(PA_SINK_IS_OPENED(s
->state
));
570 stream_cork(u
, TRUE
);
574 case PA_SINK_RUNNING
:
575 if (s
->state
== PA_SINK_SUSPENDED
)
576 stream_cork(u
, FALSE
);
579 case PA_SINK_UNLINKED
:
581 case PA_SINK_INVALID_STATE
:
590 /* This function is called from IO context -- except when it is not. */
591 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
592 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
596 case PA_SOURCE_MESSAGE_SET_STATE
: {
599 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
600 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
605 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
606 pa_usec_t yr
, yl
, *usec
= data
;
608 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
609 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
611 *usec
= yr
> yl
? yr
- yl
: 0;
615 case SOURCE_MESSAGE_POST
: {
618 pa_mcalign_push(u
->mcalign
, chunk
);
620 while (pa_mcalign_pop(u
->mcalign
, &c
) >= 0) {
622 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
623 pa_source_post(u
->source
, &c
);
625 pa_memblock_unref(c
.memblock
);
627 u
->counter
+= (int64_t) c
.length
;
633 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
635 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
638 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
641 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
642 y
+= (pa_usec_t
) offset
;
644 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
646 /* We can access this freely here, since the main thread is waiting for us */
647 u
->thread_transport_usec
= u
->transport_usec
;
653 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
656 /* Called from main context */
657 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
659 pa_source_assert_ref(s
);
662 switch ((pa_source_state_t
) state
) {
664 case PA_SOURCE_SUSPENDED
:
665 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
666 stream_cork(u
, TRUE
);
670 case PA_SOURCE_RUNNING
:
671 if (s
->state
== PA_SOURCE_SUSPENDED
)
672 stream_cork(u
, FALSE
);
675 case PA_SOURCE_UNLINKED
:
677 case PA_SINK_INVALID_STATE
:
686 static void thread_func(void *userdata
) {
687 struct userdata
*u
= userdata
;
691 pa_log_debug("Thread starting up");
693 pa_thread_mq_install(&u
->thread_mq
);
699 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
700 pa_sink_process_rewind(u
->sink
, 0);
703 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
711 /* If this was no regular exit from the loop we have to continue
712 * processing messages until we received PA_MESSAGE_SHUTDOWN */
713 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
714 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
717 pa_log_debug("Thread shutting down");
721 /* Called from main context */
722 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
723 struct userdata
*u
= userdata
;
724 uint32_t bytes
, channel
;
727 pa_assert(command
== PA_COMMAND_REQUEST
);
730 pa_assert(u
->pdispatch
== pd
);
732 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
733 pa_tagstruct_getu32(t
, &bytes
) < 0) {
734 pa_log("Invalid protocol reply");
738 if (channel
!= u
->channel
) {
739 pa_log("Received data for invalid channel");
743 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
747 pa_module_unload_request(u
->module
, TRUE
);
752 /* Called from main context */
753 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
754 struct userdata
*u
= userdata
;
755 pa_usec_t sink_usec
, source_usec
;
757 int64_t write_index
, read_index
;
758 struct timeval local
, remote
, now
;
765 if (command
!= PA_COMMAND_REPLY
) {
766 if (command
== PA_COMMAND_ERROR
)
767 pa_log("Failed to get latency.");
769 pa_log("Protocol error.");
773 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
774 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
775 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
776 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
777 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
778 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
779 pa_tagstruct_gets64(t
, &read_index
) < 0) {
780 pa_log("Invalid reply.");
785 if (u
->version
>= 13) {
786 uint64_t underrun_for
= 0, playing_for
= 0;
788 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
789 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
790 pa_log("Invalid reply.");
796 if (!pa_tagstruct_eof(t
)) {
797 pa_log("Invalid reply.");
801 if (tag
< u
->ignore_latency_before
) {
805 pa_gettimeofday(&now
);
807 /* Calculate transport usec */
808 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
809 /* local and remote seem to have synchronized clocks */
811 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
813 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
816 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
818 /* First, take the device's delay */
820 delay
= (int64_t) sink_usec
;
821 ss
= &u
->sink
->sample_spec
;
823 delay
= (int64_t) source_usec
;
824 ss
= &u
->source
->sample_spec
;
827 /* Add the length of our server-side buffer */
828 if (write_index
>= read_index
)
829 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
831 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
833 /* Our measurements are already out of date, hence correct by the *
834 * transport latency */
836 delay
-= (int64_t) u
->transport_usec
;
838 delay
+= (int64_t) u
->transport_usec
;
841 /* Now correct by what we have have read/written since we requested the update */
843 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
845 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
849 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
851 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
858 pa_module_unload_request(u
->module
, TRUE
);
861 /* Called from main context */
862 static void request_latency(struct userdata
*u
) {
868 t
= pa_tagstruct_new(NULL
, 0);
870 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
872 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
874 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
875 pa_tagstruct_putu32(t
, u
->channel
);
877 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
879 pa_pstream_send_tagstruct(u
->pstream
, t
);
880 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
882 u
->ignore_latency_before
= tag
;
883 u
->counter_delta
= 0;
886 /* Called from main context */
887 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
888 struct userdata
*u
= userdata
;
896 pa_core_rttime_restart(u
->core
, e
, pa_rtclock_now() + LATENCY_INTERVAL
);
899 /* Called from main context */
900 static void update_description(struct userdata
*u
) {
902 char un
[128], hn
[128];
907 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
910 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
913 pa_sink_set_description(u
->sink
, d
);
914 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
915 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
916 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
918 pa_source_set_description(u
->source
, d
);
919 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
920 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
921 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
926 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
927 pa_get_user_name(un
, sizeof(un
)),
928 pa_get_host_name(hn
, sizeof(hn
)));
930 t
= pa_tagstruct_new(NULL
, 0);
932 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
934 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
936 pa_tagstruct_putu32(t
, u
->ctag
++);
937 pa_tagstruct_putu32(t
, u
->channel
);
938 pa_tagstruct_puts(t
, d
);
939 pa_pstream_send_tagstruct(u
->pstream
, t
);
944 /* Called from main context */
945 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
946 struct userdata
*u
= userdata
;
949 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
955 if (command
!= PA_COMMAND_REPLY
) {
956 if (command
== PA_COMMAND_ERROR
)
957 pa_log("Failed to get info.");
959 pa_log("Protocol error.");
963 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
964 pa_tagstruct_gets(t
, &server_version
) < 0 ||
965 pa_tagstruct_gets(t
, &user_name
) < 0 ||
966 pa_tagstruct_gets(t
, &host_name
) < 0 ||
967 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
968 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
969 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
970 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
971 (u
->version
>= 15 && pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
973 pa_log("Parse failure");
977 if (!pa_tagstruct_eof(t
)) {
978 pa_log("Packet too long");
982 pa_xfree(u
->server_fqdn
);
983 u
->server_fqdn
= pa_xstrdup(host_name
);
985 pa_xfree(u
->user_name
);
986 u
->user_name
= pa_xstrdup(user_name
);
988 update_description(u
);
993 pa_module_unload_request(u
->module
, TRUE
);
996 static int read_ports(struct userdata
*u
, pa_tagstruct
*t
) {
997 if (u
->version
>= 16) {
1001 if (pa_tagstruct_getu32(t
, &n_ports
)) {
1002 pa_log("Parse failure");
1003 return -PA_ERR_PROTOCOL
;
1006 for (uint32_t j
= 0; j
< n_ports
; j
++) {
1009 if (pa_tagstruct_gets(t
, &s
) < 0 || /* name */
1010 pa_tagstruct_gets(t
, &s
) < 0 || /* description */
1011 pa_tagstruct_getu32(t
, &priority
) < 0) {
1013 pa_log("Parse failure");
1014 return -PA_ERR_PROTOCOL
;
1016 if (u
->version
>= 24 && pa_tagstruct_getu32(t
, &priority
) < 0) { /* available */
1017 pa_log("Parse failure");
1018 return -PA_ERR_PROTOCOL
;
1022 if (pa_tagstruct_gets(t
, &s
) < 0) { /* active port */
1023 pa_log("Parse failure");
1024 return -PA_ERR_PROTOCOL
;
1030 static int read_formats(struct userdata
*u
, pa_tagstruct
*t
) {
1032 pa_format_info
*format
;
1034 if (pa_tagstruct_getu8(t
, &n_formats
) < 0) { /* no. of formats */
1035 pa_log("Parse failure");
1036 return -PA_ERR_PROTOCOL
;
1039 for (uint8_t j
= 0; j
< n_formats
; j
++) {
1040 format
= pa_format_info_new();
1041 if (pa_tagstruct_get_format_info(t
, format
)) { /* format info */
1042 pa_format_info_free(format
);
1043 pa_log("Parse failure");
1044 return -PA_ERR_PROTOCOL
;
1046 pa_format_info_free(format
);
1053 /* Called from main context */
1054 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1055 struct userdata
*u
= userdata
;
1056 uint32_t idx
, owner_module
, monitor_source
, flags
;
1057 const char *name
, *description
, *monitor_source_name
, *driver
;
1067 if (command
!= PA_COMMAND_REPLY
) {
1068 if (command
== PA_COMMAND_ERROR
)
1069 pa_log("Failed to get info.");
1071 pa_log("Protocol error.");
1075 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1076 pa_tagstruct_gets(t
, &name
) < 0 ||
1077 pa_tagstruct_gets(t
, &description
) < 0 ||
1078 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1079 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1080 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1081 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1082 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1083 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1084 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1085 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1086 pa_tagstruct_gets(t
, &driver
) < 0 ||
1087 pa_tagstruct_getu32(t
, &flags
) < 0) {
1089 pa_log("Parse failure");
1093 if (u
->version
>= 13) {
1094 pa_usec_t configured_latency
;
1096 if (pa_tagstruct_get_proplist(t
, NULL
) < 0 ||
1097 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1099 pa_log("Parse failure");
1104 if (u
->version
>= 15) {
1105 pa_volume_t base_volume
;
1106 uint32_t state
, n_volume_steps
, card
;
1108 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1109 pa_tagstruct_getu32(t
, &state
) < 0 ||
1110 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1111 pa_tagstruct_getu32(t
, &card
) < 0) {
1113 pa_log("Parse failure");
1118 if (read_ports(u
, t
) < 0)
1121 if (u
->version
>= 21 && read_formats(u
, t
) < 0)
1124 if (!pa_tagstruct_eof(t
)) {
1125 pa_log("Packet too long");
1129 if (!u
->sink_name
|| !pa_streq(name
, u
->sink_name
))
1132 pa_xfree(u
->device_description
);
1133 u
->device_description
= pa_xstrdup(description
);
1135 update_description(u
);
1140 pa_module_unload_request(u
->module
, TRUE
);
1143 /* Called from main context */
1144 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1145 struct userdata
*u
= userdata
;
1146 uint32_t idx
, owner_module
, client
, sink
;
1147 pa_usec_t buffer_usec
, sink_usec
;
1148 const char *name
, *driver
, *resample_method
;
1149 pa_bool_t mute
= FALSE
;
1150 pa_sample_spec sample_spec
;
1151 pa_channel_map channel_map
;
1158 if (command
!= PA_COMMAND_REPLY
) {
1159 if (command
== PA_COMMAND_ERROR
)
1160 pa_log("Failed to get info.");
1162 pa_log("Protocol error.");
1166 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1167 pa_tagstruct_gets(t
, &name
) < 0 ||
1168 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1169 pa_tagstruct_getu32(t
, &client
) < 0 ||
1170 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1171 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1172 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1173 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1174 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1175 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1176 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1177 pa_tagstruct_gets(t
, &driver
) < 0) {
1179 pa_log("Parse failure");
1183 if (u
->version
>= 11) {
1184 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1186 pa_log("Parse failure");
1191 if (u
->version
>= 13) {
1192 if (pa_tagstruct_get_proplist(t
, NULL
) < 0) {
1194 pa_log("Parse failure");
1199 if (u
->version
>= 19) {
1200 if (pa_tagstruct_get_boolean(t
, &b
) < 0) {
1202 pa_log("Parse failure");
1207 if (u
->version
>= 20) {
1208 if (pa_tagstruct_get_boolean(t
, &b
) < 0 ||
1209 pa_tagstruct_get_boolean(t
, &b
) < 0) {
1211 pa_log("Parse failure");
1216 if (u
->version
>= 21) {
1217 pa_format_info
*format
= pa_format_info_new();
1219 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1220 pa_format_info_free(format
);
1221 pa_log("Parse failure");
1224 pa_format_info_free(format
);
1227 if (!pa_tagstruct_eof(t
)) {
1228 pa_log("Packet too long");
1232 if (idx
!= u
->device_index
)
1237 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1238 pa_cvolume_equal(&volume
, &u
->sink
->real_volume
))
1241 pa_sink_volume_changed(u
->sink
, &volume
);
1243 if (u
->version
>= 11)
1244 pa_sink_mute_changed(u
->sink
, mute
);
1249 pa_module_unload_request(u
->module
, TRUE
);
1254 /* Called from main context */
1255 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1256 struct userdata
*u
= userdata
;
1257 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1258 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1263 pa_usec_t latency
, configured_latency
;
1268 if (command
!= PA_COMMAND_REPLY
) {
1269 if (command
== PA_COMMAND_ERROR
)
1270 pa_log("Failed to get info.");
1272 pa_log("Protocol error.");
1276 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1277 pa_tagstruct_gets(t
, &name
) < 0 ||
1278 pa_tagstruct_gets(t
, &description
) < 0 ||
1279 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1280 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1281 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1282 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1283 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1284 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1285 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1286 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1287 pa_tagstruct_gets(t
, &driver
) < 0 ||
1288 pa_tagstruct_getu32(t
, &flags
) < 0) {
1290 pa_log("Parse failure");
1294 if (u
->version
>= 13) {
1295 if (pa_tagstruct_get_proplist(t
, NULL
) < 0 ||
1296 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1298 pa_log("Parse failure");
1303 if (u
->version
>= 15) {
1304 pa_volume_t base_volume
;
1305 uint32_t state
, n_volume_steps
, card
;
1307 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1308 pa_tagstruct_getu32(t
, &state
) < 0 ||
1309 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1310 pa_tagstruct_getu32(t
, &card
) < 0) {
1312 pa_log("Parse failure");
1317 if (read_ports(u
, t
) < 0)
1320 if (u
->version
>= 22 && read_formats(u
, t
) < 0)
1323 if (!pa_tagstruct_eof(t
)) {
1324 pa_log("Packet too long");
1328 if (!u
->source_name
|| !pa_streq(name
, u
->source_name
))
1331 pa_xfree(u
->device_description
);
1332 u
->device_description
= pa_xstrdup(description
);
1334 update_description(u
);
1339 pa_module_unload_request(u
->module
, TRUE
);
1344 /* Called from main context */
1345 static void request_info(struct userdata
*u
) {
1350 t
= pa_tagstruct_new(NULL
, 0);
1351 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1352 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1353 pa_pstream_send_tagstruct(u
->pstream
, t
);
1354 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1357 t
= pa_tagstruct_new(NULL
, 0);
1358 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1359 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1360 pa_tagstruct_putu32(t
, u
->device_index
);
1361 pa_pstream_send_tagstruct(u
->pstream
, t
);
1362 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1365 t
= pa_tagstruct_new(NULL
, 0);
1366 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1367 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1368 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1369 pa_tagstruct_puts(t
, u
->sink_name
);
1370 pa_pstream_send_tagstruct(u
->pstream
, t
);
1371 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1374 if (u
->source_name
) {
1375 t
= pa_tagstruct_new(NULL
, 0);
1376 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1377 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1378 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1379 pa_tagstruct_puts(t
, u
->source_name
);
1380 pa_pstream_send_tagstruct(u
->pstream
, t
);
1381 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1386 /* Called from main context */
1387 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1388 struct userdata
*u
= userdata
;
1389 pa_subscription_event_type_t e
;
1395 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1397 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1398 pa_tagstruct_getu32(t
, &idx
) < 0) {
1399 pa_log("Invalid protocol reply");
1400 pa_module_unload_request(u
->module
, TRUE
);
1404 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1406 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1407 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1409 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1417 /* Called from main context */
1418 static void start_subscribe(struct userdata
*u
) {
1422 t
= pa_tagstruct_new(NULL
, 0);
1423 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1424 pa_tagstruct_putu32(t
, u
->ctag
++);
1425 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1427 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1429 PA_SUBSCRIPTION_MASK_SOURCE
1433 pa_pstream_send_tagstruct(u
->pstream
, t
);
1436 /* Called from main context */
1437 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1438 struct userdata
*u
= userdata
;
1445 pa_assert(u
->pdispatch
== pd
);
1447 if (command
!= PA_COMMAND_REPLY
) {
1448 if (command
== PA_COMMAND_ERROR
)
1449 pa_log("Failed to create stream.");
1451 pa_log("Protocol error.");
1455 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1456 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1458 || pa_tagstruct_getu32(t
, &bytes
) < 0
1463 if (u
->version
>= 9) {
1465 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1466 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1467 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1468 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1471 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1472 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1477 if (u
->version
>= 12) {
1480 uint32_t device_index
;
1482 pa_bool_t suspended
;
1484 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1485 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1486 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1487 pa_tagstruct_gets(t
, &dn
) < 0 ||
1488 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1492 pa_xfree(u
->sink_name
);
1493 u
->sink_name
= pa_xstrdup(dn
);
1495 pa_xfree(u
->source_name
);
1496 u
->source_name
= pa_xstrdup(dn
);
1500 if (u
->version
>= 13) {
1503 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1506 /* #ifdef TUNNEL_SINK */
1507 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1509 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1513 if (u
->version
>= 21) {
1514 pa_format_info
*format
= pa_format_info_new();
1516 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1517 pa_format_info_free(format
);
1521 pa_format_info_free(format
);
1524 if (!pa_tagstruct_eof(t
))
1530 pa_assert(!u
->time_event
);
1531 u
->time_event
= pa_core_rttime_new(u
->core
, pa_rtclock_now() + LATENCY_INTERVAL
, timeout_callback
, u
);
1535 pa_log_debug("Stream created.");
1538 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1544 pa_log("Invalid reply. (Create stream)");
1547 pa_module_unload_request(u
->module
, TRUE
);
1551 /* Called from main context */
1552 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1553 struct userdata
*u
= userdata
;
1554 pa_tagstruct
*reply
;
1555 char name
[256], un
[128], hn
[128];
1560 pa_assert(u
->pdispatch
== pd
);
1562 if (command
!= PA_COMMAND_REPLY
||
1563 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1564 !pa_tagstruct_eof(t
)) {
1566 if (command
== PA_COMMAND_ERROR
)
1567 pa_log("Failed to authenticate");
1569 pa_log("Protocol error.");
1574 /* Minimum supported protocol version */
1575 if (u
->version
< 8) {
1576 pa_log("Incompatible protocol version");
1580 /* Starting with protocol version 13 the MSB of the version tag
1581 reflects if shm is enabled for this connection or not. We don't
1582 support SHM here at all, so we just ignore this. */
1584 if (u
->version
>= 13)
1585 u
->version
&= 0x7FFFFFFFU
;
1587 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
1590 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1591 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1593 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1595 pa_get_user_name(un
, sizeof(un
)),
1596 pa_get_host_name(hn
, sizeof(hn
)));
1598 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1599 pa_source_update_proplist(u
->source
, 0, NULL
);
1601 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1603 pa_get_user_name(un
, sizeof(un
)),
1604 pa_get_host_name(hn
, sizeof(hn
)));
1607 reply
= pa_tagstruct_new(NULL
, 0);
1608 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1609 pa_tagstruct_putu32(reply
, u
->ctag
++);
1611 if (u
->version
>= 13) {
1613 pl
= pa_proplist_new();
1614 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1615 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1616 pa_init_proplist(pl
);
1617 pa_tagstruct_put_proplist(reply
, pl
);
1618 pa_proplist_free(pl
);
1620 pa_tagstruct_puts(reply
, "PulseAudio");
1622 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1623 /* We ignore the server's reply here */
1625 reply
= pa_tagstruct_new(NULL
, 0);
1627 if (u
->version
< 13)
1628 /* Only for older PA versions we need to fill in the maxlength */
1629 u
->maxlength
= 4*1024*1024;
1632 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1633 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1634 u
->prebuf
= u
->tlength
;
1636 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
1640 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1641 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1643 if (u
->version
< 13)
1644 pa_tagstruct_puts(reply
, name
);
1646 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1647 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1648 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1649 pa_tagstruct_puts(reply
, u
->sink_name
);
1650 pa_tagstruct_putu32(reply
, u
->maxlength
);
1651 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1652 pa_tagstruct_putu32(reply
, u
->tlength
);
1653 pa_tagstruct_putu32(reply
, u
->prebuf
);
1654 pa_tagstruct_putu32(reply
, u
->minreq
);
1655 pa_tagstruct_putu32(reply
, 0);
1656 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1657 pa_tagstruct_put_cvolume(reply
, &volume
);
1659 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1660 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1662 if (u
->version
< 13)
1663 pa_tagstruct_puts(reply
, name
);
1665 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1666 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1667 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1668 pa_tagstruct_puts(reply
, u
->source_name
);
1669 pa_tagstruct_putu32(reply
, u
->maxlength
);
1670 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1671 pa_tagstruct_putu32(reply
, u
->fragsize
);
1674 if (u
->version
>= 12) {
1675 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1676 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1677 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1678 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1679 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1680 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1681 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1684 if (u
->version
>= 13) {
1687 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1688 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1690 pl
= pa_proplist_new();
1691 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1692 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1693 pa_tagstruct_put_proplist(reply
, pl
);
1694 pa_proplist_free(pl
);
1697 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1701 if (u
->version
>= 14) {
1703 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1705 pa_tagstruct_put_boolean(reply
, TRUE
); /* early rquests */
1708 if (u
->version
>= 15) {
1710 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1712 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1713 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
1717 if (u
->version
>= 17)
1718 pa_tagstruct_put_boolean(reply
, FALSE
); /* relative volume */
1720 if (u
->version
>= 18)
1721 pa_tagstruct_put_boolean(reply
, FALSE
); /* passthrough stream */
1725 if (u
->version
>= 21) {
1726 /* We're not using the extended API, so n_formats = 0 and that's that */
1727 pa_tagstruct_putu8(reply
, 0);
1730 if (u
->version
>= 22) {
1731 /* We're not using the extended API, so n_formats = 0 and that's that */
1732 pa_tagstruct_putu8(reply
, 0);
1733 pa_cvolume_reset(&volume
, u
->source
->sample_spec
.channels
);
1734 pa_tagstruct_put_cvolume(reply
, &volume
);
1735 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted */
1736 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1737 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1738 pa_tagstruct_put_boolean(reply
, FALSE
); /* relative volume */
1739 pa_tagstruct_put_boolean(reply
, FALSE
); /* passthrough stream */
1743 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1744 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1746 pa_log_debug("Connection authenticated, creating stream ...");
1751 pa_module_unload_request(u
->module
, TRUE
);
1754 /* Called from main context */
1755 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1756 struct userdata
*u
= userdata
;
1761 pa_log_warn("Stream died.");
1762 pa_module_unload_request(u
->module
, TRUE
);
1765 /* Called from main context */
1766 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1767 struct userdata
*u
= userdata
;
1773 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1774 pa_log("Invalid packet");
1775 pa_module_unload_request(u
->module
, TRUE
);
1781 /* Called from main context */
1782 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1783 struct userdata
*u
= userdata
;
1789 if (channel
!= u
->channel
) {
1790 pa_log("Received memory block on bad channel.");
1791 pa_module_unload_request(u
->module
, TRUE
);
1795 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1797 u
->counter_delta
+= (int64_t) chunk
->length
;
1801 /* Called from main context */
1802 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1803 struct userdata
*u
= userdata
;
1809 pa_assert(u
->client
== sc
);
1811 pa_socket_client_unref(u
->client
);
1815 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1816 pa_module_unload_request(u
->module
, TRUE
);
1820 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1821 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, TRUE
, command_table
, PA_COMMAND_MAX
);
1823 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1824 pa_pstream_set_receive_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1826 pa_pstream_set_receive_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
1829 t
= pa_tagstruct_new(NULL
, 0);
1830 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1831 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1832 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1834 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
1840 if (pa_iochannel_creds_supported(io
))
1841 pa_iochannel_creds_enable(io
);
1843 ucred
.uid
= getuid();
1844 ucred
.gid
= getgid();
1846 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1849 pa_pstream_send_tagstruct(u
->pstream
, t
);
1852 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1854 pa_log_debug("Connection established, authenticating ...");
1859 /* Called from main context */
1860 static void sink_set_volume(pa_sink
*sink
) {
1868 t
= pa_tagstruct_new(NULL
, 0);
1869 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1870 pa_tagstruct_putu32(t
, u
->ctag
++);
1871 pa_tagstruct_putu32(t
, u
->device_index
);
1872 pa_tagstruct_put_cvolume(t
, &sink
->real_volume
);
1873 pa_pstream_send_tagstruct(u
->pstream
, t
);
1876 /* Called from main context */
1877 static void sink_set_mute(pa_sink
*sink
) {
1885 if (u
->version
< 11)
1888 t
= pa_tagstruct_new(NULL
, 0);
1889 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1890 pa_tagstruct_putu32(t
, u
->ctag
++);
1891 pa_tagstruct_putu32(t
, u
->device_index
);
1892 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1893 pa_pstream_send_tagstruct(u
->pstream
, t
);
1898 int pa__init(pa_module
*m
) {
1899 pa_modargs
*ma
= NULL
;
1900 struct userdata
*u
= NULL
;
1905 pa_sink_new_data data
;
1907 pa_source_new_data data
;
1912 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1913 pa_log("Failed to parse module arguments");
1917 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1921 u
->pdispatch
= NULL
;
1923 u
->server_name
= NULL
;
1925 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1927 u
->requested_bytes
= 0;
1929 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1932 u
->smoother
= pa_smoother_new(
1941 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1942 u
->time_event
= NULL
;
1943 u
->ignore_latency_before
= 0;
1944 u
->transport_usec
= u
->thread_transport_usec
= 0;
1945 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1946 u
->counter
= u
->counter_delta
= 0;
1948 u
->rtpoll
= pa_rtpoll_new();
1949 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
1951 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), TRUE
, PA_NATIVE_COOKIE_LENGTH
)))
1954 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1955 pa_log("No server specified.");
1959 ss
= m
->core
->default_sample_spec
;
1960 map
= m
->core
->default_channel_map
;
1961 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1962 pa_log("Invalid sample format specification");
1966 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, TRUE
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1967 pa_log("Failed to connect to server '%s'", u
->server_name
);
1971 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
1975 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1976 dn
= pa_sprintf_malloc("tunnel-sink.%s", u
->server_name
);
1978 pa_sink_new_data_init(&data
);
1979 data
.driver
= __FILE__
;
1981 data
.namereg_fail
= TRUE
;
1982 pa_sink_new_data_set_name(&data
, dn
);
1983 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1984 pa_sink_new_data_set_channel_map(&data
, &map
);
1985 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1986 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1988 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1990 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1991 pa_log("Invalid properties");
1992 pa_sink_new_data_done(&data
);
1996 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
);
1997 pa_sink_new_data_done(&data
);
2000 pa_log("Failed to create sink.");
2004 u
->sink
->parent
.process_msg
= sink_process_msg
;
2005 u
->sink
->userdata
= u
;
2006 u
->sink
->set_state
= sink_set_state
;
2007 pa_sink_set_set_volume_callback(u
->sink
, sink_set_volume
);
2008 pa_sink_set_set_mute_callback(u
->sink
, sink_set_mute
);
2010 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
2012 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2014 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
2015 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
2019 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
2020 dn
= pa_sprintf_malloc("tunnel-source.%s", u
->server_name
);
2022 pa_source_new_data_init(&data
);
2023 data
.driver
= __FILE__
;
2025 data
.namereg_fail
= TRUE
;
2026 pa_source_new_data_set_name(&data
, dn
);
2027 pa_source_new_data_set_sample_spec(&data
, &ss
);
2028 pa_source_new_data_set_channel_map(&data
, &map
);
2029 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
2030 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2032 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
2034 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2035 pa_log("Invalid properties");
2036 pa_source_new_data_done(&data
);
2040 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
2041 pa_source_new_data_done(&data
);
2044 pa_log("Failed to create source.");
2048 u
->source
->parent
.process_msg
= source_process_msg
;
2049 u
->source
->set_state
= source_set_state
;
2050 u
->source
->userdata
= u
;
2052 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2054 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
2055 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
2057 u
->mcalign
= pa_mcalign_new(pa_frame_size(&u
->source
->sample_spec
));
2062 u
->time_event
= NULL
;
2064 u
->maxlength
= (uint32_t) -1;
2066 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
2068 u
->fragsize
= (uint32_t) -1;
2071 if (!(u
->thread
= pa_thread_new("module-tunnel", thread_func
, u
))) {
2072 pa_log("Failed to create thread.");
2077 pa_sink_put(u
->sink
);
2079 pa_source_put(u
->source
);
2082 pa_modargs_free(ma
);
2090 pa_modargs_free(ma
);
2097 void pa__done(pa_module
*m
) {
2102 if (!(u
= m
->userdata
))
2107 pa_sink_unlink(u
->sink
);
2110 pa_source_unlink(u
->source
);
2114 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2115 pa_thread_free(u
->thread
);
2118 pa_thread_mq_done(&u
->thread_mq
);
2122 pa_sink_unref(u
->sink
);
2125 pa_source_unref(u
->source
);
2129 pa_rtpoll_free(u
->rtpoll
);
2132 pa_pstream_unlink(u
->pstream
);
2133 pa_pstream_unref(u
->pstream
);
2137 pa_pdispatch_unref(u
->pdispatch
);
2140 pa_socket_client_unref(u
->client
);
2143 pa_auth_cookie_unref(u
->auth_cookie
);
2146 pa_smoother_free(u
->smoother
);
2149 u
->core
->mainloop
->time_free(u
->time_event
);
2153 pa_mcalign_free(u
->mcalign
);
2157 pa_xfree(u
->sink_name
);
2159 pa_xfree(u
->source_name
);
2161 pa_xfree(u
->server_name
);
2163 pa_xfree(u
->device_description
);
2164 pa_xfree(u
->server_fqdn
);
2165 pa_xfree(u
->user_name
);