code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
virtual-surround: check if resampled memblock is not equal to input
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
69 "server=<address> "
70 "sink=<remote sink name> "
71 "cookie=<filename> "
72 "format=<sample format> "
73 "channels=<number of channels> "
74 "rate=<sample rate> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
81 "server=<address> "
82 "source=<remote source name> "
83 "cookie=<filename> "
84 "format=<sample format> "
85 "channels=<number of channels> "
86 "rate=<sample rate> "
87 "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE);
93
/* NULL-terminated list of the module argument names this module accepts.
 * The device-specific names depend on the build flavour. */
static const char* const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif
    "channel_map",
    NULL,
};
112
113 #define DEFAULT_TIMEOUT 5
114
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
116
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
121 enum {
122 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123 SINK_MESSAGE_REMOTE_SUSPEND,
124 SINK_MESSAGE_UPDATE_LATENCY,
125 SINK_MESSAGE_POST
126 };
127
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
133 enum {
134 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135 SOURCE_MESSAGE_REMOTE_SUSPEND,
136 SOURCE_MESSAGE_UPDATE_LATENCY
137 };
138
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157 [PA_COMMAND_REQUEST] = command_request,
158 [PA_COMMAND_STARTED] = command_started,
159 #endif
160 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
176 struct userdata {
177 pa_core *core;
178 pa_module *module;
179
180 pa_thread_mq thread_mq;
181 pa_rtpoll *rtpoll;
182 pa_thread *thread;
183
184 pa_socket_client *client;
185 pa_pstream *pstream;
186 pa_pdispatch *pdispatch;
187
188 char *server_name;
189 #ifdef TUNNEL_SINK
190 char *sink_name;
191 pa_sink *sink;
192 size_t requested_bytes;
193 #else
194 char *source_name;
195 pa_source *source;
196 pa_mcalign *mcalign;
197 #endif
198
199 pa_auth_cookie *auth_cookie;
200
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
205
206 int64_t counter, counter_delta;
207
208 pa_bool_t remote_corked:1;
209 pa_bool_t remote_suspended:1;
210
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214 uint32_t ignore_latency_before;
215
216 pa_time_event *time_event;
217
218 pa_smoother *smoother;
219
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
223
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
232 };
233
234 static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
244
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
249
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
257
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
262
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 pa_bool_t suspended;
272
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
277
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
281
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, TRUE);
284 return;
285 }
286
287 pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295 request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 pa_bool_t suspended;
304
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
309
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, TRUE);
317 return;
318 }
319
320 pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328 request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
334 pa_usec_t usec;
335
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
340
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, TRUE);
346 return;
347 }
348
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, TRUE);
355 return;
356 }
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
362
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, TRUE);
365 return;
366 }
367 }
368
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373 request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
386
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395 pa_usec_t x;
396
397 pa_assert(u);
398
399 x = pa_rtclock_now();
400
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
404
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
409
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418 pa_assert(u);
419
420 if (u->remote_corked == cork)
421 return;
422
423 u->remote_corked = cork;
424 check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
431
432 if (!u->pstream)
433 return;
434
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
445
446 request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451 pa_assert(u);
452
453 if (u->remote_suspended == suspend)
454 return;
455
456 u->remote_suspended = suspend;
457 check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
465
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
468
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
472
473 u->requested_bytes -= memchunk.length;
474
475 u->counter += (int64_t) memchunk.length;
476 }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
482
483 switch (code) {
484
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
487
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493 if (PA_SINK_IS_OPENED(u->sink->state))
494 send_data(u);
495 }
496
497 return r;
498 }
499
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
502
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
508 }
509
510 case SINK_MESSAGE_REQUEST:
511
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
514
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
517
518 return 0;
519
520
521 case SINK_MESSAGE_REMOTE_SUSPEND:
522
523 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524 return 0;
525
526
527 case SINK_MESSAGE_UPDATE_LATENCY: {
528 pa_usec_t y;
529
530 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532 if (y > (pa_usec_t) offset)
533 y -= (pa_usec_t) offset;
534 else
535 y = 0;
536
537 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539 /* We can access this freely here, since the main thread is waiting for us */
540 u->thread_transport_usec = u->transport_usec;
541
542 return 0;
543 }
544
545 case SINK_MESSAGE_POST:
546
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554 u->counter_delta += (int64_t) chunk->length;
555
556 return 0;
557 }
558
559 return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564 struct userdata *u;
565 pa_sink_assert_ref(s);
566 u = s->userdata;
567
568 switch ((pa_sink_state_t) state) {
569
570 case PA_SINK_SUSPENDED:
571 pa_assert(PA_SINK_IS_OPENED(s->state));
572 stream_cork(u, TRUE);
573 break;
574
575 case PA_SINK_IDLE:
576 case PA_SINK_RUNNING:
577 if (s->state == PA_SINK_SUSPENDED)
578 stream_cork(u, FALSE);
579 break;
580
581 case PA_SINK_UNLINKED:
582 case PA_SINK_INIT:
583 case PA_SINK_INVALID_STATE:
584 ;
585 }
586
587 return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594 struct userdata *u = PA_SOURCE(o)->userdata;
595
596 switch (code) {
597
598 case PA_SOURCE_MESSAGE_SET_STATE: {
599 int r;
600
601 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604 return r;
605 }
606
607 case PA_SOURCE_MESSAGE_GET_LATENCY: {
608 pa_usec_t yr, yl, *usec = data;
609
610 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613 *usec = yr > yl ? yr - yl : 0;
614 return 0;
615 }
616
617 case SOURCE_MESSAGE_POST: {
618 pa_memchunk c;
619
620 pa_mcalign_push(u->mcalign, chunk);
621
622 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
623
624 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
625 pa_source_post(u->source, &c);
626
627 pa_memblock_unref(c.memblock);
628
629 u->counter += (int64_t) c.length;
630 }
631
632 return 0;
633 }
634
635 case SOURCE_MESSAGE_REMOTE_SUSPEND:
636
637 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
638 return 0;
639
640 case SOURCE_MESSAGE_UPDATE_LATENCY: {
641 pa_usec_t y;
642
643 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
644 y += (pa_usec_t) offset;
645
646 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
647
648 /* We can access this freely here, since the main thread is waiting for us */
649 u->thread_transport_usec = u->transport_usec;
650
651 return 0;
652 }
653 }
654
655 return pa_source_process_msg(o, code, data, offset, chunk);
656 }
657
658 /* Called from main context */
659 static int source_set_state(pa_source *s, pa_source_state_t state) {
660 struct userdata *u;
661 pa_source_assert_ref(s);
662 u = s->userdata;
663
664 switch ((pa_source_state_t) state) {
665
666 case PA_SOURCE_SUSPENDED:
667 pa_assert(PA_SOURCE_IS_OPENED(s->state));
668 stream_cork(u, TRUE);
669 break;
670
671 case PA_SOURCE_IDLE:
672 case PA_SOURCE_RUNNING:
673 if (s->state == PA_SOURCE_SUSPENDED)
674 stream_cork(u, FALSE);
675 break;
676
677 case PA_SOURCE_UNLINKED:
678 case PA_SOURCE_INIT:
679 case PA_SINK_INVALID_STATE:
680 ;
681 }
682
683 return 0;
684 }
685
686 #endif
687
688 static void thread_func(void *userdata) {
689 struct userdata *u = userdata;
690
691 pa_assert(u);
692
693 pa_log_debug("Thread starting up");
694
695 pa_thread_mq_install(&u->thread_mq);
696
697 for (;;) {
698 int ret;
699
700 #ifdef TUNNEL_SINK
701 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
702 pa_sink_process_rewind(u->sink, 0);
703 #endif
704
705 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
706 goto fail;
707
708 if (ret == 0)
709 goto finish;
710 }
711
712 fail:
713 /* If this was no regular exit from the loop we have to continue
714 * processing messages until we received PA_MESSAGE_SHUTDOWN */
715 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
716 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
717
718 finish:
719 pa_log_debug("Thread shutting down");
720 }
721
722 #ifdef TUNNEL_SINK
723 /* Called from main context */
724 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
725 struct userdata *u = userdata;
726 uint32_t bytes, channel;
727
728 pa_assert(pd);
729 pa_assert(command == PA_COMMAND_REQUEST);
730 pa_assert(t);
731 pa_assert(u);
732 pa_assert(u->pdispatch == pd);
733
734 if (pa_tagstruct_getu32(t, &channel) < 0 ||
735 pa_tagstruct_getu32(t, &bytes) < 0) {
736 pa_log("Invalid protocol reply");
737 goto fail;
738 }
739
740 if (channel != u->channel) {
741 pa_log("Received data for invalid channel");
742 goto fail;
743 }
744
745 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
746 return;
747
748 fail:
749 pa_module_unload_request(u->module, TRUE);
750 }
751
752 #endif
753
754 /* Called from main context */
755 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
756 struct userdata *u = userdata;
757 pa_usec_t sink_usec, source_usec;
758 pa_bool_t playing;
759 int64_t write_index, read_index;
760 struct timeval local, remote, now;
761 pa_sample_spec *ss;
762 int64_t delay;
763
764 pa_assert(pd);
765 pa_assert(u);
766
767 if (command != PA_COMMAND_REPLY) {
768 if (command == PA_COMMAND_ERROR)
769 pa_log("Failed to get latency.");
770 else
771 pa_log("Protocol error.");
772 goto fail;
773 }
774
775 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
776 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
777 pa_tagstruct_get_boolean(t, &playing) < 0 ||
778 pa_tagstruct_get_timeval(t, &local) < 0 ||
779 pa_tagstruct_get_timeval(t, &remote) < 0 ||
780 pa_tagstruct_gets64(t, &write_index) < 0 ||
781 pa_tagstruct_gets64(t, &read_index) < 0) {
782 pa_log("Invalid reply.");
783 goto fail;
784 }
785
786 #ifdef TUNNEL_SINK
787 if (u->version >= 13) {
788 uint64_t underrun_for = 0, playing_for = 0;
789
790 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
791 pa_tagstruct_getu64(t, &playing_for) < 0) {
792 pa_log("Invalid reply.");
793 goto fail;
794 }
795 }
796 #endif
797
798 if (!pa_tagstruct_eof(t)) {
799 pa_log("Invalid reply.");
800 goto fail;
801 }
802
803 if (tag < u->ignore_latency_before) {
804 return;
805 }
806
807 pa_gettimeofday(&now);
808
809 /* Calculate transport usec */
810 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
811 /* local and remote seem to have synchronized clocks */
812 #ifdef TUNNEL_SINK
813 u->transport_usec = pa_timeval_diff(&remote, &local);
814 #else
815 u->transport_usec = pa_timeval_diff(&now, &remote);
816 #endif
817 } else
818 u->transport_usec = pa_timeval_diff(&now, &local)/2;
819
820 /* First, take the device's delay */
821 #ifdef TUNNEL_SINK
822 delay = (int64_t) sink_usec;
823 ss = &u->sink->sample_spec;
824 #else
825 delay = (int64_t) source_usec;
826 ss = &u->source->sample_spec;
827 #endif
828
829 /* Add the length of our server-side buffer */
830 if (write_index >= read_index)
831 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
832 else
833 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
834
835 /* Our measurements are already out of date, hence correct by the *
836 * transport latency */
837 #ifdef TUNNEL_SINK
838 delay -= (int64_t) u->transport_usec;
839 #else
840 delay += (int64_t) u->transport_usec;
841 #endif
842
843 /* Now correct by what we have have read/written since we requested the update */
844 #ifdef TUNNEL_SINK
845 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
846 #else
847 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
848 #endif
849
850 #ifdef TUNNEL_SINK
851 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
852 #else
853 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
854 #endif
855
856 return;
857
858 fail:
859
860 pa_module_unload_request(u->module, TRUE);
861 }
862
863 /* Called from main context */
864 static void request_latency(struct userdata *u) {
865 pa_tagstruct *t;
866 struct timeval now;
867 uint32_t tag;
868 pa_assert(u);
869
870 t = pa_tagstruct_new(NULL, 0);
871 #ifdef TUNNEL_SINK
872 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
873 #else
874 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
875 #endif
876 pa_tagstruct_putu32(t, tag = u->ctag++);
877 pa_tagstruct_putu32(t, u->channel);
878
879 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
880
881 pa_pstream_send_tagstruct(u->pstream, t);
882 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
883
884 u->ignore_latency_before = tag;
885 u->counter_delta = 0;
886 }
887
888 /* Called from main context */
889 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
890 struct userdata *u = userdata;
891
892 pa_assert(m);
893 pa_assert(e);
894 pa_assert(u);
895
896 request_latency(u);
897
898 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
899 }
900
901 /* Called from main context */
902 static void update_description(struct userdata *u) {
903 char *d;
904 char un[128], hn[128];
905 pa_tagstruct *t;
906
907 pa_assert(u);
908
909 if (!u->server_fqdn || !u->user_name || !u->device_description)
910 return;
911
912 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
913
914 #ifdef TUNNEL_SINK
915 pa_sink_set_description(u->sink, d);
916 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
917 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
918 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
919 #else
920 pa_source_set_description(u->source, d);
921 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
922 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
923 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
924 #endif
925
926 pa_xfree(d);
927
928 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
929 pa_get_user_name(un, sizeof(un)),
930 pa_get_host_name(hn, sizeof(hn)));
931
932 t = pa_tagstruct_new(NULL, 0);
933 #ifdef TUNNEL_SINK
934 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
935 #else
936 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
937 #endif
938 pa_tagstruct_putu32(t, u->ctag++);
939 pa_tagstruct_putu32(t, u->channel);
940 pa_tagstruct_puts(t, d);
941 pa_pstream_send_tagstruct(u->pstream, t);
942
943 pa_xfree(d);
944 }
945
946 /* Called from main context */
947 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
948 struct userdata *u = userdata;
949 pa_sample_spec ss;
950 pa_channel_map cm;
951 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
952 uint32_t cookie;
953
954 pa_assert(pd);
955 pa_assert(u);
956
957 if (command != PA_COMMAND_REPLY) {
958 if (command == PA_COMMAND_ERROR)
959 pa_log("Failed to get info.");
960 else
961 pa_log("Protocol error.");
962 goto fail;
963 }
964
965 if (pa_tagstruct_gets(t, &server_name) < 0 ||
966 pa_tagstruct_gets(t, &server_version) < 0 ||
967 pa_tagstruct_gets(t, &user_name) < 0 ||
968 pa_tagstruct_gets(t, &host_name) < 0 ||
969 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
970 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
971 pa_tagstruct_gets(t, &default_source_name) < 0 ||
972 pa_tagstruct_getu32(t, &cookie) < 0 ||
973 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
974
975 pa_log("Parse failure");
976 goto fail;
977 }
978
979 if (!pa_tagstruct_eof(t)) {
980 pa_log("Packet too long");
981 goto fail;
982 }
983
984 pa_xfree(u->server_fqdn);
985 u->server_fqdn = pa_xstrdup(host_name);
986
987 pa_xfree(u->user_name);
988 u->user_name = pa_xstrdup(user_name);
989
990 update_description(u);
991
992 return;
993
994 fail:
995 pa_module_unload_request(u->module, TRUE);
996 }
997
998 static int read_ports(struct userdata *u, pa_tagstruct *t)
999 {
1000 if (u->version >= 16) {
1001 uint32_t n_ports;
1002 const char *s;
1003
1004 if (pa_tagstruct_getu32(t, &n_ports)) {
1005 pa_log("Parse failure");
1006 return -PA_ERR_PROTOCOL;
1007 }
1008
1009 for (uint32_t j = 0; j < n_ports; j++) {
1010 uint32_t priority;
1011
1012 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1013 pa_tagstruct_gets(t, &s) < 0 || /* description */
1014 pa_tagstruct_getu32(t, &priority) < 0) {
1015
1016 pa_log("Parse failure");
1017 return -PA_ERR_PROTOCOL;
1018 }
1019 if (u->version >= 24 && pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1020 pa_log("Parse failure");
1021 return -PA_ERR_PROTOCOL;
1022 }
1023 }
1024
1025 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1026 pa_log("Parse failure");
1027 return -PA_ERR_PROTOCOL;
1028 }
1029 }
1030 return 0;
1031 }
1032
1033
1034 static int read_formats(struct userdata *u, pa_tagstruct *t) {
1035 uint8_t n_formats;
1036 pa_format_info *format;
1037
1038 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1039 pa_log("Parse failure");
1040 return -PA_ERR_PROTOCOL;
1041 }
1042
1043 for (uint8_t j = 0; j < n_formats; j++) {
1044 format = pa_format_info_new();
1045 if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1046 pa_format_info_free(format);
1047 pa_log("Parse failure");
1048 return -PA_ERR_PROTOCOL;
1049 }
1050 pa_format_info_free(format);
1051 }
1052 return 0;
1053 }
1054
1055 #ifdef TUNNEL_SINK
1056
1057 /* Called from main context */
1058 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1059 struct userdata *u = userdata;
1060 uint32_t idx, owner_module, monitor_source, flags;
1061 const char *name, *description, *monitor_source_name, *driver;
1062 pa_sample_spec ss;
1063 pa_channel_map cm;
1064 pa_cvolume volume;
1065 pa_bool_t mute;
1066 pa_usec_t latency;
1067
1068 pa_assert(pd);
1069 pa_assert(u);
1070
1071 if (command != PA_COMMAND_REPLY) {
1072 if (command == PA_COMMAND_ERROR)
1073 pa_log("Failed to get info.");
1074 else
1075 pa_log("Protocol error.");
1076 goto fail;
1077 }
1078
1079 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1080 pa_tagstruct_gets(t, &name) < 0 ||
1081 pa_tagstruct_gets(t, &description) < 0 ||
1082 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1083 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1084 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1085 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1086 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1087 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1088 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1089 pa_tagstruct_get_usec(t, &latency) < 0 ||
1090 pa_tagstruct_gets(t, &driver) < 0 ||
1091 pa_tagstruct_getu32(t, &flags) < 0) {
1092
1093 pa_log("Parse failure");
1094 goto fail;
1095 }
1096
1097 if (u->version >= 13) {
1098 pa_usec_t configured_latency;
1099
1100 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1101 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1102
1103 pa_log("Parse failure");
1104 goto fail;
1105 }
1106 }
1107
1108 if (u->version >= 15) {
1109 pa_volume_t base_volume;
1110 uint32_t state, n_volume_steps, card;
1111
1112 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1113 pa_tagstruct_getu32(t, &state) < 0 ||
1114 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1115 pa_tagstruct_getu32(t, &card) < 0) {
1116
1117 pa_log("Parse failure");
1118 goto fail;
1119 }
1120 }
1121
1122 if (read_ports(u, t) < 0)
1123 goto fail;
1124
1125 if (u->version >= 21 && read_formats(u, t) < 0)
1126 goto fail;
1127
1128 if (!pa_tagstruct_eof(t)) {
1129 pa_log("Packet too long");
1130 goto fail;
1131 }
1132
1133 if (!u->sink_name || !pa_streq(name, u->sink_name))
1134 return;
1135
1136 pa_xfree(u->device_description);
1137 u->device_description = pa_xstrdup(description);
1138
1139 update_description(u);
1140
1141 return;
1142
1143 fail:
1144 pa_module_unload_request(u->module, TRUE);
1145 }
1146
1147 /* Called from main context */
1148 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1149 struct userdata *u = userdata;
1150 uint32_t idx, owner_module, client, sink;
1151 pa_usec_t buffer_usec, sink_usec;
1152 const char *name, *driver, *resample_method;
1153 pa_bool_t mute = FALSE;
1154 pa_sample_spec sample_spec;
1155 pa_channel_map channel_map;
1156 pa_cvolume volume;
1157 pa_bool_t b;
1158
1159 pa_assert(pd);
1160 pa_assert(u);
1161
1162 if (command != PA_COMMAND_REPLY) {
1163 if (command == PA_COMMAND_ERROR)
1164 pa_log("Failed to get info.");
1165 else
1166 pa_log("Protocol error.");
1167 goto fail;
1168 }
1169
1170 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1171 pa_tagstruct_gets(t, &name) < 0 ||
1172 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1173 pa_tagstruct_getu32(t, &client) < 0 ||
1174 pa_tagstruct_getu32(t, &sink) < 0 ||
1175 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1176 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1177 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1178 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1179 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1180 pa_tagstruct_gets(t, &resample_method) < 0 ||
1181 pa_tagstruct_gets(t, &driver) < 0) {
1182
1183 pa_log("Parse failure");
1184 goto fail;
1185 }
1186
1187 if (u->version >= 11) {
1188 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1189
1190 pa_log("Parse failure");
1191 goto fail;
1192 }
1193 }
1194
1195 if (u->version >= 13) {
1196 if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1197
1198 pa_log("Parse failure");
1199 goto fail;
1200 }
1201 }
1202
1203 if (u->version >= 19) {
1204 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1205
1206 pa_log("Parse failure");
1207 goto fail;
1208 }
1209 }
1210
1211 if (u->version >= 20) {
1212 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1213 pa_tagstruct_get_boolean(t, &b) < 0) {
1214
1215 pa_log("Parse failure");
1216 goto fail;
1217 }
1218 }
1219
1220 if (u->version >= 21) {
1221 pa_format_info *format = pa_format_info_new();
1222
1223 if (pa_tagstruct_get_format_info(t, format) < 0) {
1224 pa_format_info_free(format);
1225 pa_log("Parse failure");
1226 goto fail;
1227 }
1228 pa_format_info_free(format);
1229 }
1230
1231 if (!pa_tagstruct_eof(t)) {
1232 pa_log("Packet too long");
1233 goto fail;
1234 }
1235
1236 if (idx != u->device_index)
1237 return;
1238
1239 pa_assert(u->sink);
1240
1241 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1242 pa_cvolume_equal(&volume, &u->sink->real_volume))
1243 return;
1244
1245 pa_sink_volume_changed(u->sink, &volume);
1246
1247 if (u->version >= 11)
1248 pa_sink_mute_changed(u->sink, mute);
1249
1250 return;
1251
1252 fail:
1253 pa_module_unload_request(u->module, TRUE);
1254 }
1255
1256 #else
1257
1258 /* Called from main context */
1259 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1260 struct userdata *u = userdata;
1261 uint32_t idx, owner_module, monitor_of_sink, flags;
1262 const char *name, *description, *monitor_of_sink_name, *driver;
1263 pa_sample_spec ss;
1264 pa_channel_map cm;
1265 pa_cvolume volume;
1266 pa_bool_t mute;
1267 pa_usec_t latency, configured_latency;
1268
1269 pa_assert(pd);
1270 pa_assert(u);
1271
1272 if (command != PA_COMMAND_REPLY) {
1273 if (command == PA_COMMAND_ERROR)
1274 pa_log("Failed to get info.");
1275 else
1276 pa_log("Protocol error.");
1277 goto fail;
1278 }
1279
1280 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1281 pa_tagstruct_gets(t, &name) < 0 ||
1282 pa_tagstruct_gets(t, &description) < 0 ||
1283 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1284 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1285 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1286 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1287 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1288 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1289 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1290 pa_tagstruct_get_usec(t, &latency) < 0 ||
1291 pa_tagstruct_gets(t, &driver) < 0 ||
1292 pa_tagstruct_getu32(t, &flags) < 0) {
1293
1294 pa_log("Parse failure");
1295 goto fail;
1296 }
1297
1298 if (u->version >= 13) {
1299 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1300 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1301
1302 pa_log("Parse failure");
1303 goto fail;
1304 }
1305 }
1306
1307 if (u->version >= 15) {
1308 pa_volume_t base_volume;
1309 uint32_t state, n_volume_steps, card;
1310
1311 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1312 pa_tagstruct_getu32(t, &state) < 0 ||
1313 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1314 pa_tagstruct_getu32(t, &card) < 0) {
1315
1316 pa_log("Parse failure");
1317 goto fail;
1318 }
1319 }
1320
1321 if (read_ports(u, t) < 0)
1322 goto fail;
1323
1324 if (u->version >= 22 && read_formats(u, t) < 0)
1325 goto fail;
1326
1327 if (!pa_tagstruct_eof(t)) {
1328 pa_log("Packet too long");
1329 goto fail;
1330 }
1331
1332 if (!u->source_name || !pa_streq(name, u->source_name))
1333 return;
1334
1335 pa_xfree(u->device_description);
1336 u->device_description = pa_xstrdup(description);
1337
1338 update_description(u);
1339
1340 return;
1341
1342 fail:
1343 pa_module_unload_request(u->module, TRUE);
1344 }
1345
1346 #endif
1347
1348 /* Called from main context */
1349 static void request_info(struct userdata *u) {
1350 pa_tagstruct *t;
1351 uint32_t tag;
1352 pa_assert(u);
1353
1354 t = pa_tagstruct_new(NULL, 0);
1355 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1356 pa_tagstruct_putu32(t, tag = u->ctag++);
1357 pa_pstream_send_tagstruct(u->pstream, t);
1358 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1359
1360 #ifdef TUNNEL_SINK
1361 t = pa_tagstruct_new(NULL, 0);
1362 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1363 pa_tagstruct_putu32(t, tag = u->ctag++);
1364 pa_tagstruct_putu32(t, u->device_index);
1365 pa_pstream_send_tagstruct(u->pstream, t);
1366 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1367
1368 if (u->sink_name) {
1369 t = pa_tagstruct_new(NULL, 0);
1370 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1371 pa_tagstruct_putu32(t, tag = u->ctag++);
1372 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1373 pa_tagstruct_puts(t, u->sink_name);
1374 pa_pstream_send_tagstruct(u->pstream, t);
1375 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1376 }
1377 #else
1378 if (u->source_name) {
1379 t = pa_tagstruct_new(NULL, 0);
1380 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1381 pa_tagstruct_putu32(t, tag = u->ctag++);
1382 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1383 pa_tagstruct_puts(t, u->source_name);
1384 pa_pstream_send_tagstruct(u->pstream, t);
1385 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1386 }
1387 #endif
1388 }
1389
1390 /* Called from main context */
1391 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1392 struct userdata *u = userdata;
1393 pa_subscription_event_type_t e;
1394 uint32_t idx;
1395
1396 pa_assert(pd);
1397 pa_assert(t);
1398 pa_assert(u);
1399 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1400
1401 if (pa_tagstruct_getu32(t, &e) < 0 ||
1402 pa_tagstruct_getu32(t, &idx) < 0) {
1403 pa_log("Invalid protocol reply");
1404 pa_module_unload_request(u->module, TRUE);
1405 return;
1406 }
1407
1408 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1409 #ifdef TUNNEL_SINK
1410 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1411 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1412 #else
1413 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1414 #endif
1415 )
1416 return;
1417
1418 request_info(u);
1419 }
1420
1421 /* Called from main context */
1422 static void start_subscribe(struct userdata *u) {
1423 pa_tagstruct *t;
1424 pa_assert(u);
1425
1426 t = pa_tagstruct_new(NULL, 0);
1427 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1428 pa_tagstruct_putu32(t, u->ctag++);
1429 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1430 #ifdef TUNNEL_SINK
1431 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1432 #else
1433 PA_SUBSCRIPTION_MASK_SOURCE
1434 #endif
1435 );
1436
1437 pa_pstream_send_tagstruct(u->pstream, t);
1438 }
1439
1440 /* Called from main context */
1441 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1442 struct userdata *u = userdata;
1443 #ifdef TUNNEL_SINK
1444 uint32_t bytes;
1445 #endif
1446
1447 pa_assert(pd);
1448 pa_assert(u);
1449 pa_assert(u->pdispatch == pd);
1450
1451 if (command != PA_COMMAND_REPLY) {
1452 if (command == PA_COMMAND_ERROR)
1453 pa_log("Failed to create stream.");
1454 else
1455 pa_log("Protocol error.");
1456 goto fail;
1457 }
1458
1459 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1460 pa_tagstruct_getu32(t, &u->device_index) < 0
1461 #ifdef TUNNEL_SINK
1462 || pa_tagstruct_getu32(t, &bytes) < 0
1463 #endif
1464 )
1465 goto parse_error;
1466
1467 if (u->version >= 9) {
1468 #ifdef TUNNEL_SINK
1469 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1470 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1471 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1472 pa_tagstruct_getu32(t, &u->minreq) < 0)
1473 goto parse_error;
1474 #else
1475 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1476 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1477 goto parse_error;
1478 #endif
1479 }
1480
1481 if (u->version >= 12) {
1482 pa_sample_spec ss;
1483 pa_channel_map cm;
1484 uint32_t device_index;
1485 const char *dn;
1486 pa_bool_t suspended;
1487
1488 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1489 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1490 pa_tagstruct_getu32(t, &device_index) < 0 ||
1491 pa_tagstruct_gets(t, &dn) < 0 ||
1492 pa_tagstruct_get_boolean(t, &suspended) < 0)
1493 goto parse_error;
1494
1495 #ifdef TUNNEL_SINK
1496 pa_xfree(u->sink_name);
1497 u->sink_name = pa_xstrdup(dn);
1498 #else
1499 pa_xfree(u->source_name);
1500 u->source_name = pa_xstrdup(dn);
1501 #endif
1502 }
1503
1504 if (u->version >= 13) {
1505 pa_usec_t usec;
1506
1507 if (pa_tagstruct_get_usec(t, &usec) < 0)
1508 goto parse_error;
1509
1510 /* #ifdef TUNNEL_SINK */
1511 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1512 /* #else */
1513 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1514 /* #endif */
1515 }
1516
1517 if (u->version >= 21) {
1518 pa_format_info *format = pa_format_info_new();
1519
1520 if (pa_tagstruct_get_format_info(t, format) < 0) {
1521 pa_format_info_free(format);
1522 goto parse_error;
1523 }
1524
1525 pa_format_info_free(format);
1526 }
1527
1528 if (!pa_tagstruct_eof(t))
1529 goto parse_error;
1530
1531 start_subscribe(u);
1532 request_info(u);
1533
1534 pa_assert(!u->time_event);
1535 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1536
1537 request_latency(u);
1538
1539 pa_log_debug("Stream created.");
1540
1541 #ifdef TUNNEL_SINK
1542 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1543 #endif
1544
1545 return;
1546
1547 parse_error:
1548 pa_log("Invalid reply. (Create stream)");
1549
1550 fail:
1551 pa_module_unload_request(u->module, TRUE);
1552
1553 }
1554
1555 /* Called from main context */
1556 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1557 struct userdata *u = userdata;
1558 pa_tagstruct *reply;
1559 char name[256], un[128], hn[128];
1560 pa_cvolume volume;
1561
1562 pa_assert(pd);
1563 pa_assert(u);
1564 pa_assert(u->pdispatch == pd);
1565
1566 if (command != PA_COMMAND_REPLY ||
1567 pa_tagstruct_getu32(t, &u->version) < 0 ||
1568 !pa_tagstruct_eof(t)) {
1569
1570 if (command == PA_COMMAND_ERROR)
1571 pa_log("Failed to authenticate");
1572 else
1573 pa_log("Protocol error.");
1574
1575 goto fail;
1576 }
1577
1578 /* Minimum supported protocol version */
1579 if (u->version < 8) {
1580 pa_log("Incompatible protocol version");
1581 goto fail;
1582 }
1583
1584 /* Starting with protocol version 13 the MSB of the version tag
1585 reflects if shm is enabled for this connection or not. We don't
1586 support SHM here at all, so we just ignore this. */
1587
1588 if (u->version >= 13)
1589 u->version &= 0x7FFFFFFFU;
1590
1591 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1592
1593 #ifdef TUNNEL_SINK
1594 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1595 pa_sink_update_proplist(u->sink, 0, NULL);
1596
1597 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1598 u->sink_name,
1599 pa_get_user_name(un, sizeof(un)),
1600 pa_get_host_name(hn, sizeof(hn)));
1601 #else
1602 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1603 pa_source_update_proplist(u->source, 0, NULL);
1604
1605 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1606 u->source_name,
1607 pa_get_user_name(un, sizeof(un)),
1608 pa_get_host_name(hn, sizeof(hn)));
1609 #endif
1610
1611 reply = pa_tagstruct_new(NULL, 0);
1612 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1613 pa_tagstruct_putu32(reply, u->ctag++);
1614
1615 if (u->version >= 13) {
1616 pa_proplist *pl;
1617 pl = pa_proplist_new();
1618 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1619 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1620 pa_init_proplist(pl);
1621 pa_tagstruct_put_proplist(reply, pl);
1622 pa_proplist_free(pl);
1623 } else
1624 pa_tagstruct_puts(reply, "PulseAudio");
1625
1626 pa_pstream_send_tagstruct(u->pstream, reply);
1627 /* We ignore the server's reply here */
1628
1629 reply = pa_tagstruct_new(NULL, 0);
1630
1631 if (u->version < 13)
1632 /* Only for older PA versions we need to fill in the maxlength */
1633 u->maxlength = 4*1024*1024;
1634
1635 #ifdef TUNNEL_SINK
1636 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1637 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1638 u->prebuf = u->tlength;
1639 #else
1640 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1641 #endif
1642
1643 #ifdef TUNNEL_SINK
1644 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1645 pa_tagstruct_putu32(reply, tag = u->ctag++);
1646
1647 if (u->version < 13)
1648 pa_tagstruct_puts(reply, name);
1649
1650 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1651 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1652 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1653 pa_tagstruct_puts(reply, u->sink_name);
1654 pa_tagstruct_putu32(reply, u->maxlength);
1655 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1656 pa_tagstruct_putu32(reply, u->tlength);
1657 pa_tagstruct_putu32(reply, u->prebuf);
1658 pa_tagstruct_putu32(reply, u->minreq);
1659 pa_tagstruct_putu32(reply, 0);
1660 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1661 pa_tagstruct_put_cvolume(reply, &volume);
1662 #else
1663 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1664 pa_tagstruct_putu32(reply, tag = u->ctag++);
1665
1666 if (u->version < 13)
1667 pa_tagstruct_puts(reply, name);
1668
1669 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1670 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1671 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1672 pa_tagstruct_puts(reply, u->source_name);
1673 pa_tagstruct_putu32(reply, u->maxlength);
1674 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1675 pa_tagstruct_putu32(reply, u->fragsize);
1676 #endif
1677
1678 if (u->version >= 12) {
1679 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1680 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1681 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1682 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1683 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1684 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1685 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1686 }
1687
1688 if (u->version >= 13) {
1689 pa_proplist *pl;
1690
1691 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1692 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1693
1694 pl = pa_proplist_new();
1695 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1696 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1697 pa_tagstruct_put_proplist(reply, pl);
1698 pa_proplist_free(pl);
1699
1700 #ifndef TUNNEL_SINK
1701 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1702 #endif
1703 }
1704
1705 if (u->version >= 14) {
1706 #ifdef TUNNEL_SINK
1707 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1708 #endif
1709 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1710 }
1711
1712 if (u->version >= 15) {
1713 #ifdef TUNNEL_SINK
1714 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1715 #endif
1716 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1717 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1718 }
1719
1720 #ifdef TUNNEL_SINK
1721 if (u->version >= 17)
1722 pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1723
1724 if (u->version >= 18)
1725 pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1726 #endif
1727
1728 #ifdef TUNNEL_SINK
1729 if (u->version >= 21) {
1730 /* We're not using the extended API, so n_formats = 0 and that's that */
1731 pa_tagstruct_putu8(reply, 0);
1732 }
1733 #else
1734 if (u->version >= 22) {
1735 /* We're not using the extended API, so n_formats = 0 and that's that */
1736 pa_tagstruct_putu8(reply, 0);
1737 pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1738 pa_tagstruct_put_cvolume(reply, &volume);
1739 pa_tagstruct_put_boolean(reply, FALSE); /* muted */
1740 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1741 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1742 pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1743 pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1744 }
1745 #endif
1746
1747 pa_pstream_send_tagstruct(u->pstream, reply);
1748 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1749
1750 pa_log_debug("Connection authenticated, creating stream ...");
1751
1752 return;
1753
1754 fail:
1755 pa_module_unload_request(u->module, TRUE);
1756 }
1757
1758 /* Called from main context */
1759 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1760 struct userdata *u = userdata;
1761
1762 pa_assert(p);
1763 pa_assert(u);
1764
1765 pa_log_warn("Stream died.");
1766 pa_module_unload_request(u->module, TRUE);
1767 }
1768
1769 /* Called from main context */
1770 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1771 struct userdata *u = userdata;
1772
1773 pa_assert(p);
1774 pa_assert(packet);
1775 pa_assert(u);
1776
1777 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1778 pa_log("Invalid packet");
1779 pa_module_unload_request(u->module, TRUE);
1780 return;
1781 }
1782 }
1783
1784 #ifndef TUNNEL_SINK
1785 /* Called from main context */
1786 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1787 struct userdata *u = userdata;
1788
1789 pa_assert(p);
1790 pa_assert(chunk);
1791 pa_assert(u);
1792
1793 if (channel != u->channel) {
1794 pa_log("Received memory block on bad channel.");
1795 pa_module_unload_request(u->module, TRUE);
1796 return;
1797 }
1798
1799 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1800
1801 u->counter_delta += (int64_t) chunk->length;
1802 }
1803 #endif
1804
1805 /* Called from main context */
1806 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1807 struct userdata *u = userdata;
1808 pa_tagstruct *t;
1809 uint32_t tag;
1810
1811 pa_assert(sc);
1812 pa_assert(u);
1813 pa_assert(u->client == sc);
1814
1815 pa_socket_client_unref(u->client);
1816 u->client = NULL;
1817
1818 if (!io) {
1819 pa_log("Connection failed: %s", pa_cstrerror(errno));
1820 pa_module_unload_request(u->module, TRUE);
1821 return;
1822 }
1823
1824 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1825 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1826
1827 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1828 pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1829 #ifndef TUNNEL_SINK
1830 pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1831 #endif
1832
1833 t = pa_tagstruct_new(NULL, 0);
1834 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1835 pa_tagstruct_putu32(t, tag = u->ctag++);
1836 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1837
1838 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1839
1840 #ifdef HAVE_CREDS
1841 {
1842 pa_creds ucred;
1843
1844 if (pa_iochannel_creds_supported(io))
1845 pa_iochannel_creds_enable(io);
1846
1847 ucred.uid = getuid();
1848 ucred.gid = getgid();
1849
1850 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1851 }
1852 #else
1853 pa_pstream_send_tagstruct(u->pstream, t);
1854 #endif
1855
1856 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1857
1858 pa_log_debug("Connection established, authenticating ...");
1859 }
1860
1861 #ifdef TUNNEL_SINK
1862
1863 /* Called from main context */
1864 static void sink_set_volume(pa_sink *sink) {
1865 struct userdata *u;
1866 pa_tagstruct *t;
1867
1868 pa_assert(sink);
1869 u = sink->userdata;
1870 pa_assert(u);
1871
1872 t = pa_tagstruct_new(NULL, 0);
1873 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1874 pa_tagstruct_putu32(t, u->ctag++);
1875 pa_tagstruct_putu32(t, u->device_index);
1876 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1877 pa_pstream_send_tagstruct(u->pstream, t);
1878 }
1879
1880 /* Called from main context */
1881 static void sink_set_mute(pa_sink *sink) {
1882 struct userdata *u;
1883 pa_tagstruct *t;
1884
1885 pa_assert(sink);
1886 u = sink->userdata;
1887 pa_assert(u);
1888
1889 if (u->version < 11)
1890 return;
1891
1892 t = pa_tagstruct_new(NULL, 0);
1893 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1894 pa_tagstruct_putu32(t, u->ctag++);
1895 pa_tagstruct_putu32(t, u->device_index);
1896 pa_tagstruct_put_boolean(t, !!sink->muted);
1897 pa_pstream_send_tagstruct(u->pstream, t);
1898 }
1899
1900 #endif
1901
1902 int pa__init(pa_module*m) {
1903 pa_modargs *ma = NULL;
1904 struct userdata *u = NULL;
1905 pa_sample_spec ss;
1906 pa_channel_map map;
1907 char *dn = NULL;
1908 #ifdef TUNNEL_SINK
1909 pa_sink_new_data data;
1910 #else
1911 pa_source_new_data data;
1912 #endif
1913
1914 pa_assert(m);
1915
1916 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1917 pa_log("Failed to parse module arguments");
1918 goto fail;
1919 }
1920
1921 m->userdata = u = pa_xnew0(struct userdata, 1);
1922 u->core = m->core;
1923 u->module = m;
1924 u->client = NULL;
1925 u->pdispatch = NULL;
1926 u->pstream = NULL;
1927 u->server_name = NULL;
1928 #ifdef TUNNEL_SINK
1929 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1930 u->sink = NULL;
1931 u->requested_bytes = 0;
1932 #else
1933 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1934 u->source = NULL;
1935 #endif
1936 u->smoother = pa_smoother_new(
1937 PA_USEC_PER_SEC,
1938 PA_USEC_PER_SEC*2,
1939 TRUE,
1940 TRUE,
1941 10,
1942 pa_rtclock_now(),
1943 FALSE);
1944 u->ctag = 1;
1945 u->device_index = u->channel = PA_INVALID_INDEX;
1946 u->time_event = NULL;
1947 u->ignore_latency_before = 0;
1948 u->transport_usec = u->thread_transport_usec = 0;
1949 u->remote_suspended = u->remote_corked = FALSE;
1950 u->counter = u->counter_delta = 0;
1951
1952 u->rtpoll = pa_rtpoll_new();
1953 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1954
1955 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), TRUE, PA_NATIVE_COOKIE_LENGTH)))
1956 goto fail;
1957
1958 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1959 pa_log("No server specified.");
1960 goto fail;
1961 }
1962
1963 ss = m->core->default_sample_spec;
1964 map = m->core->default_channel_map;
1965 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1966 pa_log("Invalid sample format specification");
1967 goto fail;
1968 }
1969
1970 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1971 pa_log("Failed to connect to server '%s'", u->server_name);
1972 goto fail;
1973 }
1974
1975 pa_socket_client_set_callback(u->client, on_connection, u);
1976
1977 #ifdef TUNNEL_SINK
1978
1979 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1980 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1981
1982 pa_sink_new_data_init(&data);
1983 data.driver = __FILE__;
1984 data.module = m;
1985 data.namereg_fail = TRUE;
1986 pa_sink_new_data_set_name(&data, dn);
1987 pa_sink_new_data_set_sample_spec(&data, &ss);
1988 pa_sink_new_data_set_channel_map(&data, &map);
1989 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1990 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1991 if (u->sink_name)
1992 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1993
1994 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1995 pa_log("Invalid properties");
1996 pa_sink_new_data_done(&data);
1997 goto fail;
1998 }
1999
2000 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2001 pa_sink_new_data_done(&data);
2002
2003 if (!u->sink) {
2004 pa_log("Failed to create sink.");
2005 goto fail;
2006 }
2007
2008 u->sink->parent.process_msg = sink_process_msg;
2009 u->sink->userdata = u;
2010 u->sink->set_state = sink_set_state;
2011 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2012 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2013
2014 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
2015
2016 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2017
2018 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2019 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2020
2021 #else
2022
2023 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2024 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2025
2026 pa_source_new_data_init(&data);
2027 data.driver = __FILE__;
2028 data.module = m;
2029 data.namereg_fail = TRUE;
2030 pa_source_new_data_set_name(&data, dn);
2031 pa_source_new_data_set_sample_spec(&data, &ss);
2032 pa_source_new_data_set_channel_map(&data, &map);
2033 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2034 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2035 if (u->source_name)
2036 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2037
2038 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2039 pa_log("Invalid properties");
2040 pa_source_new_data_done(&data);
2041 goto fail;
2042 }
2043
2044 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2045 pa_source_new_data_done(&data);
2046
2047 if (!u->source) {
2048 pa_log("Failed to create source.");
2049 goto fail;
2050 }
2051
2052 u->source->parent.process_msg = source_process_msg;
2053 u->source->set_state = source_set_state;
2054 u->source->userdata = u;
2055
2056 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2057
2058 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2059 pa_source_set_rtpoll(u->source, u->rtpoll);
2060
2061 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2062 #endif
2063
2064 pa_xfree(dn);
2065
2066 u->time_event = NULL;
2067
2068 u->maxlength = (uint32_t) -1;
2069 #ifdef TUNNEL_SINK
2070 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2071 #else
2072 u->fragsize = (uint32_t) -1;
2073 #endif
2074
2075 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2076 pa_log("Failed to create thread.");
2077 goto fail;
2078 }
2079
2080 #ifdef TUNNEL_SINK
2081 pa_sink_put(u->sink);
2082 #else
2083 pa_source_put(u->source);
2084 #endif
2085
2086 pa_modargs_free(ma);
2087
2088 return 0;
2089
2090 fail:
2091 pa__done(m);
2092
2093 if (ma)
2094 pa_modargs_free(ma);
2095
2096 pa_xfree(dn);
2097
2098 return -1;
2099 }
2100
2101 void pa__done(pa_module*m) {
2102 struct userdata* u;
2103
2104 pa_assert(m);
2105
2106 if (!(u = m->userdata))
2107 return;
2108
2109 #ifdef TUNNEL_SINK
2110 if (u->sink)
2111 pa_sink_unlink(u->sink);
2112 #else
2113 if (u->source)
2114 pa_source_unlink(u->source);
2115 #endif
2116
2117 if (u->thread) {
2118 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2119 pa_thread_free(u->thread);
2120 }
2121
2122 pa_thread_mq_done(&u->thread_mq);
2123
2124 #ifdef TUNNEL_SINK
2125 if (u->sink)
2126 pa_sink_unref(u->sink);
2127 #else
2128 if (u->source)
2129 pa_source_unref(u->source);
2130 #endif
2131
2132 if (u->rtpoll)
2133 pa_rtpoll_free(u->rtpoll);
2134
2135 if (u->pstream) {
2136 pa_pstream_unlink(u->pstream);
2137 pa_pstream_unref(u->pstream);
2138 }
2139
2140 if (u->pdispatch)
2141 pa_pdispatch_unref(u->pdispatch);
2142
2143 if (u->client)
2144 pa_socket_client_unref(u->client);
2145
2146 if (u->auth_cookie)
2147 pa_auth_cookie_unref(u->auth_cookie);
2148
2149 if (u->smoother)
2150 pa_smoother_free(u->smoother);
2151
2152 if (u->time_event)
2153 u->core->mainloop->time_free(u->time_event);
2154
2155 #ifndef TUNNEL_SINK
2156 if (u->mcalign)
2157 pa_mcalign_free(u->mcalign);
2158 #endif
2159
2160 #ifdef TUNNEL_SINK
2161 pa_xfree(u->sink_name);
2162 #else
2163 pa_xfree(u->source_name);
2164 #endif
2165 pa_xfree(u->server_name);
2166
2167 pa_xfree(u->device_description);
2168 pa_xfree(u->server_fqdn);
2169 pa_xfree(u->user_name);
2170
2171 pa_xfree(u);
2172 }