]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
tunnel: fix parsing of sink info from newer servers
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
59
60 #ifdef TUNNEL_SINK
61 #include "module-tunnel-sink-symdef.h"
62 #else
63 #include "module-tunnel-source-symdef.h"
64 #endif
65
66 #ifdef TUNNEL_SINK
67 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 PA_MODULE_USAGE(
69 "sink_name=<name for the local sink> "
70 "sink_properties=<properties for the local sink> "
71 "server=<address> "
72 "sink=<remote sink name> "
73 "cookie=<filename> "
74 "format=<sample format> "
75 "channels=<number of channels> "
76 "rate=<sample rate> "
77 "channel_map=<channel map>");
78 #else
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 PA_MODULE_USAGE(
81 "source_name=<name for the local source> "
82 "source_properties=<properties for the local source> "
83 "server=<address> "
84 "source=<remote source name> "
85 "cookie=<filename> "
86 "format=<sample format> "
87 "channels=<number of channels> "
88 "rate=<sample rate> "
89 "channel_map=<channel map>");
90 #endif
91
92 PA_MODULE_AUTHOR("Lennart Poettering");
93 PA_MODULE_VERSION(PACKAGE_VERSION);
94 PA_MODULE_LOAD_ONCE(FALSE);
95
/* NULL-terminated list of the module argument keys this module accepts.
 * The device-related keys depend on whether this file is compiled as the
 * sink tunnel (TUNNEL_SINK defined) or as the source tunnel. */
static const char *const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif
    "channel_map",
    NULL
};
114
115 #define DEFAULT_TIMEOUT 5
116
117 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
118
119 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
120
121 #ifdef TUNNEL_SINK
122
123 enum {
124 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
125 SINK_MESSAGE_REMOTE_SUSPEND,
126 SINK_MESSAGE_UPDATE_LATENCY,
127 SINK_MESSAGE_POST
128 };
129
130 #define DEFAULT_TLENGTH_MSEC 150
131 #define DEFAULT_MINREQ_MSEC 25
132
133 #else
134
135 enum {
136 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
137 SOURCE_MESSAGE_REMOTE_SUSPEND,
138 SOURCE_MESSAGE_UPDATE_LATENCY
139 };
140
141 #define DEFAULT_FRAGSIZE_MSEC 25
142
143 #endif
144
145 #ifdef TUNNEL_SINK
146 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 #endif
149 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
156
157 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
158 #ifdef TUNNEL_SINK
159 [PA_COMMAND_REQUEST] = command_request,
160 [PA_COMMAND_STARTED] = command_started,
161 #endif
162 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
163 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
164 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
165 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
166 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
167 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
168 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
169 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
170 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
171 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
173 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
174 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
175 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
176 };
177
178 struct userdata {
179 pa_core *core;
180 pa_module *module;
181
182 pa_thread_mq thread_mq;
183 pa_rtpoll *rtpoll;
184 pa_thread *thread;
185
186 pa_socket_client *client;
187 pa_pstream *pstream;
188 pa_pdispatch *pdispatch;
189
190 char *server_name;
191 #ifdef TUNNEL_SINK
192 char *sink_name;
193 pa_sink *sink;
194 size_t requested_bytes;
195 #else
196 char *source_name;
197 pa_source *source;
198 pa_mcalign *mcalign;
199 #endif
200
201 pa_auth_cookie *auth_cookie;
202
203 uint32_t version;
204 uint32_t ctag;
205 uint32_t device_index;
206 uint32_t channel;
207
208 int64_t counter, counter_delta;
209
210 pa_bool_t remote_corked:1;
211 pa_bool_t remote_suspended:1;
212
213 pa_usec_t transport_usec; /* maintained in the main thread */
214 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
215
216 uint32_t ignore_latency_before;
217
218 pa_time_event *time_event;
219
220 pa_smoother *smoother;
221
222 char *device_description;
223 char *server_fqdn;
224 char *user_name;
225
226 uint32_t maxlength;
227 #ifdef TUNNEL_SINK
228 uint32_t tlength;
229 uint32_t minreq;
230 uint32_t prebuf;
231 #else
232 uint32_t fragsize;
233 #endif
234 };
235
236 static void request_latency(struct userdata *u);
237
238 /* Called from main context */
239 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
240 pa_log_debug("Got stream or client event.");
241 }
242
243 /* Called from main context */
244 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
245 struct userdata *u = userdata;
246
247 pa_assert(pd);
248 pa_assert(t);
249 pa_assert(u);
250 pa_assert(u->pdispatch == pd);
251
252 pa_log_warn("Stream killed");
253 pa_module_unload_request(u->module, TRUE);
254 }
255
256 /* Called from main context */
257 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
258 struct userdata *u = userdata;
259
260 pa_assert(pd);
261 pa_assert(t);
262 pa_assert(u);
263 pa_assert(u->pdispatch == pd);
264
265 pa_log_info("Server signalled buffer overrun/underrun.");
266 request_latency(u);
267 }
268
269 /* Called from main context */
270 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
271 struct userdata *u = userdata;
272 uint32_t channel;
273 pa_bool_t suspended;
274
275 pa_assert(pd);
276 pa_assert(t);
277 pa_assert(u);
278 pa_assert(u->pdispatch == pd);
279
280 if (pa_tagstruct_getu32(t, &channel) < 0 ||
281 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
282 !pa_tagstruct_eof(t)) {
283
284 pa_log("Invalid packet.");
285 pa_module_unload_request(u->module, TRUE);
286 return;
287 }
288
289 pa_log_debug("Server reports device suspend.");
290
291 #ifdef TUNNEL_SINK
292 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #else
294 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
295 #endif
296
297 request_latency(u);
298 }
299
300 /* Called from main context */
301 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
302 struct userdata *u = userdata;
303 uint32_t channel, di;
304 const char *dn;
305 pa_bool_t suspended;
306
307 pa_assert(pd);
308 pa_assert(t);
309 pa_assert(u);
310 pa_assert(u->pdispatch == pd);
311
312 if (pa_tagstruct_getu32(t, &channel) < 0 ||
313 pa_tagstruct_getu32(t, &di) < 0 ||
314 pa_tagstruct_gets(t, &dn) < 0 ||
315 pa_tagstruct_get_boolean(t, &suspended) < 0) {
316
317 pa_log_error("Invalid packet.");
318 pa_module_unload_request(u->module, TRUE);
319 return;
320 }
321
322 pa_log_debug("Server reports a stream move.");
323
324 #ifdef TUNNEL_SINK
325 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #else
327 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
328 #endif
329
330 request_latency(u);
331 }
332
333 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
334 struct userdata *u = userdata;
335 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
336 pa_usec_t usec;
337
338 pa_assert(pd);
339 pa_assert(t);
340 pa_assert(u);
341 pa_assert(u->pdispatch == pd);
342
343 if (pa_tagstruct_getu32(t, &channel) < 0 ||
344 pa_tagstruct_getu32(t, &maxlength) < 0) {
345
346 pa_log_error("Invalid packet.");
347 pa_module_unload_request(u->module, TRUE);
348 return;
349 }
350
351 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
352 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
353 pa_tagstruct_get_usec(t, &usec) < 0) {
354
355 pa_log_error("Invalid packet.");
356 pa_module_unload_request(u->module, TRUE);
357 return;
358 }
359 } else {
360 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
361 pa_tagstruct_getu32(t, &prebuf) < 0 ||
362 pa_tagstruct_getu32(t, &minreq) < 0 ||
363 pa_tagstruct_get_usec(t, &usec) < 0) {
364
365 pa_log_error("Invalid packet.");
366 pa_module_unload_request(u->module, TRUE);
367 return;
368 }
369 }
370
371 #ifdef TUNNEL_SINK
372 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
373 #endif
374
375 request_latency(u);
376 }
377
378 #ifdef TUNNEL_SINK
379
380 /* Called from main context */
381 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
382 struct userdata *u = userdata;
383
384 pa_assert(pd);
385 pa_assert(t);
386 pa_assert(u);
387 pa_assert(u->pdispatch == pd);
388
389 pa_log_debug("Server reports playback started.");
390 request_latency(u);
391 }
392
393 #endif
394
395 /* Called from IO thread context */
396 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
397 pa_usec_t x;
398
399 pa_assert(u);
400
401 x = pa_rtclock_now();
402
403 /* Correct by the time the requested issued needs to travel to the
404 * other side. This is a valid thread-safe access, because the
405 * main thread is waiting for us */
406
407 if (past)
408 x -= u->thread_transport_usec;
409 else
410 x += u->thread_transport_usec;
411
412 if (u->remote_suspended || u->remote_corked)
413 pa_smoother_pause(u->smoother, x);
414 else
415 pa_smoother_resume(u->smoother, x, TRUE);
416 }
417
418 /* Called from IO thread context */
419 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
420 pa_assert(u);
421
422 if (u->remote_corked == cork)
423 return;
424
425 u->remote_corked = cork;
426 check_smoother_status(u, FALSE);
427 }
428
429 /* Called from main context */
430 static void stream_cork(struct userdata *u, pa_bool_t cork) {
431 pa_tagstruct *t;
432 pa_assert(u);
433
434 if (!u->pstream)
435 return;
436
437 t = pa_tagstruct_new(NULL, 0);
438 #ifdef TUNNEL_SINK
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
440 #else
441 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
442 #endif
443 pa_tagstruct_putu32(t, u->ctag++);
444 pa_tagstruct_putu32(t, u->channel);
445 pa_tagstruct_put_boolean(t, !!cork);
446 pa_pstream_send_tagstruct(u->pstream, t);
447
448 request_latency(u);
449 }
450
451 /* Called from IO thread context */
452 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
453 pa_assert(u);
454
455 if (u->remote_suspended == suspend)
456 return;
457
458 u->remote_suspended = suspend;
459 check_smoother_status(u, TRUE);
460 }
461
462 #ifdef TUNNEL_SINK
463
464 /* Called from IO thread context */
465 static void send_data(struct userdata *u) {
466 pa_assert(u);
467
468 while (u->requested_bytes > 0) {
469 pa_memchunk memchunk;
470
471 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
472 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
473 pa_memblock_unref(memchunk.memblock);
474
475 u->requested_bytes -= memchunk.length;
476
477 u->counter += (int64_t) memchunk.length;
478 }
479 }
480
481 /* This function is called from IO context -- except when it is not. */
482 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
483 struct userdata *u = PA_SINK(o)->userdata;
484
485 switch (code) {
486
487 case PA_SINK_MESSAGE_SET_STATE: {
488 int r;
489
490 /* First, change the state, because otherwide pa_sink_render() would fail */
491 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
492
493 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
494
495 if (PA_SINK_IS_OPENED(u->sink->state))
496 send_data(u);
497 }
498
499 return r;
500 }
501
502 case PA_SINK_MESSAGE_GET_LATENCY: {
503 pa_usec_t yl, yr, *usec = data;
504
505 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
506 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
507
508 *usec = yl > yr ? yl - yr : 0;
509 return 0;
510 }
511
512 case SINK_MESSAGE_REQUEST:
513
514 pa_assert(offset > 0);
515 u->requested_bytes += (size_t) offset;
516
517 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
518 send_data(u);
519
520 return 0;
521
522
523 case SINK_MESSAGE_REMOTE_SUSPEND:
524
525 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
526 return 0;
527
528
529 case SINK_MESSAGE_UPDATE_LATENCY: {
530 pa_usec_t y;
531
532 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
533
534 if (y > (pa_usec_t) offset)
535 y -= (pa_usec_t) offset;
536 else
537 y = 0;
538
539 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
540
541 /* We can access this freely here, since the main thread is waiting for us */
542 u->thread_transport_usec = u->transport_usec;
543
544 return 0;
545 }
546
547 case SINK_MESSAGE_POST:
548
549 /* OK, This might be a bit confusing. This message is
550 * delivered to us from the main context -- NOT from the
551 * IO thread context where the rest of the messages are
552 * dispatched. Yeah, ugly, but I am a lazy bastard. */
553
554 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
555
556 u->counter_delta += (int64_t) chunk->length;
557
558 return 0;
559 }
560
561 return pa_sink_process_msg(o, code, data, offset, chunk);
562 }
563
564 /* Called from main context */
565 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
566 struct userdata *u;
567 pa_sink_assert_ref(s);
568 u = s->userdata;
569
570 switch ((pa_sink_state_t) state) {
571
572 case PA_SINK_SUSPENDED:
573 pa_assert(PA_SINK_IS_OPENED(s->state));
574 stream_cork(u, TRUE);
575 break;
576
577 case PA_SINK_IDLE:
578 case PA_SINK_RUNNING:
579 if (s->state == PA_SINK_SUSPENDED)
580 stream_cork(u, FALSE);
581 break;
582
583 case PA_SINK_UNLINKED:
584 case PA_SINK_INIT:
585 case PA_SINK_INVALID_STATE:
586 ;
587 }
588
589 return 0;
590 }
591
592 #else
593
594 /* This function is called from IO context -- except when it is not. */
595 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596 struct userdata *u = PA_SOURCE(o)->userdata;
597
598 switch (code) {
599
600 case PA_SOURCE_MESSAGE_SET_STATE: {
601 int r;
602
603 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
604 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
605
606 return r;
607 }
608
609 case PA_SOURCE_MESSAGE_GET_LATENCY: {
610 pa_usec_t yr, yl, *usec = data;
611
612 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
613 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
614
615 *usec = yr > yl ? yr - yl : 0;
616 return 0;
617 }
618
619 case SOURCE_MESSAGE_POST: {
620 pa_memchunk c;
621
622 pa_mcalign_push(u->mcalign, chunk);
623
624 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
625
626 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
627 pa_source_post(u->source, &c);
628
629 pa_memblock_unref(c.memblock);
630
631 u->counter += (int64_t) c.length;
632 }
633
634 return 0;
635 }
636
637 case SOURCE_MESSAGE_REMOTE_SUSPEND:
638
639 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
640 return 0;
641
642 case SOURCE_MESSAGE_UPDATE_LATENCY: {
643 pa_usec_t y;
644
645 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
646 y += (pa_usec_t) offset;
647
648 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
649
650 /* We can access this freely here, since the main thread is waiting for us */
651 u->thread_transport_usec = u->transport_usec;
652
653 return 0;
654 }
655 }
656
657 return pa_source_process_msg(o, code, data, offset, chunk);
658 }
659
660 /* Called from main context */
661 static int source_set_state(pa_source *s, pa_source_state_t state) {
662 struct userdata *u;
663 pa_source_assert_ref(s);
664 u = s->userdata;
665
666 switch ((pa_source_state_t) state) {
667
668 case PA_SOURCE_SUSPENDED:
669 pa_assert(PA_SOURCE_IS_OPENED(s->state));
670 stream_cork(u, TRUE);
671 break;
672
673 case PA_SOURCE_IDLE:
674 case PA_SOURCE_RUNNING:
675 if (s->state == PA_SOURCE_SUSPENDED)
676 stream_cork(u, FALSE);
677 break;
678
679 case PA_SOURCE_UNLINKED:
680 case PA_SOURCE_INIT:
681 case PA_SINK_INVALID_STATE:
682 ;
683 }
684
685 return 0;
686 }
687
688 #endif
689
690 static void thread_func(void *userdata) {
691 struct userdata *u = userdata;
692
693 pa_assert(u);
694
695 pa_log_debug("Thread starting up");
696
697 pa_thread_mq_install(&u->thread_mq);
698
699 for (;;) {
700 int ret;
701
702 #ifdef TUNNEL_SINK
703 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
704 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
705 pa_sink_process_rewind(u->sink, 0);
706 #endif
707
708 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
709 goto fail;
710
711 if (ret == 0)
712 goto finish;
713 }
714
715 fail:
716 /* If this was no regular exit from the loop we have to continue
717 * processing messages until we received PA_MESSAGE_SHUTDOWN */
718 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
719 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
720
721 finish:
722 pa_log_debug("Thread shutting down");
723 }
724
725 #ifdef TUNNEL_SINK
726 /* Called from main context */
727 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
728 struct userdata *u = userdata;
729 uint32_t bytes, channel;
730
731 pa_assert(pd);
732 pa_assert(command == PA_COMMAND_REQUEST);
733 pa_assert(t);
734 pa_assert(u);
735 pa_assert(u->pdispatch == pd);
736
737 if (pa_tagstruct_getu32(t, &channel) < 0 ||
738 pa_tagstruct_getu32(t, &bytes) < 0) {
739 pa_log("Invalid protocol reply");
740 goto fail;
741 }
742
743 if (channel != u->channel) {
744 pa_log("Received data for invalid channel");
745 goto fail;
746 }
747
748 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
749 return;
750
751 fail:
752 pa_module_unload_request(u->module, TRUE);
753 }
754
755 #endif
756
757 /* Called from main context */
758 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759 struct userdata *u = userdata;
760 pa_usec_t sink_usec, source_usec;
761 pa_bool_t playing;
762 int64_t write_index, read_index;
763 struct timeval local, remote, now;
764 pa_sample_spec *ss;
765 int64_t delay;
766
767 pa_assert(pd);
768 pa_assert(u);
769
770 if (command != PA_COMMAND_REPLY) {
771 if (command == PA_COMMAND_ERROR)
772 pa_log("Failed to get latency.");
773 else
774 pa_log("Protocol error.");
775 goto fail;
776 }
777
778 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
779 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
780 pa_tagstruct_get_boolean(t, &playing) < 0 ||
781 pa_tagstruct_get_timeval(t, &local) < 0 ||
782 pa_tagstruct_get_timeval(t, &remote) < 0 ||
783 pa_tagstruct_gets64(t, &write_index) < 0 ||
784 pa_tagstruct_gets64(t, &read_index) < 0) {
785 pa_log("Invalid reply.");
786 goto fail;
787 }
788
789 #ifdef TUNNEL_SINK
790 if (u->version >= 13) {
791 uint64_t underrun_for = 0, playing_for = 0;
792
793 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
794 pa_tagstruct_getu64(t, &playing_for) < 0) {
795 pa_log("Invalid reply.");
796 goto fail;
797 }
798 }
799 #endif
800
801 if (!pa_tagstruct_eof(t)) {
802 pa_log("Invalid reply.");
803 goto fail;
804 }
805
806 if (tag < u->ignore_latency_before) {
807 return;
808 }
809
810 pa_gettimeofday(&now);
811
812 /* Calculate transport usec */
813 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
814 /* local and remote seem to have synchronized clocks */
815 #ifdef TUNNEL_SINK
816 u->transport_usec = pa_timeval_diff(&remote, &local);
817 #else
818 u->transport_usec = pa_timeval_diff(&now, &remote);
819 #endif
820 } else
821 u->transport_usec = pa_timeval_diff(&now, &local)/2;
822
823 /* First, take the device's delay */
824 #ifdef TUNNEL_SINK
825 delay = (int64_t) sink_usec;
826 ss = &u->sink->sample_spec;
827 #else
828 delay = (int64_t) source_usec;
829 ss = &u->source->sample_spec;
830 #endif
831
832 /* Add the length of our server-side buffer */
833 if (write_index >= read_index)
834 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
835 else
836 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
837
838 /* Our measurements are already out of date, hence correct by the *
839 * transport latency */
840 #ifdef TUNNEL_SINK
841 delay -= (int64_t) u->transport_usec;
842 #else
843 delay += (int64_t) u->transport_usec;
844 #endif
845
846 /* Now correct by what we have have read/written since we requested the update */
847 #ifdef TUNNEL_SINK
848 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #else
850 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
851 #endif
852
853 #ifdef TUNNEL_SINK
854 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #else
856 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
857 #endif
858
859 return;
860
861 fail:
862
863 pa_module_unload_request(u->module, TRUE);
864 }
865
866 /* Called from main context */
867 static void request_latency(struct userdata *u) {
868 pa_tagstruct *t;
869 struct timeval now;
870 uint32_t tag;
871 pa_assert(u);
872
873 t = pa_tagstruct_new(NULL, 0);
874 #ifdef TUNNEL_SINK
875 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
876 #else
877 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
878 #endif
879 pa_tagstruct_putu32(t, tag = u->ctag++);
880 pa_tagstruct_putu32(t, u->channel);
881
882 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
883
884 pa_pstream_send_tagstruct(u->pstream, t);
885 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
886
887 u->ignore_latency_before = tag;
888 u->counter_delta = 0;
889 }
890
891 /* Called from main context */
892 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
893 struct userdata *u = userdata;
894
895 pa_assert(m);
896 pa_assert(e);
897 pa_assert(u);
898
899 request_latency(u);
900
901 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
902 }
903
904 /* Called from main context */
905 static void update_description(struct userdata *u) {
906 char *d;
907 char un[128], hn[128];
908 pa_tagstruct *t;
909
910 pa_assert(u);
911
912 if (!u->server_fqdn || !u->user_name || !u->device_description)
913 return;
914
915 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
916
917 #ifdef TUNNEL_SINK
918 pa_sink_set_description(u->sink, d);
919 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
920 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
921 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
922 #else
923 pa_source_set_description(u->source, d);
924 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
925 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
926 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
927 #endif
928
929 pa_xfree(d);
930
931 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
932 pa_get_user_name(un, sizeof(un)),
933 pa_get_host_name(hn, sizeof(hn)));
934
935 t = pa_tagstruct_new(NULL, 0);
936 #ifdef TUNNEL_SINK
937 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
938 #else
939 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
940 #endif
941 pa_tagstruct_putu32(t, u->ctag++);
942 pa_tagstruct_putu32(t, u->channel);
943 pa_tagstruct_puts(t, d);
944 pa_pstream_send_tagstruct(u->pstream, t);
945
946 pa_xfree(d);
947 }
948
949 /* Called from main context */
950 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
951 struct userdata *u = userdata;
952 pa_sample_spec ss;
953 pa_channel_map cm;
954 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
955 uint32_t cookie;
956
957 pa_assert(pd);
958 pa_assert(u);
959
960 if (command != PA_COMMAND_REPLY) {
961 if (command == PA_COMMAND_ERROR)
962 pa_log("Failed to get info.");
963 else
964 pa_log("Protocol error.");
965 goto fail;
966 }
967
968 if (pa_tagstruct_gets(t, &server_name) < 0 ||
969 pa_tagstruct_gets(t, &server_version) < 0 ||
970 pa_tagstruct_gets(t, &user_name) < 0 ||
971 pa_tagstruct_gets(t, &host_name) < 0 ||
972 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
973 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
974 pa_tagstruct_gets(t, &default_source_name) < 0 ||
975 pa_tagstruct_getu32(t, &cookie) < 0 ||
976 (u->version >= 15 &&
977 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
978
979 pa_log("Parse failure");
980 goto fail;
981 }
982
983 if (!pa_tagstruct_eof(t)) {
984 pa_log("Packet too long");
985 goto fail;
986 }
987
988 pa_xfree(u->server_fqdn);
989 u->server_fqdn = pa_xstrdup(host_name);
990
991 pa_xfree(u->user_name);
992 u->user_name = pa_xstrdup(user_name);
993
994 update_description(u);
995
996 return;
997
998 fail:
999 pa_module_unload_request(u->module, TRUE);
1000 }
1001
1002 #ifdef TUNNEL_SINK
1003
1004 /* Called from main context */
1005 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1006 struct userdata *u = userdata;
1007 uint32_t idx, owner_module, monitor_source, flags;
1008 const char *name, *description, *monitor_source_name, *driver;
1009 pa_sample_spec ss;
1010 pa_channel_map cm;
1011 pa_cvolume volume;
1012 pa_bool_t mute;
1013 pa_usec_t latency;
1014 pa_proplist *pl;
1015
1016 pa_assert(pd);
1017 pa_assert(u);
1018
1019 pl = pa_proplist_new();
1020
1021 if (command != PA_COMMAND_REPLY) {
1022 if (command == PA_COMMAND_ERROR)
1023 pa_log("Failed to get info.");
1024 else
1025 pa_log("Protocol error.");
1026 goto fail;
1027 }
1028
1029 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1030 pa_tagstruct_gets(t, &name) < 0 ||
1031 pa_tagstruct_gets(t, &description) < 0 ||
1032 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1033 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1034 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1035 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1036 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1037 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1038 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1039 pa_tagstruct_get_usec(t, &latency) < 0 ||
1040 pa_tagstruct_gets(t, &driver) < 0 ||
1041 pa_tagstruct_getu32(t, &flags) < 0) {
1042
1043 pa_log("Parse failure");
1044 goto fail;
1045 }
1046
1047 if (u->version >= 13) {
1048 pa_usec_t configured_latency;
1049
1050 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1051 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1052
1053 pa_log("Parse failure");
1054 goto fail;
1055 }
1056 }
1057
1058 if (u->version >= 15) {
1059 pa_volume_t base_volume;
1060 uint32_t state, n_volume_steps, card;
1061
1062 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1063 pa_tagstruct_getu32(t, &state) < 0 ||
1064 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1065 pa_tagstruct_getu32(t, &card) < 0) {
1066
1067 pa_log("Parse failure");
1068 goto fail;
1069 }
1070 }
1071
1072 if (u->version >= 16) {
1073 uint32_t n_ports, j;
1074 const char *s;
1075
1076 if (pa_tagstruct_getu32(t, &n_ports)) {
1077 pa_log("Parse failure");
1078 goto fail;
1079 }
1080
1081 for (j = 0; j < n_ports; j++) {
1082 uint32_t priority;
1083
1084 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1085 pa_tagstruct_gets(t, &s) < 0 || /* description */
1086 pa_tagstruct_getu32(t, &priority) < 0) {
1087
1088 pa_log("Parse failure");
1089 goto fail;
1090 }
1091 }
1092
1093 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1094 pa_log("Parse failure");
1095 goto fail;
1096 }
1097 }
1098
1099 if (!pa_tagstruct_eof(t)) {
1100 pa_log("Packet too long");
1101 goto fail;
1102 }
1103
1104 pa_proplist_free(pl);
1105
1106 if (!u->sink_name || strcmp(name, u->sink_name))
1107 return;
1108
1109 pa_xfree(u->device_description);
1110 u->device_description = pa_xstrdup(description);
1111
1112 update_description(u);
1113
1114 return;
1115
1116 fail:
1117 pa_module_unload_request(u->module, TRUE);
1118 pa_proplist_free(pl);
1119 }
1120
1121 /* Called from main context */
1122 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1123 struct userdata *u = userdata;
1124 uint32_t idx, owner_module, client, sink;
1125 pa_usec_t buffer_usec, sink_usec;
1126 const char *name, *driver, *resample_method;
1127 pa_bool_t mute = FALSE;
1128 pa_sample_spec sample_spec;
1129 pa_channel_map channel_map;
1130 pa_cvolume volume;
1131 pa_proplist *pl;
1132
1133 pa_assert(pd);
1134 pa_assert(u);
1135
1136 pl = pa_proplist_new();
1137
1138 if (command != PA_COMMAND_REPLY) {
1139 if (command == PA_COMMAND_ERROR)
1140 pa_log("Failed to get info.");
1141 else
1142 pa_log("Protocol error.");
1143 goto fail;
1144 }
1145
1146 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1147 pa_tagstruct_gets(t, &name) < 0 ||
1148 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1149 pa_tagstruct_getu32(t, &client) < 0 ||
1150 pa_tagstruct_getu32(t, &sink) < 0 ||
1151 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1152 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1153 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1154 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1155 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1156 pa_tagstruct_gets(t, &resample_method) < 0 ||
1157 pa_tagstruct_gets(t, &driver) < 0) {
1158
1159 pa_log("Parse failure");
1160 goto fail;
1161 }
1162
1163 if (u->version >= 11) {
1164 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1165
1166 pa_log("Parse failure");
1167 goto fail;
1168 }
1169 }
1170
1171 if (u->version >= 13) {
1172 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1173
1174 pa_log("Parse failure");
1175 goto fail;
1176 }
1177 }
1178
1179 if (!pa_tagstruct_eof(t)) {
1180 pa_log("Packet too long");
1181 goto fail;
1182 }
1183
1184 pa_proplist_free(pl);
1185
1186 if (idx != u->device_index)
1187 return;
1188
1189 pa_assert(u->sink);
1190
1191 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1192 pa_cvolume_equal(&volume, &u->sink->real_volume))
1193 return;
1194
1195 pa_sink_volume_changed(u->sink, &volume);
1196
1197 if (u->version >= 11)
1198 pa_sink_mute_changed(u->sink, mute);
1199
1200 return;
1201
1202 fail:
1203 pa_module_unload_request(u->module, TRUE);
1204 pa_proplist_free(pl);
1205 }
1206
1207 #else
1208
1209 /* Called from main context */
1210 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1211 struct userdata *u = userdata;
1212 uint32_t idx, owner_module, monitor_of_sink, flags;
1213 const char *name, *description, *monitor_of_sink_name, *driver;
1214 pa_sample_spec ss;
1215 pa_channel_map cm;
1216 pa_cvolume volume;
1217 pa_bool_t mute;
1218 pa_usec_t latency, configured_latency;
1219 pa_proplist *pl;
1220
1221 pa_assert(pd);
1222 pa_assert(u);
1223
1224 pl = pa_proplist_new();
1225
1226 if (command != PA_COMMAND_REPLY) {
1227 if (command == PA_COMMAND_ERROR)
1228 pa_log("Failed to get info.");
1229 else
1230 pa_log("Protocol error.");
1231 goto fail;
1232 }
1233
1234 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1235 pa_tagstruct_gets(t, &name) < 0 ||
1236 pa_tagstruct_gets(t, &description) < 0 ||
1237 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1238 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1239 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1240 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1241 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1242 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1243 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1244 pa_tagstruct_get_usec(t, &latency) < 0 ||
1245 pa_tagstruct_gets(t, &driver) < 0 ||
1246 pa_tagstruct_getu32(t, &flags) < 0) {
1247
1248 pa_log("Parse failure");
1249 goto fail;
1250 }
1251
1252 if (u->version >= 13) {
1253 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1254 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1255
1256 pa_log("Parse failure");
1257 goto fail;
1258 }
1259 }
1260
1261 if (u->version >= 15) {
1262 pa_volume_t base_volume;
1263 uint32_t state, n_volume_steps, card;
1264
1265 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1266 pa_tagstruct_getu32(t, &state) < 0 ||
1267 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1268 pa_tagstruct_getu32(t, &card) < 0) {
1269
1270 pa_log("Parse failure");
1271 goto fail;
1272 }
1273 }
1274
1275 if (!pa_tagstruct_eof(t)) {
1276 pa_log("Packet too long");
1277 goto fail;
1278 }
1279
1280 pa_proplist_free(pl);
1281
1282 if (!u->source_name || strcmp(name, u->source_name))
1283 return;
1284
1285 pa_xfree(u->device_description);
1286 u->device_description = pa_xstrdup(description);
1287
1288 update_description(u);
1289
1290 return;
1291
1292 fail:
1293 pa_module_unload_request(u->module, TRUE);
1294 pa_proplist_free(pl);
1295 }
1296
1297 #endif
1298
1299 /* Called from main context */
1300 static void request_info(struct userdata *u) {
1301 pa_tagstruct *t;
1302 uint32_t tag;
1303 pa_assert(u);
1304
1305 t = pa_tagstruct_new(NULL, 0);
1306 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1307 pa_tagstruct_putu32(t, tag = u->ctag++);
1308 pa_pstream_send_tagstruct(u->pstream, t);
1309 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1310
1311 #ifdef TUNNEL_SINK
1312 t = pa_tagstruct_new(NULL, 0);
1313 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1314 pa_tagstruct_putu32(t, tag = u->ctag++);
1315 pa_tagstruct_putu32(t, u->device_index);
1316 pa_pstream_send_tagstruct(u->pstream, t);
1317 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1318
1319 if (u->sink_name) {
1320 t = pa_tagstruct_new(NULL, 0);
1321 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1322 pa_tagstruct_putu32(t, tag = u->ctag++);
1323 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1324 pa_tagstruct_puts(t, u->sink_name);
1325 pa_pstream_send_tagstruct(u->pstream, t);
1326 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1327 }
1328 #else
1329 if (u->source_name) {
1330 t = pa_tagstruct_new(NULL, 0);
1331 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1332 pa_tagstruct_putu32(t, tag = u->ctag++);
1333 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1334 pa_tagstruct_puts(t, u->source_name);
1335 pa_pstream_send_tagstruct(u->pstream, t);
1336 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1337 }
1338 #endif
1339 }
1340
1341 /* Called from main context */
1342 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1343 struct userdata *u = userdata;
1344 pa_subscription_event_type_t e;
1345 uint32_t idx;
1346
1347 pa_assert(pd);
1348 pa_assert(t);
1349 pa_assert(u);
1350 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1351
1352 if (pa_tagstruct_getu32(t, &e) < 0 ||
1353 pa_tagstruct_getu32(t, &idx) < 0) {
1354 pa_log("Invalid protocol reply");
1355 pa_module_unload_request(u->module, TRUE);
1356 return;
1357 }
1358
1359 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1360 #ifdef TUNNEL_SINK
1361 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1362 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1363 #else
1364 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1365 #endif
1366 )
1367 return;
1368
1369 request_info(u);
1370 }
1371
1372 /* Called from main context */
1373 static void start_subscribe(struct userdata *u) {
1374 pa_tagstruct *t;
1375 pa_assert(u);
1376
1377 t = pa_tagstruct_new(NULL, 0);
1378 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1379 pa_tagstruct_putu32(t, u->ctag++);
1380 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1381 #ifdef TUNNEL_SINK
1382 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1383 #else
1384 PA_SUBSCRIPTION_MASK_SOURCE
1385 #endif
1386 );
1387
1388 pa_pstream_send_tagstruct(u->pstream, t);
1389 }
1390
1391 /* Called from main context */
1392 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1393 struct userdata *u = userdata;
1394 #ifdef TUNNEL_SINK
1395 uint32_t bytes;
1396 #endif
1397
1398 pa_assert(pd);
1399 pa_assert(u);
1400 pa_assert(u->pdispatch == pd);
1401
1402 if (command != PA_COMMAND_REPLY) {
1403 if (command == PA_COMMAND_ERROR)
1404 pa_log("Failed to create stream.");
1405 else
1406 pa_log("Protocol error.");
1407 goto fail;
1408 }
1409
1410 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1411 pa_tagstruct_getu32(t, &u->device_index) < 0
1412 #ifdef TUNNEL_SINK
1413 || pa_tagstruct_getu32(t, &bytes) < 0
1414 #endif
1415 )
1416 goto parse_error;
1417
1418 if (u->version >= 9) {
1419 #ifdef TUNNEL_SINK
1420 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1421 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1422 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1423 pa_tagstruct_getu32(t, &u->minreq) < 0)
1424 goto parse_error;
1425 #else
1426 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1427 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1428 goto parse_error;
1429 #endif
1430 }
1431
1432 if (u->version >= 12) {
1433 pa_sample_spec ss;
1434 pa_channel_map cm;
1435 uint32_t device_index;
1436 const char *dn;
1437 pa_bool_t suspended;
1438
1439 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1440 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1441 pa_tagstruct_getu32(t, &device_index) < 0 ||
1442 pa_tagstruct_gets(t, &dn) < 0 ||
1443 pa_tagstruct_get_boolean(t, &suspended) < 0)
1444 goto parse_error;
1445
1446 #ifdef TUNNEL_SINK
1447 pa_xfree(u->sink_name);
1448 u->sink_name = pa_xstrdup(dn);
1449 #else
1450 pa_xfree(u->source_name);
1451 u->source_name = pa_xstrdup(dn);
1452 #endif
1453 }
1454
1455 if (u->version >= 13) {
1456 pa_usec_t usec;
1457
1458 if (pa_tagstruct_get_usec(t, &usec) < 0)
1459 goto parse_error;
1460
1461 /* #ifdef TUNNEL_SINK */
1462 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1463 /* #else */
1464 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1465 /* #endif */
1466 }
1467
1468 if (!pa_tagstruct_eof(t))
1469 goto parse_error;
1470
1471 start_subscribe(u);
1472 request_info(u);
1473
1474 pa_assert(!u->time_event);
1475 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1476
1477 request_latency(u);
1478
1479 pa_log_debug("Stream created.");
1480
1481 #ifdef TUNNEL_SINK
1482 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1483 #endif
1484
1485 return;
1486
1487 parse_error:
1488 pa_log("Invalid reply. (Create stream)");
1489
1490 fail:
1491 pa_module_unload_request(u->module, TRUE);
1492
1493 }
1494
1495 /* Called from main context */
1496 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1497 struct userdata *u = userdata;
1498 pa_tagstruct *reply;
1499 char name[256], un[128], hn[128];
1500 #ifdef TUNNEL_SINK
1501 pa_cvolume volume;
1502 #endif
1503
1504 pa_assert(pd);
1505 pa_assert(u);
1506 pa_assert(u->pdispatch == pd);
1507
1508 if (command != PA_COMMAND_REPLY ||
1509 pa_tagstruct_getu32(t, &u->version) < 0 ||
1510 !pa_tagstruct_eof(t)) {
1511
1512 if (command == PA_COMMAND_ERROR)
1513 pa_log("Failed to authenticate");
1514 else
1515 pa_log("Protocol error.");
1516
1517 goto fail;
1518 }
1519
1520 /* Minimum supported protocol version */
1521 if (u->version < 8) {
1522 pa_log("Incompatible protocol version");
1523 goto fail;
1524 }
1525
1526 /* Starting with protocol version 13 the MSB of the version tag
1527 reflects if shm is enabled for this connection or not. We don't
1528 support SHM here at all, so we just ignore this. */
1529
1530 if (u->version >= 13)
1531 u->version &= 0x7FFFFFFFU;
1532
1533 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1534
1535 #ifdef TUNNEL_SINK
1536 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1537 pa_sink_update_proplist(u->sink, 0, NULL);
1538
1539 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1540 u->sink_name,
1541 pa_get_user_name(un, sizeof(un)),
1542 pa_get_host_name(hn, sizeof(hn)));
1543 #else
1544 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1545 pa_source_update_proplist(u->source, 0, NULL);
1546
1547 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1548 u->source_name,
1549 pa_get_user_name(un, sizeof(un)),
1550 pa_get_host_name(hn, sizeof(hn)));
1551 #endif
1552
1553 reply = pa_tagstruct_new(NULL, 0);
1554 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1555 pa_tagstruct_putu32(reply, u->ctag++);
1556
1557 if (u->version >= 13) {
1558 pa_proplist *pl;
1559 pl = pa_proplist_new();
1560 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1561 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1562 pa_init_proplist(pl);
1563 pa_tagstruct_put_proplist(reply, pl);
1564 pa_proplist_free(pl);
1565 } else
1566 pa_tagstruct_puts(reply, "PulseAudio");
1567
1568 pa_pstream_send_tagstruct(u->pstream, reply);
1569 /* We ignore the server's reply here */
1570
1571 reply = pa_tagstruct_new(NULL, 0);
1572
1573 if (u->version < 13)
1574 /* Only for older PA versions we need to fill in the maxlength */
1575 u->maxlength = 4*1024*1024;
1576
1577 #ifdef TUNNEL_SINK
1578 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1579 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1580 u->prebuf = u->tlength;
1581 #else
1582 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1583 #endif
1584
1585 #ifdef TUNNEL_SINK
1586 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1587 pa_tagstruct_putu32(reply, tag = u->ctag++);
1588
1589 if (u->version < 13)
1590 pa_tagstruct_puts(reply, name);
1591
1592 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1593 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1594 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1595 pa_tagstruct_puts(reply, u->sink_name);
1596 pa_tagstruct_putu32(reply, u->maxlength);
1597 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1598 pa_tagstruct_putu32(reply, u->tlength);
1599 pa_tagstruct_putu32(reply, u->prebuf);
1600 pa_tagstruct_putu32(reply, u->minreq);
1601 pa_tagstruct_putu32(reply, 0);
1602 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1603 pa_tagstruct_put_cvolume(reply, &volume);
1604 #else
1605 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1606 pa_tagstruct_putu32(reply, tag = u->ctag++);
1607
1608 if (u->version < 13)
1609 pa_tagstruct_puts(reply, name);
1610
1611 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1612 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1613 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1614 pa_tagstruct_puts(reply, u->source_name);
1615 pa_tagstruct_putu32(reply, u->maxlength);
1616 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1617 pa_tagstruct_putu32(reply, u->fragsize);
1618 #endif
1619
1620 if (u->version >= 12) {
1621 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1622 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1623 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1624 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1625 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1626 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1627 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1628 }
1629
1630 if (u->version >= 13) {
1631 pa_proplist *pl;
1632
1633 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1634 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1635
1636 pl = pa_proplist_new();
1637 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1638 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1639 pa_tagstruct_put_proplist(reply, pl);
1640 pa_proplist_free(pl);
1641
1642 #ifndef TUNNEL_SINK
1643 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1644 #endif
1645 }
1646
1647 if (u->version >= 14) {
1648 #ifdef TUNNEL_SINK
1649 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1650 #endif
1651 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1652 }
1653
1654 if (u->version >= 15) {
1655 #ifdef TUNNEL_SINK
1656 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1657 #endif
1658 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1659 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1660 }
1661
1662 pa_pstream_send_tagstruct(u->pstream, reply);
1663 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1664
1665 pa_log_debug("Connection authenticated, creating stream ...");
1666
1667 return;
1668
1669 fail:
1670 pa_module_unload_request(u->module, TRUE);
1671 }
1672
1673 /* Called from main context */
1674 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1675 struct userdata *u = userdata;
1676
1677 pa_assert(p);
1678 pa_assert(u);
1679
1680 pa_log_warn("Stream died.");
1681 pa_module_unload_request(u->module, TRUE);
1682 }
1683
1684 /* Called from main context */
1685 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1686 struct userdata *u = userdata;
1687
1688 pa_assert(p);
1689 pa_assert(packet);
1690 pa_assert(u);
1691
1692 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1693 pa_log("Invalid packet");
1694 pa_module_unload_request(u->module, TRUE);
1695 return;
1696 }
1697 }
1698
1699 #ifndef TUNNEL_SINK
1700 /* Called from main context */
1701 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1702 struct userdata *u = userdata;
1703
1704 pa_assert(p);
1705 pa_assert(chunk);
1706 pa_assert(u);
1707
1708 if (channel != u->channel) {
1709 pa_log("Received memory block on bad channel.");
1710 pa_module_unload_request(u->module, TRUE);
1711 return;
1712 }
1713
1714 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1715
1716 u->counter_delta += (int64_t) chunk->length;
1717 }
1718 #endif
1719
1720 /* Called from main context */
1721 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1722 struct userdata *u = userdata;
1723 pa_tagstruct *t;
1724 uint32_t tag;
1725
1726 pa_assert(sc);
1727 pa_assert(u);
1728 pa_assert(u->client == sc);
1729
1730 pa_socket_client_unref(u->client);
1731 u->client = NULL;
1732
1733 if (!io) {
1734 pa_log("Connection failed: %s", pa_cstrerror(errno));
1735 pa_module_unload_request(u->module, TRUE);
1736 return;
1737 }
1738
1739 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1740 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1741
1742 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1743 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1744 #ifndef TUNNEL_SINK
1745 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1746 #endif
1747
1748 t = pa_tagstruct_new(NULL, 0);
1749 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1750 pa_tagstruct_putu32(t, tag = u->ctag++);
1751 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1752
1753 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1754
1755 #ifdef HAVE_CREDS
1756 {
1757 pa_creds ucred;
1758
1759 if (pa_iochannel_creds_supported(io))
1760 pa_iochannel_creds_enable(io);
1761
1762 ucred.uid = getuid();
1763 ucred.gid = getgid();
1764
1765 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1766 }
1767 #else
1768 pa_pstream_send_tagstruct(u->pstream, t);
1769 #endif
1770
1771 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1772
1773 pa_log_debug("Connection established, authenticating ...");
1774 }
1775
1776 #ifdef TUNNEL_SINK
1777
1778 /* Called from main context */
1779 static void sink_set_volume(pa_sink *sink) {
1780 struct userdata *u;
1781 pa_tagstruct *t;
1782
1783 pa_assert(sink);
1784 u = sink->userdata;
1785 pa_assert(u);
1786
1787 t = pa_tagstruct_new(NULL, 0);
1788 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1789 pa_tagstruct_putu32(t, u->ctag++);
1790 pa_tagstruct_putu32(t, u->device_index);
1791 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1792 pa_pstream_send_tagstruct(u->pstream, t);
1793 }
1794
1795 /* Called from main context */
1796 static void sink_set_mute(pa_sink *sink) {
1797 struct userdata *u;
1798 pa_tagstruct *t;
1799
1800 pa_assert(sink);
1801 u = sink->userdata;
1802 pa_assert(u);
1803
1804 if (u->version < 11)
1805 return;
1806
1807 t = pa_tagstruct_new(NULL, 0);
1808 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1809 pa_tagstruct_putu32(t, u->ctag++);
1810 pa_tagstruct_putu32(t, u->device_index);
1811 pa_tagstruct_put_boolean(t, !!sink->muted);
1812 pa_pstream_send_tagstruct(u->pstream, t);
1813 }
1814
1815 #endif
1816
1817 int pa__init(pa_module*m) {
1818 pa_modargs *ma = NULL;
1819 struct userdata *u = NULL;
1820 pa_sample_spec ss;
1821 pa_channel_map map;
1822 char *dn = NULL;
1823 #ifdef TUNNEL_SINK
1824 pa_sink_new_data data;
1825 #else
1826 pa_source_new_data data;
1827 #endif
1828
1829 pa_assert(m);
1830
1831 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1832 pa_log("Failed to parse module arguments");
1833 goto fail;
1834 }
1835
1836 m->userdata = u = pa_xnew0(struct userdata, 1);
1837 u->core = m->core;
1838 u->module = m;
1839 u->client = NULL;
1840 u->pdispatch = NULL;
1841 u->pstream = NULL;
1842 u->server_name = NULL;
1843 #ifdef TUNNEL_SINK
1844 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1845 u->sink = NULL;
1846 u->requested_bytes = 0;
1847 #else
1848 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1849 u->source = NULL;
1850 #endif
1851 u->smoother = pa_smoother_new(
1852 PA_USEC_PER_SEC,
1853 PA_USEC_PER_SEC*2,
1854 TRUE,
1855 TRUE,
1856 10,
1857 pa_rtclock_now(),
1858 FALSE);
1859 u->ctag = 1;
1860 u->device_index = u->channel = PA_INVALID_INDEX;
1861 u->time_event = NULL;
1862 u->ignore_latency_before = 0;
1863 u->transport_usec = u->thread_transport_usec = 0;
1864 u->remote_suspended = u->remote_corked = FALSE;
1865 u->counter = u->counter_delta = 0;
1866
1867 u->rtpoll = pa_rtpoll_new();
1868 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1869
1870 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1871 goto fail;
1872
1873 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1874 pa_log("No server specified.");
1875 goto fail;
1876 }
1877
1878 ss = m->core->default_sample_spec;
1879 map = m->core->default_channel_map;
1880 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1881 pa_log("Invalid sample format specification");
1882 goto fail;
1883 }
1884
1885 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1886 pa_log("Failed to connect to server '%s'", u->server_name);
1887 goto fail;
1888 }
1889
1890 pa_socket_client_set_callback(u->client, on_connection, u);
1891
1892 #ifdef TUNNEL_SINK
1893
1894 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1895 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1896
1897 pa_sink_new_data_init(&data);
1898 data.driver = __FILE__;
1899 data.module = m;
1900 data.namereg_fail = TRUE;
1901 pa_sink_new_data_set_name(&data, dn);
1902 pa_sink_new_data_set_sample_spec(&data, &ss);
1903 pa_sink_new_data_set_channel_map(&data, &map);
1904 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1905 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1906 if (u->sink_name)
1907 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1908
1909 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1910 pa_log("Invalid properties");
1911 pa_sink_new_data_done(&data);
1912 goto fail;
1913 }
1914
1915 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1916 pa_sink_new_data_done(&data);
1917
1918 if (!u->sink) {
1919 pa_log("Failed to create sink.");
1920 goto fail;
1921 }
1922
1923 u->sink->parent.process_msg = sink_process_msg;
1924 u->sink->userdata = u;
1925 u->sink->set_state = sink_set_state;
1926 u->sink->set_volume = sink_set_volume;
1927 u->sink->set_mute = sink_set_mute;
1928
1929 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1930
1931 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1932
1933 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1934 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1935
1936 #else
1937
1938 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1939 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1940
1941 pa_source_new_data_init(&data);
1942 data.driver = __FILE__;
1943 data.module = m;
1944 data.namereg_fail = TRUE;
1945 pa_source_new_data_set_name(&data, dn);
1946 pa_source_new_data_set_sample_spec(&data, &ss);
1947 pa_source_new_data_set_channel_map(&data, &map);
1948 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1949 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1950 if (u->source_name)
1951 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1952
1953 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1954 pa_log("Invalid properties");
1955 pa_source_new_data_done(&data);
1956 goto fail;
1957 }
1958
1959 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1960 pa_source_new_data_done(&data);
1961
1962 if (!u->source) {
1963 pa_log("Failed to create source.");
1964 goto fail;
1965 }
1966
1967 u->source->parent.process_msg = source_process_msg;
1968 u->source->set_state = source_set_state;
1969 u->source->userdata = u;
1970
1971 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1972
1973 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1974 pa_source_set_rtpoll(u->source, u->rtpoll);
1975
1976 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
1977 #endif
1978
1979 pa_xfree(dn);
1980
1981 u->time_event = NULL;
1982
1983 u->maxlength = (uint32_t) -1;
1984 #ifdef TUNNEL_SINK
1985 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1986 #else
1987 u->fragsize = (uint32_t) -1;
1988 #endif
1989
1990 if (!(u->thread = pa_thread_new(thread_func, u))) {
1991 pa_log("Failed to create thread.");
1992 goto fail;
1993 }
1994
1995 #ifdef TUNNEL_SINK
1996 pa_sink_put(u->sink);
1997 #else
1998 pa_source_put(u->source);
1999 #endif
2000
2001 pa_modargs_free(ma);
2002
2003 return 0;
2004
2005 fail:
2006 pa__done(m);
2007
2008 if (ma)
2009 pa_modargs_free(ma);
2010
2011 pa_xfree(dn);
2012
2013 return -1;
2014 }
2015
2016 void pa__done(pa_module*m) {
2017 struct userdata* u;
2018
2019 pa_assert(m);
2020
2021 if (!(u = m->userdata))
2022 return;
2023
2024 #ifdef TUNNEL_SINK
2025 if (u->sink)
2026 pa_sink_unlink(u->sink);
2027 #else
2028 if (u->source)
2029 pa_source_unlink(u->source);
2030 #endif
2031
2032 if (u->thread) {
2033 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2034 pa_thread_free(u->thread);
2035 }
2036
2037 pa_thread_mq_done(&u->thread_mq);
2038
2039 #ifdef TUNNEL_SINK
2040 if (u->sink)
2041 pa_sink_unref(u->sink);
2042 #else
2043 if (u->source)
2044 pa_source_unref(u->source);
2045 #endif
2046
2047 if (u->rtpoll)
2048 pa_rtpoll_free(u->rtpoll);
2049
2050 if (u->pstream) {
2051 pa_pstream_unlink(u->pstream);
2052 pa_pstream_unref(u->pstream);
2053 }
2054
2055 if (u->pdispatch)
2056 pa_pdispatch_unref(u->pdispatch);
2057
2058 if (u->client)
2059 pa_socket_client_unref(u->client);
2060
2061 if (u->auth_cookie)
2062 pa_auth_cookie_unref(u->auth_cookie);
2063
2064 if (u->smoother)
2065 pa_smoother_free(u->smoother);
2066
2067 if (u->time_event)
2068 u->core->mainloop->time_free(u->time_event);
2069
2070 #ifndef TUNNEL_SINK
2071 if (u->mcalign)
2072 pa_mcalign_free(u->mcalign);
2073 #endif
2074
2075 #ifdef TUNNEL_SINK
2076 pa_xfree(u->sink_name);
2077 #else
2078 pa_xfree(u->source_name);
2079 #endif
2080 pa_xfree(u->server_name);
2081
2082 pa_xfree(u->device_description);
2083 pa_xfree(u->server_fqdn);
2084 pa_xfree(u->user_name);
2085
2086 pa_xfree(u);
2087 }