]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
tunnel: Fix reading state from wrong variable
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
69 "server=<address> "
70 "sink=<remote sink name> "
71 "cookie=<filename> "
72 "format=<sample format> "
73 "channels=<number of channels> "
74 "rate=<sample rate> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
81 "server=<address> "
82 "source=<remote source name> "
83 "cookie=<filename> "
84 "format=<sample format> "
85 "channels=<number of channels> "
86 "rate=<sample rate> "
87 "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(false);
93
/* NULL-terminated list of the module argument names this module accepts.
 * The device-specific keys depend on whether the sink or the source variant
 * is being built. */
static const char* const valid_modargs[] = {
    /* Connection and sample format arguments */
    "server",
    "cookie",
    "format",
    "channels",
    "rate",

    /* Local/remote device arguments */
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif

    "channel_map",

    NULL,
};
112
113 #define DEFAULT_TIMEOUT 5
114
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
116
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
121 enum {
122 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123 SINK_MESSAGE_REMOTE_SUSPEND,
124 SINK_MESSAGE_UPDATE_LATENCY,
125 SINK_MESSAGE_POST
126 };
127
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
133 enum {
134 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135 SOURCE_MESSAGE_REMOTE_SUSPEND,
136 SOURCE_MESSAGE_UPDATE_LATENCY
137 };
138
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157 [PA_COMMAND_REQUEST] = command_request,
158 [PA_COMMAND_STARTED] = command_started,
159 #endif
160 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
176 struct userdata {
177 pa_core *core;
178 pa_module *module;
179
180 pa_thread_mq thread_mq;
181 pa_rtpoll *rtpoll;
182 pa_thread *thread;
183
184 pa_socket_client *client;
185 pa_pstream *pstream;
186 pa_pdispatch *pdispatch;
187
188 char *server_name;
189 #ifdef TUNNEL_SINK
190 char *sink_name;
191 pa_sink *sink;
192 size_t requested_bytes;
193 #else
194 char *source_name;
195 pa_source *source;
196 pa_mcalign *mcalign;
197 #endif
198
199 pa_auth_cookie *auth_cookie;
200
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
205
206 int64_t counter, counter_delta;
207
208 bool remote_corked:1;
209 bool remote_suspended:1;
210
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214 uint32_t ignore_latency_before;
215
216 pa_time_event *time_event;
217
218 pa_smoother *smoother;
219
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
223
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
232 };
233
/* Defined further down; the command handlers below call it. */
static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
244
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
249
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, true);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
257
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
262
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 bool suspended;
272
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
277
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
281
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, true);
284 return;
285 }
286
287 pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295 request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 bool suspended;
304
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
309
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, true);
317 return;
318 }
319
320 pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328 request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
334 pa_usec_t usec;
335
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
340
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, true);
346 return;
347 }
348
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, true);
355 return;
356 }
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
362
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, true);
365 return;
366 }
367 }
368
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373 request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
386
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, bool past) {
395 pa_usec_t x;
396
397 pa_assert(u);
398
399 x = pa_rtclock_now();
400
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
404
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
409
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, true);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, bool cork) {
418 pa_assert(u);
419
420 if (u->remote_corked == cork)
421 return;
422
423 u->remote_corked = cork;
424 check_smoother_status(u, false);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, bool cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
431
432 if (!u->pstream)
433 return;
434
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
445
446 request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, bool suspend) {
451 pa_assert(u);
452
453 if (u->remote_suspended == suspend)
454 return;
455
456 u->remote_suspended = suspend;
457 check_smoother_status(u, true);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
465
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
468
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
472
473 u->requested_bytes -= memchunk.length;
474
475 u->counter += (int64_t) memchunk.length;
476 }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
482
483 switch (code) {
484
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
487
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491 stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED);
492
493 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
494 send_data(u);
495 }
496
497 return r;
498 }
499
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
502
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
508 }
509
510 case SINK_MESSAGE_REQUEST:
511
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
514
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
517
518 return 0;
519
520 case SINK_MESSAGE_REMOTE_SUSPEND:
521
522 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
523 return 0;
524
525 case SINK_MESSAGE_UPDATE_LATENCY: {
526 pa_usec_t y;
527
528 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
529
530 if (y > (pa_usec_t) offset)
531 y -= (pa_usec_t) offset;
532 else
533 y = 0;
534
535 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
536
537 /* We can access this freely here, since the main thread is waiting for us */
538 u->thread_transport_usec = u->transport_usec;
539
540 return 0;
541 }
542
543 case SINK_MESSAGE_POST:
544
545 /* OK, This might be a bit confusing. This message is
546 * delivered to us from the main context -- NOT from the
547 * IO thread context where the rest of the messages are
548 * dispatched. Yeah, ugly, but I am a lazy bastard. */
549
550 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
551
552 u->counter_delta += (int64_t) chunk->length;
553
554 return 0;
555 }
556
557 return pa_sink_process_msg(o, code, data, offset, chunk);
558 }
559
560 /* Called from main context */
561 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
562 struct userdata *u;
563 pa_sink_assert_ref(s);
564 u = s->userdata;
565
566 switch ((pa_sink_state_t) state) {
567
568 case PA_SINK_SUSPENDED:
569 pa_assert(PA_SINK_IS_OPENED(s->state));
570 stream_cork(u, true);
571 break;
572
573 case PA_SINK_IDLE:
574 case PA_SINK_RUNNING:
575 if (s->state == PA_SINK_SUSPENDED)
576 stream_cork(u, false);
577 break;
578
579 case PA_SINK_UNLINKED:
580 case PA_SINK_INIT:
581 case PA_SINK_INVALID_STATE:
582 ;
583 }
584
585 return 0;
586 }
587
588 #else
589
590 /* This function is called from IO context -- except when it is not. */
591 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
592 struct userdata *u = PA_SOURCE(o)->userdata;
593
594 switch (code) {
595
596 case PA_SOURCE_MESSAGE_SET_STATE: {
597 int r;
598
599 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
600 stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED);
601
602 return r;
603 }
604
605 case PA_SOURCE_MESSAGE_GET_LATENCY: {
606 pa_usec_t yr, yl, *usec = data;
607
608 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
609 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
610
611 *usec = yr > yl ? yr - yl : 0;
612 return 0;
613 }
614
615 case SOURCE_MESSAGE_POST: {
616 pa_memchunk c;
617
618 pa_mcalign_push(u->mcalign, chunk);
619
620 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
621
622 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
623 pa_source_post(u->source, &c);
624
625 pa_memblock_unref(c.memblock);
626
627 u->counter += (int64_t) c.length;
628 }
629
630 return 0;
631 }
632
633 case SOURCE_MESSAGE_REMOTE_SUSPEND:
634
635 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
636 return 0;
637
638 case SOURCE_MESSAGE_UPDATE_LATENCY: {
639 pa_usec_t y;
640
641 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
642 y += (pa_usec_t) offset;
643
644 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
645
646 /* We can access this freely here, since the main thread is waiting for us */
647 u->thread_transport_usec = u->transport_usec;
648
649 return 0;
650 }
651 }
652
653 return pa_source_process_msg(o, code, data, offset, chunk);
654 }
655
656 /* Called from main context */
657 static int source_set_state(pa_source *s, pa_source_state_t state) {
658 struct userdata *u;
659 pa_source_assert_ref(s);
660 u = s->userdata;
661
662 switch ((pa_source_state_t) state) {
663
664 case PA_SOURCE_SUSPENDED:
665 pa_assert(PA_SOURCE_IS_OPENED(s->state));
666 stream_cork(u, true);
667 break;
668
669 case PA_SOURCE_IDLE:
670 case PA_SOURCE_RUNNING:
671 if (s->state == PA_SOURCE_SUSPENDED)
672 stream_cork(u, false);
673 break;
674
675 case PA_SOURCE_UNLINKED:
676 case PA_SOURCE_INIT:
677 case PA_SINK_INVALID_STATE:
678 ;
679 }
680
681 return 0;
682 }
683
684 #endif
685
686 static void thread_func(void *userdata) {
687 struct userdata *u = userdata;
688
689 pa_assert(u);
690
691 pa_log_debug("Thread starting up");
692
693 pa_thread_mq_install(&u->thread_mq);
694
695 for (;;) {
696 int ret;
697
698 #ifdef TUNNEL_SINK
699 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
700 pa_sink_process_rewind(u->sink, 0);
701 #endif
702
703 if ((ret = pa_rtpoll_run(u->rtpoll, true)) < 0)
704 goto fail;
705
706 if (ret == 0)
707 goto finish;
708 }
709
710 fail:
711 /* If this was no regular exit from the loop we have to continue
712 * processing messages until we received PA_MESSAGE_SHUTDOWN */
713 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
714 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
715
716 finish:
717 pa_log_debug("Thread shutting down");
718 }
719
720 #ifdef TUNNEL_SINK
721 /* Called from main context */
722 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
723 struct userdata *u = userdata;
724 uint32_t bytes, channel;
725
726 pa_assert(pd);
727 pa_assert(command == PA_COMMAND_REQUEST);
728 pa_assert(t);
729 pa_assert(u);
730 pa_assert(u->pdispatch == pd);
731
732 if (pa_tagstruct_getu32(t, &channel) < 0 ||
733 pa_tagstruct_getu32(t, &bytes) < 0) {
734 pa_log("Invalid protocol reply");
735 goto fail;
736 }
737
738 if (channel != u->channel) {
739 pa_log("Received data for invalid channel");
740 goto fail;
741 }
742
743 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
744 return;
745
746 fail:
747 pa_module_unload_request(u->module, true);
748 }
749
750 #endif
751
752 /* Called from main context */
753 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
754 struct userdata *u = userdata;
755 pa_usec_t sink_usec, source_usec;
756 bool playing;
757 int64_t write_index, read_index;
758 struct timeval local, remote, now;
759 pa_sample_spec *ss;
760 int64_t delay;
761
762 pa_assert(pd);
763 pa_assert(u);
764
765 if (command != PA_COMMAND_REPLY) {
766 if (command == PA_COMMAND_ERROR)
767 pa_log("Failed to get latency.");
768 else
769 pa_log("Protocol error.");
770 goto fail;
771 }
772
773 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
774 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
775 pa_tagstruct_get_boolean(t, &playing) < 0 ||
776 pa_tagstruct_get_timeval(t, &local) < 0 ||
777 pa_tagstruct_get_timeval(t, &remote) < 0 ||
778 pa_tagstruct_gets64(t, &write_index) < 0 ||
779 pa_tagstruct_gets64(t, &read_index) < 0) {
780 pa_log("Invalid reply.");
781 goto fail;
782 }
783
784 #ifdef TUNNEL_SINK
785 if (u->version >= 13) {
786 uint64_t underrun_for = 0, playing_for = 0;
787
788 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
789 pa_tagstruct_getu64(t, &playing_for) < 0) {
790 pa_log("Invalid reply.");
791 goto fail;
792 }
793 }
794 #endif
795
796 if (!pa_tagstruct_eof(t)) {
797 pa_log("Invalid reply.");
798 goto fail;
799 }
800
801 if (tag < u->ignore_latency_before) {
802 return;
803 }
804
805 pa_gettimeofday(&now);
806
807 /* Calculate transport usec */
808 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
809 /* local and remote seem to have synchronized clocks */
810 #ifdef TUNNEL_SINK
811 u->transport_usec = pa_timeval_diff(&remote, &local);
812 #else
813 u->transport_usec = pa_timeval_diff(&now, &remote);
814 #endif
815 } else
816 u->transport_usec = pa_timeval_diff(&now, &local)/2;
817
818 /* First, take the device's delay */
819 #ifdef TUNNEL_SINK
820 delay = (int64_t) sink_usec;
821 ss = &u->sink->sample_spec;
822 #else
823 delay = (int64_t) source_usec;
824 ss = &u->source->sample_spec;
825 #endif
826
827 /* Add the length of our server-side buffer */
828 if (write_index >= read_index)
829 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
830 else
831 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
832
833 /* Our measurements are already out of date, hence correct by the *
834 * transport latency */
835 #ifdef TUNNEL_SINK
836 delay -= (int64_t) u->transport_usec;
837 #else
838 delay += (int64_t) u->transport_usec;
839 #endif
840
841 /* Now correct by what we have have read/written since we requested the update */
842 #ifdef TUNNEL_SINK
843 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
844 #else
845 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
846 #endif
847
848 #ifdef TUNNEL_SINK
849 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
850 #else
851 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
852 #endif
853
854 return;
855
856 fail:
857
858 pa_module_unload_request(u->module, true);
859 }
860
861 /* Called from main context */
862 static void request_latency(struct userdata *u) {
863 pa_tagstruct *t;
864 struct timeval now;
865 uint32_t tag;
866 pa_assert(u);
867
868 t = pa_tagstruct_new(NULL, 0);
869 #ifdef TUNNEL_SINK
870 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
871 #else
872 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
873 #endif
874 pa_tagstruct_putu32(t, tag = u->ctag++);
875 pa_tagstruct_putu32(t, u->channel);
876
877 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
878
879 pa_pstream_send_tagstruct(u->pstream, t);
880 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
881
882 u->ignore_latency_before = tag;
883 u->counter_delta = 0;
884 }
885
886 /* Called from main context */
887 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
888 struct userdata *u = userdata;
889
890 pa_assert(m);
891 pa_assert(e);
892 pa_assert(u);
893
894 request_latency(u);
895
896 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
897 }
898
899 /* Called from main context */
900 static void update_description(struct userdata *u) {
901 char *d;
902 char un[128], hn[128];
903 pa_tagstruct *t;
904
905 pa_assert(u);
906
907 if (!u->server_fqdn || !u->user_name || !u->device_description)
908 return;
909
910 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
911
912 #ifdef TUNNEL_SINK
913 pa_sink_set_description(u->sink, d);
914 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
915 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
916 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
917 #else
918 pa_source_set_description(u->source, d);
919 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
920 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
921 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
922 #endif
923
924 pa_xfree(d);
925
926 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
927 pa_get_user_name(un, sizeof(un)),
928 pa_get_host_name(hn, sizeof(hn)));
929
930 t = pa_tagstruct_new(NULL, 0);
931 #ifdef TUNNEL_SINK
932 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
933 #else
934 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
935 #endif
936 pa_tagstruct_putu32(t, u->ctag++);
937 pa_tagstruct_putu32(t, u->channel);
938 pa_tagstruct_puts(t, d);
939 pa_pstream_send_tagstruct(u->pstream, t);
940
941 pa_xfree(d);
942 }
943
944 /* Called from main context */
945 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
946 struct userdata *u = userdata;
947 pa_sample_spec ss;
948 pa_channel_map cm;
949 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
950 uint32_t cookie;
951
952 pa_assert(pd);
953 pa_assert(u);
954
955 if (command != PA_COMMAND_REPLY) {
956 if (command == PA_COMMAND_ERROR)
957 pa_log("Failed to get info.");
958 else
959 pa_log("Protocol error.");
960 goto fail;
961 }
962
963 if (pa_tagstruct_gets(t, &server_name) < 0 ||
964 pa_tagstruct_gets(t, &server_version) < 0 ||
965 pa_tagstruct_gets(t, &user_name) < 0 ||
966 pa_tagstruct_gets(t, &host_name) < 0 ||
967 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
968 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
969 pa_tagstruct_gets(t, &default_source_name) < 0 ||
970 pa_tagstruct_getu32(t, &cookie) < 0 ||
971 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
972
973 pa_log("Parse failure");
974 goto fail;
975 }
976
977 if (!pa_tagstruct_eof(t)) {
978 pa_log("Packet too long");
979 goto fail;
980 }
981
982 pa_xfree(u->server_fqdn);
983 u->server_fqdn = pa_xstrdup(host_name);
984
985 pa_xfree(u->user_name);
986 u->user_name = pa_xstrdup(user_name);
987
988 update_description(u);
989
990 return;
991
992 fail:
993 pa_module_unload_request(u->module, true);
994 }
995
996 static int read_ports(struct userdata *u, pa_tagstruct *t) {
997 if (u->version >= 16) {
998 uint32_t n_ports;
999 const char *s;
1000
1001 if (pa_tagstruct_getu32(t, &n_ports)) {
1002 pa_log("Parse failure");
1003 return -PA_ERR_PROTOCOL;
1004 }
1005
1006 for (uint32_t j = 0; j < n_ports; j++) {
1007 uint32_t priority;
1008
1009 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1010 pa_tagstruct_gets(t, &s) < 0 || /* description */
1011 pa_tagstruct_getu32(t, &priority) < 0) {
1012
1013 pa_log("Parse failure");
1014 return -PA_ERR_PROTOCOL;
1015 }
1016 if (u->version >= 24 && pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1017 pa_log("Parse failure");
1018 return -PA_ERR_PROTOCOL;
1019 }
1020 }
1021
1022 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1023 pa_log("Parse failure");
1024 return -PA_ERR_PROTOCOL;
1025 }
1026 }
1027 return 0;
1028 }
1029
1030 static int read_formats(struct userdata *u, pa_tagstruct *t) {
1031 uint8_t n_formats;
1032 pa_format_info *format;
1033
1034 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1035 pa_log("Parse failure");
1036 return -PA_ERR_PROTOCOL;
1037 }
1038
1039 for (uint8_t j = 0; j < n_formats; j++) {
1040 format = pa_format_info_new();
1041 if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1042 pa_format_info_free(format);
1043 pa_log("Parse failure");
1044 return -PA_ERR_PROTOCOL;
1045 }
1046 pa_format_info_free(format);
1047 }
1048 return 0;
1049 }
1050
1051 #ifdef TUNNEL_SINK
1052
1053 /* Called from main context */
1054 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1055 struct userdata *u = userdata;
1056 uint32_t idx, owner_module, monitor_source, flags;
1057 const char *name, *description, *monitor_source_name, *driver;
1058 pa_sample_spec ss;
1059 pa_channel_map cm;
1060 pa_cvolume volume;
1061 bool mute;
1062 pa_usec_t latency;
1063
1064 pa_assert(pd);
1065 pa_assert(u);
1066
1067 if (command != PA_COMMAND_REPLY) {
1068 if (command == PA_COMMAND_ERROR)
1069 pa_log("Failed to get info.");
1070 else
1071 pa_log("Protocol error.");
1072 goto fail;
1073 }
1074
1075 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1076 pa_tagstruct_gets(t, &name) < 0 ||
1077 pa_tagstruct_gets(t, &description) < 0 ||
1078 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1079 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1080 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1081 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1082 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1083 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1084 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1085 pa_tagstruct_get_usec(t, &latency) < 0 ||
1086 pa_tagstruct_gets(t, &driver) < 0 ||
1087 pa_tagstruct_getu32(t, &flags) < 0) {
1088
1089 pa_log("Parse failure");
1090 goto fail;
1091 }
1092
1093 if (u->version >= 13) {
1094 pa_usec_t configured_latency;
1095
1096 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1097 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1098
1099 pa_log("Parse failure");
1100 goto fail;
1101 }
1102 }
1103
1104 if (u->version >= 15) {
1105 pa_volume_t base_volume;
1106 uint32_t state, n_volume_steps, card;
1107
1108 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1109 pa_tagstruct_getu32(t, &state) < 0 ||
1110 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1111 pa_tagstruct_getu32(t, &card) < 0) {
1112
1113 pa_log("Parse failure");
1114 goto fail;
1115 }
1116 }
1117
1118 if (read_ports(u, t) < 0)
1119 goto fail;
1120
1121 if (u->version >= 21 && read_formats(u, t) < 0)
1122 goto fail;
1123
1124 if (!pa_tagstruct_eof(t)) {
1125 pa_log("Packet too long");
1126 goto fail;
1127 }
1128
1129 if (!u->sink_name || !pa_streq(name, u->sink_name))
1130 return;
1131
1132 pa_xfree(u->device_description);
1133 u->device_description = pa_xstrdup(description);
1134
1135 update_description(u);
1136
1137 return;
1138
1139 fail:
1140 pa_module_unload_request(u->module, true);
1141 }
1142
1143 /* Called from main context */
1144 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1145 struct userdata *u = userdata;
1146 uint32_t idx, owner_module, client, sink;
1147 pa_usec_t buffer_usec, sink_usec;
1148 const char *name, *driver, *resample_method;
1149 bool mute = false;
1150 pa_sample_spec sample_spec;
1151 pa_channel_map channel_map;
1152 pa_cvolume volume;
1153 bool b;
1154
1155 pa_assert(pd);
1156 pa_assert(u);
1157
1158 if (command != PA_COMMAND_REPLY) {
1159 if (command == PA_COMMAND_ERROR)
1160 pa_log("Failed to get info.");
1161 else
1162 pa_log("Protocol error.");
1163 goto fail;
1164 }
1165
1166 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1167 pa_tagstruct_gets(t, &name) < 0 ||
1168 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1169 pa_tagstruct_getu32(t, &client) < 0 ||
1170 pa_tagstruct_getu32(t, &sink) < 0 ||
1171 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1172 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1173 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1174 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1175 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1176 pa_tagstruct_gets(t, &resample_method) < 0 ||
1177 pa_tagstruct_gets(t, &driver) < 0) {
1178
1179 pa_log("Parse failure");
1180 goto fail;
1181 }
1182
1183 if (u->version >= 11) {
1184 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1185
1186 pa_log("Parse failure");
1187 goto fail;
1188 }
1189 }
1190
1191 if (u->version >= 13) {
1192 if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1193
1194 pa_log("Parse failure");
1195 goto fail;
1196 }
1197 }
1198
1199 if (u->version >= 19) {
1200 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1201
1202 pa_log("Parse failure");
1203 goto fail;
1204 }
1205 }
1206
1207 if (u->version >= 20) {
1208 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1209 pa_tagstruct_get_boolean(t, &b) < 0) {
1210
1211 pa_log("Parse failure");
1212 goto fail;
1213 }
1214 }
1215
1216 if (u->version >= 21) {
1217 pa_format_info *format = pa_format_info_new();
1218
1219 if (pa_tagstruct_get_format_info(t, format) < 0) {
1220 pa_format_info_free(format);
1221 pa_log("Parse failure");
1222 goto fail;
1223 }
1224 pa_format_info_free(format);
1225 }
1226
1227 if (!pa_tagstruct_eof(t)) {
1228 pa_log("Packet too long");
1229 goto fail;
1230 }
1231
1232 if (idx != u->device_index)
1233 return;
1234
1235 pa_assert(u->sink);
1236
1237 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1238 pa_cvolume_equal(&volume, &u->sink->real_volume))
1239 return;
1240
1241 pa_sink_volume_changed(u->sink, &volume);
1242
1243 if (u->version >= 11)
1244 pa_sink_mute_changed(u->sink, mute);
1245
1246 return;
1247
1248 fail:
1249 pa_module_unload_request(u->module, true);
1250 }
1251
1252 #else
1253
1254 /* Called from main context */
1255 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1256 struct userdata *u = userdata;
1257 uint32_t idx, owner_module, monitor_of_sink, flags;
1258 const char *name, *description, *monitor_of_sink_name, *driver;
1259 pa_sample_spec ss;
1260 pa_channel_map cm;
1261 pa_cvolume volume;
1262 bool mute;
1263 pa_usec_t latency, configured_latency;
1264
1265 pa_assert(pd);
1266 pa_assert(u);
1267
1268 if (command != PA_COMMAND_REPLY) {
1269 if (command == PA_COMMAND_ERROR)
1270 pa_log("Failed to get info.");
1271 else
1272 pa_log("Protocol error.");
1273 goto fail;
1274 }
1275
1276 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1277 pa_tagstruct_gets(t, &name) < 0 ||
1278 pa_tagstruct_gets(t, &description) < 0 ||
1279 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1280 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1281 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1282 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1283 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1284 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1285 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1286 pa_tagstruct_get_usec(t, &latency) < 0 ||
1287 pa_tagstruct_gets(t, &driver) < 0 ||
1288 pa_tagstruct_getu32(t, &flags) < 0) {
1289
1290 pa_log("Parse failure");
1291 goto fail;
1292 }
1293
1294 if (u->version >= 13) {
1295 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1296 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1297
1298 pa_log("Parse failure");
1299 goto fail;
1300 }
1301 }
1302
1303 if (u->version >= 15) {
1304 pa_volume_t base_volume;
1305 uint32_t state, n_volume_steps, card;
1306
1307 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1308 pa_tagstruct_getu32(t, &state) < 0 ||
1309 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1310 pa_tagstruct_getu32(t, &card) < 0) {
1311
1312 pa_log("Parse failure");
1313 goto fail;
1314 }
1315 }
1316
1317 if (read_ports(u, t) < 0)
1318 goto fail;
1319
1320 if (u->version >= 22 && read_formats(u, t) < 0)
1321 goto fail;
1322
1323 if (!pa_tagstruct_eof(t)) {
1324 pa_log("Packet too long");
1325 goto fail;
1326 }
1327
1328 if (!u->source_name || !pa_streq(name, u->source_name))
1329 return;
1330
1331 pa_xfree(u->device_description);
1332 u->device_description = pa_xstrdup(description);
1333
1334 update_description(u);
1335
1336 return;
1337
1338 fail:
1339 pa_module_unload_request(u->module, true);
1340 }
1341
1342 #endif
1343
1344 /* Called from main context */
1345 static void request_info(struct userdata *u) {
1346 pa_tagstruct *t;
1347 uint32_t tag;
1348 pa_assert(u);
1349
1350 t = pa_tagstruct_new(NULL, 0);
1351 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1352 pa_tagstruct_putu32(t, tag = u->ctag++);
1353 pa_pstream_send_tagstruct(u->pstream, t);
1354 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1355
1356 #ifdef TUNNEL_SINK
1357 t = pa_tagstruct_new(NULL, 0);
1358 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1359 pa_tagstruct_putu32(t, tag = u->ctag++);
1360 pa_tagstruct_putu32(t, u->device_index);
1361 pa_pstream_send_tagstruct(u->pstream, t);
1362 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1363
1364 if (u->sink_name) {
1365 t = pa_tagstruct_new(NULL, 0);
1366 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1367 pa_tagstruct_putu32(t, tag = u->ctag++);
1368 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1369 pa_tagstruct_puts(t, u->sink_name);
1370 pa_pstream_send_tagstruct(u->pstream, t);
1371 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1372 }
1373 #else
1374 if (u->source_name) {
1375 t = pa_tagstruct_new(NULL, 0);
1376 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1377 pa_tagstruct_putu32(t, tag = u->ctag++);
1378 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1379 pa_tagstruct_puts(t, u->source_name);
1380 pa_pstream_send_tagstruct(u->pstream, t);
1381 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1382 }
1383 #endif
1384 }
1385
1386 /* Called from main context */
1387 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1388 struct userdata *u = userdata;
1389 pa_subscription_event_type_t e;
1390 uint32_t idx;
1391
1392 pa_assert(pd);
1393 pa_assert(t);
1394 pa_assert(u);
1395 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1396
1397 if (pa_tagstruct_getu32(t, &e) < 0 ||
1398 pa_tagstruct_getu32(t, &idx) < 0) {
1399 pa_log("Invalid protocol reply");
1400 pa_module_unload_request(u->module, true);
1401 return;
1402 }
1403
1404 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1405 #ifdef TUNNEL_SINK
1406 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1407 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1408 #else
1409 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1410 #endif
1411 )
1412 return;
1413
1414 request_info(u);
1415 }
1416
1417 /* Called from main context */
1418 static void start_subscribe(struct userdata *u) {
1419 pa_tagstruct *t;
1420 pa_assert(u);
1421
1422 t = pa_tagstruct_new(NULL, 0);
1423 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1424 pa_tagstruct_putu32(t, u->ctag++);
1425 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1426 #ifdef TUNNEL_SINK
1427 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1428 #else
1429 PA_SUBSCRIPTION_MASK_SOURCE
1430 #endif
1431 );
1432
1433 pa_pstream_send_tagstruct(u->pstream, t);
1434 }
1435
1436 /* Called from main context */
1437 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1438 struct userdata *u = userdata;
1439 #ifdef TUNNEL_SINK
1440 uint32_t bytes;
1441 #endif
1442
1443 pa_assert(pd);
1444 pa_assert(u);
1445 pa_assert(u->pdispatch == pd);
1446
1447 if (command != PA_COMMAND_REPLY) {
1448 if (command == PA_COMMAND_ERROR)
1449 pa_log("Failed to create stream.");
1450 else
1451 pa_log("Protocol error.");
1452 goto fail;
1453 }
1454
1455 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1456 pa_tagstruct_getu32(t, &u->device_index) < 0
1457 #ifdef TUNNEL_SINK
1458 || pa_tagstruct_getu32(t, &bytes) < 0
1459 #endif
1460 )
1461 goto parse_error;
1462
1463 if (u->version >= 9) {
1464 #ifdef TUNNEL_SINK
1465 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1466 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1467 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1468 pa_tagstruct_getu32(t, &u->minreq) < 0)
1469 goto parse_error;
1470 #else
1471 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1472 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1473 goto parse_error;
1474 #endif
1475 }
1476
1477 if (u->version >= 12) {
1478 pa_sample_spec ss;
1479 pa_channel_map cm;
1480 uint32_t device_index;
1481 const char *dn;
1482 bool suspended;
1483
1484 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1485 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1486 pa_tagstruct_getu32(t, &device_index) < 0 ||
1487 pa_tagstruct_gets(t, &dn) < 0 ||
1488 pa_tagstruct_get_boolean(t, &suspended) < 0)
1489 goto parse_error;
1490
1491 #ifdef TUNNEL_SINK
1492 pa_xfree(u->sink_name);
1493 u->sink_name = pa_xstrdup(dn);
1494 #else
1495 pa_xfree(u->source_name);
1496 u->source_name = pa_xstrdup(dn);
1497 #endif
1498 }
1499
1500 if (u->version >= 13) {
1501 pa_usec_t usec;
1502
1503 if (pa_tagstruct_get_usec(t, &usec) < 0)
1504 goto parse_error;
1505
1506 /* #ifdef TUNNEL_SINK */
1507 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1508 /* #else */
1509 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1510 /* #endif */
1511 }
1512
1513 if (u->version >= 21) {
1514 pa_format_info *format = pa_format_info_new();
1515
1516 if (pa_tagstruct_get_format_info(t, format) < 0) {
1517 pa_format_info_free(format);
1518 goto parse_error;
1519 }
1520
1521 pa_format_info_free(format);
1522 }
1523
1524 if (!pa_tagstruct_eof(t))
1525 goto parse_error;
1526
1527 start_subscribe(u);
1528 request_info(u);
1529
1530 pa_assert(!u->time_event);
1531 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1532
1533 request_latency(u);
1534
1535 pa_log_debug("Stream created.");
1536
1537 #ifdef TUNNEL_SINK
1538 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1539 #endif
1540
1541 return;
1542
1543 parse_error:
1544 pa_log("Invalid reply. (Create stream)");
1545
1546 fail:
1547 pa_module_unload_request(u->module, true);
1548
1549 }
1550
1551 /* Called from main context */
1552 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1553 struct userdata *u = userdata;
1554 pa_tagstruct *reply;
1555 char name[256], un[128], hn[128];
1556 pa_cvolume volume;
1557
1558 pa_assert(pd);
1559 pa_assert(u);
1560 pa_assert(u->pdispatch == pd);
1561
1562 if (command != PA_COMMAND_REPLY ||
1563 pa_tagstruct_getu32(t, &u->version) < 0 ||
1564 !pa_tagstruct_eof(t)) {
1565
1566 if (command == PA_COMMAND_ERROR)
1567 pa_log("Failed to authenticate");
1568 else
1569 pa_log("Protocol error.");
1570
1571 goto fail;
1572 }
1573
1574 /* Minimum supported protocol version */
1575 if (u->version < 8) {
1576 pa_log("Incompatible protocol version");
1577 goto fail;
1578 }
1579
1580 /* Starting with protocol version 13 the MSB of the version tag
1581 reflects if shm is enabled for this connection or not. We don't
1582 support SHM here at all, so we just ignore this. */
1583
1584 if (u->version >= 13)
1585 u->version &= 0x7FFFFFFFU;
1586
1587 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1588
1589 #ifdef TUNNEL_SINK
1590 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1591 pa_sink_update_proplist(u->sink, 0, NULL);
1592
1593 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1594 u->sink_name,
1595 pa_get_user_name(un, sizeof(un)),
1596 pa_get_host_name(hn, sizeof(hn)));
1597 #else
1598 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1599 pa_source_update_proplist(u->source, 0, NULL);
1600
1601 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1602 u->source_name,
1603 pa_get_user_name(un, sizeof(un)),
1604 pa_get_host_name(hn, sizeof(hn)));
1605 #endif
1606
1607 reply = pa_tagstruct_new(NULL, 0);
1608 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1609 pa_tagstruct_putu32(reply, u->ctag++);
1610
1611 if (u->version >= 13) {
1612 pa_proplist *pl;
1613 pl = pa_proplist_new();
1614 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1615 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1616 pa_init_proplist(pl);
1617 pa_tagstruct_put_proplist(reply, pl);
1618 pa_proplist_free(pl);
1619 } else
1620 pa_tagstruct_puts(reply, "PulseAudio");
1621
1622 pa_pstream_send_tagstruct(u->pstream, reply);
1623 /* We ignore the server's reply here */
1624
1625 reply = pa_tagstruct_new(NULL, 0);
1626
1627 if (u->version < 13)
1628 /* Only for older PA versions we need to fill in the maxlength */
1629 u->maxlength = 4*1024*1024;
1630
1631 #ifdef TUNNEL_SINK
1632 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1633 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1634 u->prebuf = u->tlength;
1635 #else
1636 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1637 #endif
1638
1639 #ifdef TUNNEL_SINK
1640 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1641 pa_tagstruct_putu32(reply, tag = u->ctag++);
1642
1643 if (u->version < 13)
1644 pa_tagstruct_puts(reply, name);
1645
1646 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1647 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1648 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1649 pa_tagstruct_puts(reply, u->sink_name);
1650 pa_tagstruct_putu32(reply, u->maxlength);
1651 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1652 pa_tagstruct_putu32(reply, u->tlength);
1653 pa_tagstruct_putu32(reply, u->prebuf);
1654 pa_tagstruct_putu32(reply, u->minreq);
1655 pa_tagstruct_putu32(reply, 0);
1656 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1657 pa_tagstruct_put_cvolume(reply, &volume);
1658 #else
1659 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1660 pa_tagstruct_putu32(reply, tag = u->ctag++);
1661
1662 if (u->version < 13)
1663 pa_tagstruct_puts(reply, name);
1664
1665 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1666 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1667 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1668 pa_tagstruct_puts(reply, u->source_name);
1669 pa_tagstruct_putu32(reply, u->maxlength);
1670 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1671 pa_tagstruct_putu32(reply, u->fragsize);
1672 #endif
1673
1674 if (u->version >= 12) {
1675 pa_tagstruct_put_boolean(reply, false); /* no_remap */
1676 pa_tagstruct_put_boolean(reply, false); /* no_remix */
1677 pa_tagstruct_put_boolean(reply, false); /* fix_format */
1678 pa_tagstruct_put_boolean(reply, false); /* fix_rate */
1679 pa_tagstruct_put_boolean(reply, false); /* fix_channels */
1680 pa_tagstruct_put_boolean(reply, true); /* no_move */
1681 pa_tagstruct_put_boolean(reply, false); /* variable_rate */
1682 }
1683
1684 if (u->version >= 13) {
1685 pa_proplist *pl;
1686
1687 pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/
1688 pa_tagstruct_put_boolean(reply, true); /* adjust_latency */
1689
1690 pl = pa_proplist_new();
1691 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1692 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1693 pa_tagstruct_put_proplist(reply, pl);
1694 pa_proplist_free(pl);
1695
1696 #ifndef TUNNEL_SINK
1697 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1698 #endif
1699 }
1700
1701 if (u->version >= 14) {
1702 #ifdef TUNNEL_SINK
1703 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1704 #endif
1705 pa_tagstruct_put_boolean(reply, true); /* early rquests */
1706 }
1707
1708 if (u->version >= 15) {
1709 #ifdef TUNNEL_SINK
1710 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1711 #endif
1712 pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */
1713 pa_tagstruct_put_boolean(reply, false); /* fail on suspend */
1714 }
1715
1716 #ifdef TUNNEL_SINK
1717 if (u->version >= 17)
1718 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1719
1720 if (u->version >= 18)
1721 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1722 #endif
1723
1724 #ifdef TUNNEL_SINK
1725 if (u->version >= 21) {
1726 /* We're not using the extended API, so n_formats = 0 and that's that */
1727 pa_tagstruct_putu8(reply, 0);
1728 }
1729 #else
1730 if (u->version >= 22) {
1731 /* We're not using the extended API, so n_formats = 0 and that's that */
1732 pa_tagstruct_putu8(reply, 0);
1733 pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1734 pa_tagstruct_put_cvolume(reply, &volume);
1735 pa_tagstruct_put_boolean(reply, false); /* muted */
1736 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1737 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1738 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1739 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1740 }
1741 #endif
1742
1743 pa_pstream_send_tagstruct(u->pstream, reply);
1744 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1745
1746 pa_log_debug("Connection authenticated, creating stream ...");
1747
1748 return;
1749
1750 fail:
1751 pa_module_unload_request(u->module, true);
1752 }
1753
1754 /* Called from main context */
1755 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1756 struct userdata *u = userdata;
1757
1758 pa_assert(p);
1759 pa_assert(u);
1760
1761 pa_log_warn("Stream died.");
1762 pa_module_unload_request(u->module, true);
1763 }
1764
1765 /* Called from main context */
1766 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1767 struct userdata *u = userdata;
1768
1769 pa_assert(p);
1770 pa_assert(packet);
1771 pa_assert(u);
1772
1773 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1774 pa_log("Invalid packet");
1775 pa_module_unload_request(u->module, true);
1776 return;
1777 }
1778 }
1779
1780 #ifndef TUNNEL_SINK
1781 /* Called from main context */
1782 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1783 struct userdata *u = userdata;
1784
1785 pa_assert(p);
1786 pa_assert(chunk);
1787 pa_assert(u);
1788
1789 if (channel != u->channel) {
1790 pa_log("Received memory block on bad channel.");
1791 pa_module_unload_request(u->module, true);
1792 return;
1793 }
1794
1795 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1796
1797 u->counter_delta += (int64_t) chunk->length;
1798 }
1799 #endif
1800
1801 /* Called from main context */
1802 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1803 struct userdata *u = userdata;
1804 pa_tagstruct *t;
1805 uint32_t tag;
1806
1807 pa_assert(sc);
1808 pa_assert(u);
1809 pa_assert(u->client == sc);
1810
1811 pa_socket_client_unref(u->client);
1812 u->client = NULL;
1813
1814 if (!io) {
1815 pa_log("Connection failed: %s", pa_cstrerror(errno));
1816 pa_module_unload_request(u->module, true);
1817 return;
1818 }
1819
1820 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1821 u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX);
1822
1823 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1824 pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1825 #ifndef TUNNEL_SINK
1826 pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1827 #endif
1828
1829 t = pa_tagstruct_new(NULL, 0);
1830 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1831 pa_tagstruct_putu32(t, tag = u->ctag++);
1832 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1833
1834 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1835
1836 #ifdef HAVE_CREDS
1837 {
1838 pa_creds ucred;
1839
1840 if (pa_iochannel_creds_supported(io))
1841 pa_iochannel_creds_enable(io);
1842
1843 ucred.uid = getuid();
1844 ucred.gid = getgid();
1845
1846 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1847 }
1848 #else
1849 pa_pstream_send_tagstruct(u->pstream, t);
1850 #endif
1851
1852 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1853
1854 pa_log_debug("Connection established, authenticating ...");
1855 }
1856
1857 #ifdef TUNNEL_SINK
1858
1859 /* Called from main context */
1860 static void sink_set_volume(pa_sink *sink) {
1861 struct userdata *u;
1862 pa_tagstruct *t;
1863
1864 pa_assert(sink);
1865 u = sink->userdata;
1866 pa_assert(u);
1867
1868 t = pa_tagstruct_new(NULL, 0);
1869 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1870 pa_tagstruct_putu32(t, u->ctag++);
1871 pa_tagstruct_putu32(t, u->device_index);
1872 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1873 pa_pstream_send_tagstruct(u->pstream, t);
1874 }
1875
1876 /* Called from main context */
1877 static void sink_set_mute(pa_sink *sink) {
1878 struct userdata *u;
1879 pa_tagstruct *t;
1880
1881 pa_assert(sink);
1882 u = sink->userdata;
1883 pa_assert(u);
1884
1885 if (u->version < 11)
1886 return;
1887
1888 t = pa_tagstruct_new(NULL, 0);
1889 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1890 pa_tagstruct_putu32(t, u->ctag++);
1891 pa_tagstruct_putu32(t, u->device_index);
1892 pa_tagstruct_put_boolean(t, !!sink->muted);
1893 pa_pstream_send_tagstruct(u->pstream, t);
1894 }
1895
1896 #endif
1897
1898 int pa__init(pa_module*m) {
1899 pa_modargs *ma = NULL;
1900 struct userdata *u = NULL;
1901 pa_sample_spec ss;
1902 pa_channel_map map;
1903 char *dn = NULL;
1904 #ifdef TUNNEL_SINK
1905 pa_sink_new_data data;
1906 #else
1907 pa_source_new_data data;
1908 #endif
1909
1910 pa_assert(m);
1911
1912 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1913 pa_log("Failed to parse module arguments");
1914 goto fail;
1915 }
1916
1917 m->userdata = u = pa_xnew0(struct userdata, 1);
1918 u->core = m->core;
1919 u->module = m;
1920 u->client = NULL;
1921 u->pdispatch = NULL;
1922 u->pstream = NULL;
1923 u->server_name = NULL;
1924 #ifdef TUNNEL_SINK
1925 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1926 u->sink = NULL;
1927 u->requested_bytes = 0;
1928 #else
1929 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1930 u->source = NULL;
1931 #endif
1932 u->smoother = pa_smoother_new(
1933 PA_USEC_PER_SEC,
1934 PA_USEC_PER_SEC*2,
1935 true,
1936 true,
1937 10,
1938 pa_rtclock_now(),
1939 false);
1940 u->ctag = 1;
1941 u->device_index = u->channel = PA_INVALID_INDEX;
1942 u->time_event = NULL;
1943 u->ignore_latency_before = 0;
1944 u->transport_usec = u->thread_transport_usec = 0;
1945 u->remote_suspended = u->remote_corked = false;
1946 u->counter = u->counter_delta = 0;
1947
1948 u->rtpoll = pa_rtpoll_new();
1949 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1950
1951 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), true, PA_NATIVE_COOKIE_LENGTH)))
1952 goto fail;
1953
1954 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1955 pa_log("No server specified.");
1956 goto fail;
1957 }
1958
1959 ss = m->core->default_sample_spec;
1960 map = m->core->default_channel_map;
1961 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1962 pa_log("Invalid sample format specification");
1963 goto fail;
1964 }
1965
1966 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1967 pa_log("Failed to connect to server '%s'", u->server_name);
1968 goto fail;
1969 }
1970
1971 pa_socket_client_set_callback(u->client, on_connection, u);
1972
1973 #ifdef TUNNEL_SINK
1974
1975 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1976 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1977
1978 pa_sink_new_data_init(&data);
1979 data.driver = __FILE__;
1980 data.module = m;
1981 data.namereg_fail = true;
1982 pa_sink_new_data_set_name(&data, dn);
1983 pa_sink_new_data_set_sample_spec(&data, &ss);
1984 pa_sink_new_data_set_channel_map(&data, &map);
1985 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1986 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1987 if (u->sink_name)
1988 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1989
1990 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1991 pa_log("Invalid properties");
1992 pa_sink_new_data_done(&data);
1993 goto fail;
1994 }
1995
1996 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
1997 pa_sink_new_data_done(&data);
1998
1999 if (!u->sink) {
2000 pa_log("Failed to create sink.");
2001 goto fail;
2002 }
2003
2004 u->sink->parent.process_msg = sink_process_msg;
2005 u->sink->userdata = u;
2006 u->sink->set_state = sink_set_state;
2007 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2008 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2009
2010 u->sink->refresh_volume = u->sink->refresh_muted = false;
2011
2012 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2013
2014 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2015 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2016
2017 #else
2018
2019 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2020 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2021
2022 pa_source_new_data_init(&data);
2023 data.driver = __FILE__;
2024 data.module = m;
2025 data.namereg_fail = true;
2026 pa_source_new_data_set_name(&data, dn);
2027 pa_source_new_data_set_sample_spec(&data, &ss);
2028 pa_source_new_data_set_channel_map(&data, &map);
2029 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2030 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2031 if (u->source_name)
2032 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2033
2034 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2035 pa_log("Invalid properties");
2036 pa_source_new_data_done(&data);
2037 goto fail;
2038 }
2039
2040 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2041 pa_source_new_data_done(&data);
2042
2043 if (!u->source) {
2044 pa_log("Failed to create source.");
2045 goto fail;
2046 }
2047
2048 u->source->parent.process_msg = source_process_msg;
2049 u->source->set_state = source_set_state;
2050 u->source->userdata = u;
2051
2052 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2053
2054 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2055 pa_source_set_rtpoll(u->source, u->rtpoll);
2056
2057 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2058 #endif
2059
2060 pa_xfree(dn);
2061
2062 u->time_event = NULL;
2063
2064 u->maxlength = (uint32_t) -1;
2065 #ifdef TUNNEL_SINK
2066 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2067 #else
2068 u->fragsize = (uint32_t) -1;
2069 #endif
2070
2071 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2072 pa_log("Failed to create thread.");
2073 goto fail;
2074 }
2075
2076 #ifdef TUNNEL_SINK
2077 pa_sink_put(u->sink);
2078 #else
2079 pa_source_put(u->source);
2080 #endif
2081
2082 pa_modargs_free(ma);
2083
2084 return 0;
2085
2086 fail:
2087 pa__done(m);
2088
2089 if (ma)
2090 pa_modargs_free(ma);
2091
2092 pa_xfree(dn);
2093
2094 return -1;
2095 }
2096
2097 void pa__done(pa_module*m) {
2098 struct userdata* u;
2099
2100 pa_assert(m);
2101
2102 if (!(u = m->userdata))
2103 return;
2104
2105 #ifdef TUNNEL_SINK
2106 if (u->sink)
2107 pa_sink_unlink(u->sink);
2108 #else
2109 if (u->source)
2110 pa_source_unlink(u->source);
2111 #endif
2112
2113 if (u->thread) {
2114 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2115 pa_thread_free(u->thread);
2116 }
2117
2118 pa_thread_mq_done(&u->thread_mq);
2119
2120 #ifdef TUNNEL_SINK
2121 if (u->sink)
2122 pa_sink_unref(u->sink);
2123 #else
2124 if (u->source)
2125 pa_source_unref(u->source);
2126 #endif
2127
2128 if (u->rtpoll)
2129 pa_rtpoll_free(u->rtpoll);
2130
2131 if (u->pstream) {
2132 pa_pstream_unlink(u->pstream);
2133 pa_pstream_unref(u->pstream);
2134 }
2135
2136 if (u->pdispatch)
2137 pa_pdispatch_unref(u->pdispatch);
2138
2139 if (u->client)
2140 pa_socket_client_unref(u->client);
2141
2142 if (u->auth_cookie)
2143 pa_auth_cookie_unref(u->auth_cookie);
2144
2145 if (u->smoother)
2146 pa_smoother_free(u->smoother);
2147
2148 if (u->time_event)
2149 u->core->mainloop->time_free(u->time_event);
2150
2151 #ifndef TUNNEL_SINK
2152 if (u->mcalign)
2153 pa_mcalign_free(u->mcalign);
2154 #endif
2155
2156 #ifdef TUNNEL_SINK
2157 pa_xfree(u->sink_name);
2158 #else
2159 pa_xfree(u->source_name);
2160 #endif
2161 pa_xfree(u->server_name);
2162
2163 pa_xfree(u->device_description);
2164 pa_xfree(u->server_fqdn);
2165 pa_xfree(u->user_name);
2166
2167 pa_xfree(u);
2168 }