]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
Merge branch 'passthrough'
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
59
60 #ifdef TUNNEL_SINK
61 #include "module-tunnel-sink-symdef.h"
62 #else
63 #include "module-tunnel-source-symdef.h"
64 #endif
65
66 #ifdef TUNNEL_SINK
67 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 PA_MODULE_USAGE(
69 "sink_name=<name for the local sink> "
70 "sink_properties=<properties for the local sink> "
71 "server=<address> "
72 "sink=<remote sink name> "
73 "cookie=<filename> "
74 "format=<sample format> "
75 "channels=<number of channels> "
76 "rate=<sample rate> "
77 "channel_map=<channel map>");
78 #else
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 PA_MODULE_USAGE(
81 "source_name=<name for the local source> "
82 "source_properties=<properties for the local source> "
83 "server=<address> "
84 "source=<remote source name> "
85 "cookie=<filename> "
86 "format=<sample format> "
87 "channels=<number of channels> "
88 "rate=<sample rate> "
89 "channel_map=<channel map>");
90 #endif
91
92 PA_MODULE_AUTHOR("Lennart Poettering");
93 PA_MODULE_VERSION(PACKAGE_VERSION);
94 PA_MODULE_LOAD_ONCE(FALSE);
95
/* NULL-terminated list of the module argument names this module accepts.
 * The device-specific names depend on the sink/source build flavour. */
static const char* const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif
    "channel_map",
    NULL,
};
114
115 #define DEFAULT_TIMEOUT 5
116
117 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
118
119 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
120
121 #ifdef TUNNEL_SINK
122
123 enum {
124 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
125 SINK_MESSAGE_REMOTE_SUSPEND,
126 SINK_MESSAGE_UPDATE_LATENCY,
127 SINK_MESSAGE_POST
128 };
129
130 #define DEFAULT_TLENGTH_MSEC 150
131 #define DEFAULT_MINREQ_MSEC 25
132
133 #else
134
135 enum {
136 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
137 SOURCE_MESSAGE_REMOTE_SUSPEND,
138 SOURCE_MESSAGE_UPDATE_LATENCY
139 };
140
141 #define DEFAULT_FRAGSIZE_MSEC 25
142
143 #endif
144
145 #ifdef TUNNEL_SINK
146 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 #endif
149 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
156
157 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
158 #ifdef TUNNEL_SINK
159 [PA_COMMAND_REQUEST] = command_request,
160 [PA_COMMAND_STARTED] = command_started,
161 #endif
162 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
163 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
164 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
165 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
166 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
167 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
168 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
169 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
170 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
171 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
173 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
174 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
175 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
176 };
177
178 struct userdata {
179 pa_core *core;
180 pa_module *module;
181
182 pa_thread_mq thread_mq;
183 pa_rtpoll *rtpoll;
184 pa_thread *thread;
185
186 pa_socket_client *client;
187 pa_pstream *pstream;
188 pa_pdispatch *pdispatch;
189
190 char *server_name;
191 #ifdef TUNNEL_SINK
192 char *sink_name;
193 pa_sink *sink;
194 size_t requested_bytes;
195 #else
196 char *source_name;
197 pa_source *source;
198 pa_mcalign *mcalign;
199 #endif
200
201 pa_auth_cookie *auth_cookie;
202
203 uint32_t version;
204 uint32_t ctag;
205 uint32_t device_index;
206 uint32_t channel;
207
208 int64_t counter, counter_delta;
209
210 pa_bool_t remote_corked:1;
211 pa_bool_t remote_suspended:1;
212
213 pa_usec_t transport_usec; /* maintained in the main thread */
214 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
215
216 uint32_t ignore_latency_before;
217
218 pa_time_event *time_event;
219
220 pa_smoother *smoother;
221
222 char *device_description;
223 char *server_fqdn;
224 char *user_name;
225
226 uint32_t maxlength;
227 #ifdef TUNNEL_SINK
228 uint32_t tlength;
229 uint32_t minreq;
230 uint32_t prebuf;
231 #else
232 uint32_t fragsize;
233 #endif
234 };
235
/* Defined further down; needed early by the command handlers below. */
static void request_latency(struct userdata *u);
237
238 /* Called from main context */
239 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
240 pa_log_debug("Got stream or client event.");
241 }
242
243 /* Called from main context */
244 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
245 struct userdata *u = userdata;
246
247 pa_assert(pd);
248 pa_assert(t);
249 pa_assert(u);
250 pa_assert(u->pdispatch == pd);
251
252 pa_log_warn("Stream killed");
253 pa_module_unload_request(u->module, TRUE);
254 }
255
256 /* Called from main context */
257 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
258 struct userdata *u = userdata;
259
260 pa_assert(pd);
261 pa_assert(t);
262 pa_assert(u);
263 pa_assert(u->pdispatch == pd);
264
265 pa_log_info("Server signalled buffer overrun/underrun.");
266 request_latency(u);
267 }
268
269 /* Called from main context */
270 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
271 struct userdata *u = userdata;
272 uint32_t channel;
273 pa_bool_t suspended;
274
275 pa_assert(pd);
276 pa_assert(t);
277 pa_assert(u);
278 pa_assert(u->pdispatch == pd);
279
280 if (pa_tagstruct_getu32(t, &channel) < 0 ||
281 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
282 !pa_tagstruct_eof(t)) {
283
284 pa_log("Invalid packet.");
285 pa_module_unload_request(u->module, TRUE);
286 return;
287 }
288
289 pa_log_debug("Server reports device suspend.");
290
291 #ifdef TUNNEL_SINK
292 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #else
294 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
295 #endif
296
297 request_latency(u);
298 }
299
300 /* Called from main context */
301 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
302 struct userdata *u = userdata;
303 uint32_t channel, di;
304 const char *dn;
305 pa_bool_t suspended;
306
307 pa_assert(pd);
308 pa_assert(t);
309 pa_assert(u);
310 pa_assert(u->pdispatch == pd);
311
312 if (pa_tagstruct_getu32(t, &channel) < 0 ||
313 pa_tagstruct_getu32(t, &di) < 0 ||
314 pa_tagstruct_gets(t, &dn) < 0 ||
315 pa_tagstruct_get_boolean(t, &suspended) < 0) {
316
317 pa_log_error("Invalid packet.");
318 pa_module_unload_request(u->module, TRUE);
319 return;
320 }
321
322 pa_log_debug("Server reports a stream move.");
323
324 #ifdef TUNNEL_SINK
325 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #else
327 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
328 #endif
329
330 request_latency(u);
331 }
332
333 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
334 struct userdata *u = userdata;
335 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
336 pa_usec_t usec;
337
338 pa_assert(pd);
339 pa_assert(t);
340 pa_assert(u);
341 pa_assert(u->pdispatch == pd);
342
343 if (pa_tagstruct_getu32(t, &channel) < 0 ||
344 pa_tagstruct_getu32(t, &maxlength) < 0) {
345
346 pa_log_error("Invalid packet.");
347 pa_module_unload_request(u->module, TRUE);
348 return;
349 }
350
351 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
352 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
353 pa_tagstruct_get_usec(t, &usec) < 0) {
354
355 pa_log_error("Invalid packet.");
356 pa_module_unload_request(u->module, TRUE);
357 return;
358 }
359 } else {
360 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
361 pa_tagstruct_getu32(t, &prebuf) < 0 ||
362 pa_tagstruct_getu32(t, &minreq) < 0 ||
363 pa_tagstruct_get_usec(t, &usec) < 0) {
364
365 pa_log_error("Invalid packet.");
366 pa_module_unload_request(u->module, TRUE);
367 return;
368 }
369 }
370
371 #ifdef TUNNEL_SINK
372 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
373 #endif
374
375 request_latency(u);
376 }
377
378 #ifdef TUNNEL_SINK
379
380 /* Called from main context */
381 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
382 struct userdata *u = userdata;
383
384 pa_assert(pd);
385 pa_assert(t);
386 pa_assert(u);
387 pa_assert(u->pdispatch == pd);
388
389 pa_log_debug("Server reports playback started.");
390 request_latency(u);
391 }
392
393 #endif
394
395 /* Called from IO thread context */
396 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
397 pa_usec_t x;
398
399 pa_assert(u);
400
401 x = pa_rtclock_now();
402
403 /* Correct by the time the requested issued needs to travel to the
404 * other side. This is a valid thread-safe access, because the
405 * main thread is waiting for us */
406
407 if (past)
408 x -= u->thread_transport_usec;
409 else
410 x += u->thread_transport_usec;
411
412 if (u->remote_suspended || u->remote_corked)
413 pa_smoother_pause(u->smoother, x);
414 else
415 pa_smoother_resume(u->smoother, x, TRUE);
416 }
417
418 /* Called from IO thread context */
419 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
420 pa_assert(u);
421
422 if (u->remote_corked == cork)
423 return;
424
425 u->remote_corked = cork;
426 check_smoother_status(u, FALSE);
427 }
428
429 /* Called from main context */
430 static void stream_cork(struct userdata *u, pa_bool_t cork) {
431 pa_tagstruct *t;
432 pa_assert(u);
433
434 if (!u->pstream)
435 return;
436
437 t = pa_tagstruct_new(NULL, 0);
438 #ifdef TUNNEL_SINK
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
440 #else
441 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
442 #endif
443 pa_tagstruct_putu32(t, u->ctag++);
444 pa_tagstruct_putu32(t, u->channel);
445 pa_tagstruct_put_boolean(t, !!cork);
446 pa_pstream_send_tagstruct(u->pstream, t);
447
448 request_latency(u);
449 }
450
451 /* Called from IO thread context */
452 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
453 pa_assert(u);
454
455 if (u->remote_suspended == suspend)
456 return;
457
458 u->remote_suspended = suspend;
459 check_smoother_status(u, TRUE);
460 }
461
462 #ifdef TUNNEL_SINK
463
464 /* Called from IO thread context */
465 static void send_data(struct userdata *u) {
466 pa_assert(u);
467
468 while (u->requested_bytes > 0) {
469 pa_memchunk memchunk;
470
471 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
472 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
473 pa_memblock_unref(memchunk.memblock);
474
475 u->requested_bytes -= memchunk.length;
476
477 u->counter += (int64_t) memchunk.length;
478 }
479 }
480
481 /* This function is called from IO context -- except when it is not. */
482 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
483 struct userdata *u = PA_SINK(o)->userdata;
484
485 switch (code) {
486
487 case PA_SINK_MESSAGE_SET_STATE: {
488 int r;
489
490 /* First, change the state, because otherwide pa_sink_render() would fail */
491 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
492
493 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
494
495 if (PA_SINK_IS_OPENED(u->sink->state))
496 send_data(u);
497 }
498
499 return r;
500 }
501
502 case PA_SINK_MESSAGE_GET_LATENCY: {
503 pa_usec_t yl, yr, *usec = data;
504
505 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
506 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
507
508 *usec = yl > yr ? yl - yr : 0;
509 return 0;
510 }
511
512 case SINK_MESSAGE_REQUEST:
513
514 pa_assert(offset > 0);
515 u->requested_bytes += (size_t) offset;
516
517 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
518 send_data(u);
519
520 return 0;
521
522
523 case SINK_MESSAGE_REMOTE_SUSPEND:
524
525 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
526 return 0;
527
528
529 case SINK_MESSAGE_UPDATE_LATENCY: {
530 pa_usec_t y;
531
532 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
533
534 if (y > (pa_usec_t) offset)
535 y -= (pa_usec_t) offset;
536 else
537 y = 0;
538
539 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
540
541 /* We can access this freely here, since the main thread is waiting for us */
542 u->thread_transport_usec = u->transport_usec;
543
544 return 0;
545 }
546
547 case SINK_MESSAGE_POST:
548
549 /* OK, This might be a bit confusing. This message is
550 * delivered to us from the main context -- NOT from the
551 * IO thread context where the rest of the messages are
552 * dispatched. Yeah, ugly, but I am a lazy bastard. */
553
554 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
555
556 u->counter_delta += (int64_t) chunk->length;
557
558 return 0;
559 }
560
561 return pa_sink_process_msg(o, code, data, offset, chunk);
562 }
563
564 /* Called from main context */
565 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
566 struct userdata *u;
567 pa_sink_assert_ref(s);
568 u = s->userdata;
569
570 switch ((pa_sink_state_t) state) {
571
572 case PA_SINK_SUSPENDED:
573 pa_assert(PA_SINK_IS_OPENED(s->state));
574 stream_cork(u, TRUE);
575 break;
576
577 case PA_SINK_IDLE:
578 case PA_SINK_RUNNING:
579 if (s->state == PA_SINK_SUSPENDED)
580 stream_cork(u, FALSE);
581 break;
582
583 case PA_SINK_UNLINKED:
584 case PA_SINK_INIT:
585 case PA_SINK_INVALID_STATE:
586 ;
587 }
588
589 return 0;
590 }
591
592 #else
593
594 /* This function is called from IO context -- except when it is not. */
595 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596 struct userdata *u = PA_SOURCE(o)->userdata;
597
598 switch (code) {
599
600 case PA_SOURCE_MESSAGE_SET_STATE: {
601 int r;
602
603 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
604 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
605
606 return r;
607 }
608
609 case PA_SOURCE_MESSAGE_GET_LATENCY: {
610 pa_usec_t yr, yl, *usec = data;
611
612 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
613 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
614
615 *usec = yr > yl ? yr - yl : 0;
616 return 0;
617 }
618
619 case SOURCE_MESSAGE_POST: {
620 pa_memchunk c;
621
622 pa_mcalign_push(u->mcalign, chunk);
623
624 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
625
626 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
627 pa_source_post(u->source, &c);
628
629 pa_memblock_unref(c.memblock);
630
631 u->counter += (int64_t) c.length;
632 }
633
634 return 0;
635 }
636
637 case SOURCE_MESSAGE_REMOTE_SUSPEND:
638
639 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
640 return 0;
641
642 case SOURCE_MESSAGE_UPDATE_LATENCY: {
643 pa_usec_t y;
644
645 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
646 y += (pa_usec_t) offset;
647
648 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
649
650 /* We can access this freely here, since the main thread is waiting for us */
651 u->thread_transport_usec = u->transport_usec;
652
653 return 0;
654 }
655 }
656
657 return pa_source_process_msg(o, code, data, offset, chunk);
658 }
659
660 /* Called from main context */
661 static int source_set_state(pa_source *s, pa_source_state_t state) {
662 struct userdata *u;
663 pa_source_assert_ref(s);
664 u = s->userdata;
665
666 switch ((pa_source_state_t) state) {
667
668 case PA_SOURCE_SUSPENDED:
669 pa_assert(PA_SOURCE_IS_OPENED(s->state));
670 stream_cork(u, TRUE);
671 break;
672
673 case PA_SOURCE_IDLE:
674 case PA_SOURCE_RUNNING:
675 if (s->state == PA_SOURCE_SUSPENDED)
676 stream_cork(u, FALSE);
677 break;
678
679 case PA_SOURCE_UNLINKED:
680 case PA_SOURCE_INIT:
681 case PA_SINK_INVALID_STATE:
682 ;
683 }
684
685 return 0;
686 }
687
688 #endif
689
690 static void thread_func(void *userdata) {
691 struct userdata *u = userdata;
692
693 pa_assert(u);
694
695 pa_log_debug("Thread starting up");
696
697 pa_thread_mq_install(&u->thread_mq);
698
699 for (;;) {
700 int ret;
701
702 #ifdef TUNNEL_SINK
703 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
704 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
705 pa_sink_process_rewind(u->sink, 0);
706 #endif
707
708 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
709 goto fail;
710
711 if (ret == 0)
712 goto finish;
713 }
714
715 fail:
716 /* If this was no regular exit from the loop we have to continue
717 * processing messages until we received PA_MESSAGE_SHUTDOWN */
718 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
719 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
720
721 finish:
722 pa_log_debug("Thread shutting down");
723 }
724
725 #ifdef TUNNEL_SINK
726 /* Called from main context */
727 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
728 struct userdata *u = userdata;
729 uint32_t bytes, channel;
730
731 pa_assert(pd);
732 pa_assert(command == PA_COMMAND_REQUEST);
733 pa_assert(t);
734 pa_assert(u);
735 pa_assert(u->pdispatch == pd);
736
737 if (pa_tagstruct_getu32(t, &channel) < 0 ||
738 pa_tagstruct_getu32(t, &bytes) < 0) {
739 pa_log("Invalid protocol reply");
740 goto fail;
741 }
742
743 if (channel != u->channel) {
744 pa_log("Received data for invalid channel");
745 goto fail;
746 }
747
748 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
749 return;
750
751 fail:
752 pa_module_unload_request(u->module, TRUE);
753 }
754
755 #endif
756
757 /* Called from main context */
758 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759 struct userdata *u = userdata;
760 pa_usec_t sink_usec, source_usec;
761 pa_bool_t playing;
762 int64_t write_index, read_index;
763 struct timeval local, remote, now;
764 pa_sample_spec *ss;
765 int64_t delay;
766
767 pa_assert(pd);
768 pa_assert(u);
769
770 if (command != PA_COMMAND_REPLY) {
771 if (command == PA_COMMAND_ERROR)
772 pa_log("Failed to get latency.");
773 else
774 pa_log("Protocol error.");
775 goto fail;
776 }
777
778 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
779 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
780 pa_tagstruct_get_boolean(t, &playing) < 0 ||
781 pa_tagstruct_get_timeval(t, &local) < 0 ||
782 pa_tagstruct_get_timeval(t, &remote) < 0 ||
783 pa_tagstruct_gets64(t, &write_index) < 0 ||
784 pa_tagstruct_gets64(t, &read_index) < 0) {
785 pa_log("Invalid reply.");
786 goto fail;
787 }
788
789 #ifdef TUNNEL_SINK
790 if (u->version >= 13) {
791 uint64_t underrun_for = 0, playing_for = 0;
792
793 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
794 pa_tagstruct_getu64(t, &playing_for) < 0) {
795 pa_log("Invalid reply.");
796 goto fail;
797 }
798 }
799 #endif
800
801 if (!pa_tagstruct_eof(t)) {
802 pa_log("Invalid reply.");
803 goto fail;
804 }
805
806 if (tag < u->ignore_latency_before) {
807 return;
808 }
809
810 pa_gettimeofday(&now);
811
812 /* Calculate transport usec */
813 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
814 /* local and remote seem to have synchronized clocks */
815 #ifdef TUNNEL_SINK
816 u->transport_usec = pa_timeval_diff(&remote, &local);
817 #else
818 u->transport_usec = pa_timeval_diff(&now, &remote);
819 #endif
820 } else
821 u->transport_usec = pa_timeval_diff(&now, &local)/2;
822
823 /* First, take the device's delay */
824 #ifdef TUNNEL_SINK
825 delay = (int64_t) sink_usec;
826 ss = &u->sink->sample_spec;
827 #else
828 delay = (int64_t) source_usec;
829 ss = &u->source->sample_spec;
830 #endif
831
832 /* Add the length of our server-side buffer */
833 if (write_index >= read_index)
834 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
835 else
836 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
837
838 /* Our measurements are already out of date, hence correct by the *
839 * transport latency */
840 #ifdef TUNNEL_SINK
841 delay -= (int64_t) u->transport_usec;
842 #else
843 delay += (int64_t) u->transport_usec;
844 #endif
845
846 /* Now correct by what we have have read/written since we requested the update */
847 #ifdef TUNNEL_SINK
848 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #else
850 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
851 #endif
852
853 #ifdef TUNNEL_SINK
854 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #else
856 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
857 #endif
858
859 return;
860
861 fail:
862
863 pa_module_unload_request(u->module, TRUE);
864 }
865
866 /* Called from main context */
867 static void request_latency(struct userdata *u) {
868 pa_tagstruct *t;
869 struct timeval now;
870 uint32_t tag;
871 pa_assert(u);
872
873 t = pa_tagstruct_new(NULL, 0);
874 #ifdef TUNNEL_SINK
875 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
876 #else
877 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
878 #endif
879 pa_tagstruct_putu32(t, tag = u->ctag++);
880 pa_tagstruct_putu32(t, u->channel);
881
882 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
883
884 pa_pstream_send_tagstruct(u->pstream, t);
885 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
886
887 u->ignore_latency_before = tag;
888 u->counter_delta = 0;
889 }
890
891 /* Called from main context */
892 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
893 struct userdata *u = userdata;
894
895 pa_assert(m);
896 pa_assert(e);
897 pa_assert(u);
898
899 request_latency(u);
900
901 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
902 }
903
904 /* Called from main context */
905 static void update_description(struct userdata *u) {
906 char *d;
907 char un[128], hn[128];
908 pa_tagstruct *t;
909
910 pa_assert(u);
911
912 if (!u->server_fqdn || !u->user_name || !u->device_description)
913 return;
914
915 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
916
917 #ifdef TUNNEL_SINK
918 pa_sink_set_description(u->sink, d);
919 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
920 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
921 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
922 #else
923 pa_source_set_description(u->source, d);
924 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
925 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
926 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
927 #endif
928
929 pa_xfree(d);
930
931 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
932 pa_get_user_name(un, sizeof(un)),
933 pa_get_host_name(hn, sizeof(hn)));
934
935 t = pa_tagstruct_new(NULL, 0);
936 #ifdef TUNNEL_SINK
937 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
938 #else
939 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
940 #endif
941 pa_tagstruct_putu32(t, u->ctag++);
942 pa_tagstruct_putu32(t, u->channel);
943 pa_tagstruct_puts(t, d);
944 pa_pstream_send_tagstruct(u->pstream, t);
945
946 pa_xfree(d);
947 }
948
949 /* Called from main context */
950 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
951 struct userdata *u = userdata;
952 pa_sample_spec ss;
953 pa_channel_map cm;
954 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
955 uint32_t cookie;
956
957 pa_assert(pd);
958 pa_assert(u);
959
960 if (command != PA_COMMAND_REPLY) {
961 if (command == PA_COMMAND_ERROR)
962 pa_log("Failed to get info.");
963 else
964 pa_log("Protocol error.");
965 goto fail;
966 }
967
968 if (pa_tagstruct_gets(t, &server_name) < 0 ||
969 pa_tagstruct_gets(t, &server_version) < 0 ||
970 pa_tagstruct_gets(t, &user_name) < 0 ||
971 pa_tagstruct_gets(t, &host_name) < 0 ||
972 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
973 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
974 pa_tagstruct_gets(t, &default_source_name) < 0 ||
975 pa_tagstruct_getu32(t, &cookie) < 0 ||
976 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
977
978 pa_log("Parse failure");
979 goto fail;
980 }
981
982 if (!pa_tagstruct_eof(t)) {
983 pa_log("Packet too long");
984 goto fail;
985 }
986
987 pa_xfree(u->server_fqdn);
988 u->server_fqdn = pa_xstrdup(host_name);
989
990 pa_xfree(u->user_name);
991 u->user_name = pa_xstrdup(user_name);
992
993 update_description(u);
994
995 return;
996
997 fail:
998 pa_module_unload_request(u->module, TRUE);
999 }
1000
1001 #ifdef TUNNEL_SINK
1002
1003 /* Called from main context */
1004 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1005 struct userdata *u = userdata;
1006 uint32_t idx, owner_module, monitor_source, flags;
1007 const char *name, *description, *monitor_source_name, *driver;
1008 pa_sample_spec ss;
1009 pa_channel_map cm;
1010 pa_cvolume volume;
1011 pa_bool_t mute;
1012 pa_usec_t latency;
1013 pa_proplist *pl;
1014
1015 pa_assert(pd);
1016 pa_assert(u);
1017
1018 pl = pa_proplist_new();
1019
1020 if (command != PA_COMMAND_REPLY) {
1021 if (command == PA_COMMAND_ERROR)
1022 pa_log("Failed to get info.");
1023 else
1024 pa_log("Protocol error.");
1025 goto fail;
1026 }
1027
1028 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1029 pa_tagstruct_gets(t, &name) < 0 ||
1030 pa_tagstruct_gets(t, &description) < 0 ||
1031 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1032 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1033 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1034 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1035 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1036 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1037 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1038 pa_tagstruct_get_usec(t, &latency) < 0 ||
1039 pa_tagstruct_gets(t, &driver) < 0 ||
1040 pa_tagstruct_getu32(t, &flags) < 0) {
1041
1042 pa_log("Parse failure");
1043 goto fail;
1044 }
1045
1046 if (u->version >= 13) {
1047 pa_usec_t configured_latency;
1048
1049 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1050 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1051
1052 pa_log("Parse failure");
1053 goto fail;
1054 }
1055 }
1056
1057 if (u->version >= 15) {
1058 pa_volume_t base_volume;
1059 uint32_t state, n_volume_steps, card;
1060
1061 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1062 pa_tagstruct_getu32(t, &state) < 0 ||
1063 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1064 pa_tagstruct_getu32(t, &card) < 0) {
1065
1066 pa_log("Parse failure");
1067 goto fail;
1068 }
1069 }
1070
1071 if (u->version >= 16) {
1072 uint32_t n_ports;
1073 const char *s;
1074
1075 if (pa_tagstruct_getu32(t, &n_ports)) {
1076 pa_log("Parse failure");
1077 goto fail;
1078 }
1079
1080 for (uint32_t j = 0; j < n_ports; j++) {
1081 uint32_t priority;
1082
1083 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1084 pa_tagstruct_gets(t, &s) < 0 || /* description */
1085 pa_tagstruct_getu32(t, &priority) < 0) {
1086
1087 pa_log("Parse failure");
1088 goto fail;
1089 }
1090 }
1091
1092 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1093 pa_log("Parse failure");
1094 goto fail;
1095 }
1096 }
1097
1098 if (u->version >= 21) {
1099 uint8_t n_formats;
1100 pa_format_info format;
1101
1102 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1103 pa_log("Parse failure");
1104 goto fail;
1105 }
1106
1107 for (uint8_t j = 0; j < n_formats; j++) {
1108 if (pa_tagstruct_get_format_info(t, &format)) { /* format info */
1109 pa_log("Parse failure");
1110 goto fail;
1111 }
1112 }
1113 }
1114
1115 if (!pa_tagstruct_eof(t)) {
1116 pa_log("Packet too long");
1117 goto fail;
1118 }
1119
1120 pa_proplist_free(pl);
1121
1122 if (!u->sink_name || strcmp(name, u->sink_name))
1123 return;
1124
1125 pa_xfree(u->device_description);
1126 u->device_description = pa_xstrdup(description);
1127
1128 update_description(u);
1129
1130 return;
1131
1132 fail:
1133 pa_module_unload_request(u->module, TRUE);
1134 pa_proplist_free(pl);
1135 }
1136
1137 /* Called from main context */
1138 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1139 struct userdata *u = userdata;
1140 uint32_t idx, owner_module, client, sink;
1141 pa_usec_t buffer_usec, sink_usec;
1142 const char *name, *driver, *resample_method;
1143 pa_bool_t mute = FALSE;
1144 pa_sample_spec sample_spec;
1145 pa_channel_map channel_map;
1146 pa_cvolume volume;
1147 pa_proplist *pl;
1148 pa_bool_t b;
1149
1150 pa_assert(pd);
1151 pa_assert(u);
1152
1153 pl = pa_proplist_new();
1154
1155 if (command != PA_COMMAND_REPLY) {
1156 if (command == PA_COMMAND_ERROR)
1157 pa_log("Failed to get info.");
1158 else
1159 pa_log("Protocol error.");
1160 goto fail;
1161 }
1162
1163 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1164 pa_tagstruct_gets(t, &name) < 0 ||
1165 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1166 pa_tagstruct_getu32(t, &client) < 0 ||
1167 pa_tagstruct_getu32(t, &sink) < 0 ||
1168 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1169 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1170 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1171 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1172 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1173 pa_tagstruct_gets(t, &resample_method) < 0 ||
1174 pa_tagstruct_gets(t, &driver) < 0) {
1175
1176 pa_log("Parse failure");
1177 goto fail;
1178 }
1179
1180 if (u->version >= 11) {
1181 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1182
1183 pa_log("Parse failure");
1184 goto fail;
1185 }
1186 }
1187
1188 if (u->version >= 13) {
1189 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1190
1191 pa_log("Parse failure");
1192 goto fail;
1193 }
1194 }
1195
1196 if (u->version >= 19) {
1197 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1198
1199 pa_log("Parse failure");
1200 goto fail;
1201 }
1202 }
1203
1204 if (u->version >= 20) {
1205 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1206 pa_tagstruct_get_boolean(t, &b) < 0) {
1207
1208 pa_log("Parse failure");
1209 goto fail;
1210 }
1211 }
1212
1213 if (u->version >= 21) {
1214 pa_format_info format;
1215
1216 if (pa_tagstruct_get_format_info(t, &format) < 0) {
1217
1218 pa_log("Parse failure");
1219 goto fail;
1220 }
1221 }
1222
1223 if (!pa_tagstruct_eof(t)) {
1224 pa_log("Packet too long");
1225 goto fail;
1226 }
1227
1228 pa_proplist_free(pl);
1229
1230 if (idx != u->device_index)
1231 return;
1232
1233 pa_assert(u->sink);
1234
1235 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1236 pa_cvolume_equal(&volume, &u->sink->real_volume))
1237 return;
1238
1239 pa_sink_volume_changed(u->sink, &volume);
1240
1241 if (u->version >= 11)
1242 pa_sink_mute_changed(u->sink, mute);
1243
1244 return;
1245
1246 fail:
1247 pa_module_unload_request(u->module, TRUE);
1248 pa_proplist_free(pl);
1249 }
1250
1251 #else
1252
1253 /* Called from main context */
1254 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1255 struct userdata *u = userdata;
1256 uint32_t idx, owner_module, monitor_of_sink, flags;
1257 const char *name, *description, *monitor_of_sink_name, *driver;
1258 pa_sample_spec ss;
1259 pa_channel_map cm;
1260 pa_cvolume volume;
1261 pa_bool_t mute;
1262 pa_usec_t latency, configured_latency;
1263 pa_proplist *pl;
1264
1265 pa_assert(pd);
1266 pa_assert(u);
1267
1268 pl = pa_proplist_new();
1269
1270 if (command != PA_COMMAND_REPLY) {
1271 if (command == PA_COMMAND_ERROR)
1272 pa_log("Failed to get info.");
1273 else
1274 pa_log("Protocol error.");
1275 goto fail;
1276 }
1277
1278 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1279 pa_tagstruct_gets(t, &name) < 0 ||
1280 pa_tagstruct_gets(t, &description) < 0 ||
1281 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1282 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1283 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1284 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1285 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1286 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1287 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1288 pa_tagstruct_get_usec(t, &latency) < 0 ||
1289 pa_tagstruct_gets(t, &driver) < 0 ||
1290 pa_tagstruct_getu32(t, &flags) < 0) {
1291
1292 pa_log("Parse failure");
1293 goto fail;
1294 }
1295
1296 if (u->version >= 13) {
1297 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1298 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1299
1300 pa_log("Parse failure");
1301 goto fail;
1302 }
1303 }
1304
1305 if (u->version >= 15) {
1306 pa_volume_t base_volume;
1307 uint32_t state, n_volume_steps, card;
1308
1309 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1310 pa_tagstruct_getu32(t, &state) < 0 ||
1311 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1312 pa_tagstruct_getu32(t, &card) < 0) {
1313
1314 pa_log("Parse failure");
1315 goto fail;
1316 }
1317 }
1318
1319 if (u->version >= 16) {
1320 uint32_t n_ports;
1321 const char *s;
1322
1323 if (pa_tagstruct_getu32(t, &n_ports)) {
1324 pa_log("Parse failure");
1325 goto fail;
1326 }
1327
1328 for (uint32_t j = 0; j < n_ports; j++) {
1329 uint32_t priority;
1330
1331 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1332 pa_tagstruct_gets(t, &s) < 0 || /* description */
1333 pa_tagstruct_getu32(t, &priority) < 0) {
1334
1335 pa_log("Parse failure");
1336 goto fail;
1337 }
1338 }
1339
1340 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1341 pa_log("Parse failure");
1342 goto fail;
1343 }
1344 }
1345
1346 if (!pa_tagstruct_eof(t)) {
1347 pa_log("Packet too long");
1348 goto fail;
1349 }
1350
1351 pa_proplist_free(pl);
1352
1353 if (!u->source_name || strcmp(name, u->source_name))
1354 return;
1355
1356 pa_xfree(u->device_description);
1357 u->device_description = pa_xstrdup(description);
1358
1359 update_description(u);
1360
1361 return;
1362
1363 fail:
1364 pa_module_unload_request(u->module, TRUE);
1365 pa_proplist_free(pl);
1366 }
1367
1368 #endif
1369
1370 /* Called from main context */
1371 static void request_info(struct userdata *u) {
1372 pa_tagstruct *t;
1373 uint32_t tag;
1374 pa_assert(u);
1375
1376 t = pa_tagstruct_new(NULL, 0);
1377 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1378 pa_tagstruct_putu32(t, tag = u->ctag++);
1379 pa_pstream_send_tagstruct(u->pstream, t);
1380 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1381
1382 #ifdef TUNNEL_SINK
1383 t = pa_tagstruct_new(NULL, 0);
1384 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1385 pa_tagstruct_putu32(t, tag = u->ctag++);
1386 pa_tagstruct_putu32(t, u->device_index);
1387 pa_pstream_send_tagstruct(u->pstream, t);
1388 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1389
1390 if (u->sink_name) {
1391 t = pa_tagstruct_new(NULL, 0);
1392 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1393 pa_tagstruct_putu32(t, tag = u->ctag++);
1394 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1395 pa_tagstruct_puts(t, u->sink_name);
1396 pa_pstream_send_tagstruct(u->pstream, t);
1397 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1398 }
1399 #else
1400 if (u->source_name) {
1401 t = pa_tagstruct_new(NULL, 0);
1402 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1403 pa_tagstruct_putu32(t, tag = u->ctag++);
1404 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1405 pa_tagstruct_puts(t, u->source_name);
1406 pa_pstream_send_tagstruct(u->pstream, t);
1407 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1408 }
1409 #endif
1410 }
1411
1412 /* Called from main context */
1413 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1414 struct userdata *u = userdata;
1415 pa_subscription_event_type_t e;
1416 uint32_t idx;
1417
1418 pa_assert(pd);
1419 pa_assert(t);
1420 pa_assert(u);
1421 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1422
1423 if (pa_tagstruct_getu32(t, &e) < 0 ||
1424 pa_tagstruct_getu32(t, &idx) < 0) {
1425 pa_log("Invalid protocol reply");
1426 pa_module_unload_request(u->module, TRUE);
1427 return;
1428 }
1429
1430 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1431 #ifdef TUNNEL_SINK
1432 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1433 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1434 #else
1435 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1436 #endif
1437 )
1438 return;
1439
1440 request_info(u);
1441 }
1442
1443 /* Called from main context */
1444 static void start_subscribe(struct userdata *u) {
1445 pa_tagstruct *t;
1446 pa_assert(u);
1447
1448 t = pa_tagstruct_new(NULL, 0);
1449 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1450 pa_tagstruct_putu32(t, u->ctag++);
1451 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1452 #ifdef TUNNEL_SINK
1453 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1454 #else
1455 PA_SUBSCRIPTION_MASK_SOURCE
1456 #endif
1457 );
1458
1459 pa_pstream_send_tagstruct(u->pstream, t);
1460 }
1461
1462 /* Called from main context */
1463 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1464 struct userdata *u = userdata;
1465 #ifdef TUNNEL_SINK
1466 uint32_t bytes;
1467 #endif
1468
1469 pa_assert(pd);
1470 pa_assert(u);
1471 pa_assert(u->pdispatch == pd);
1472
1473 if (command != PA_COMMAND_REPLY) {
1474 if (command == PA_COMMAND_ERROR)
1475 pa_log("Failed to create stream.");
1476 else
1477 pa_log("Protocol error.");
1478 goto fail;
1479 }
1480
1481 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1482 pa_tagstruct_getu32(t, &u->device_index) < 0
1483 #ifdef TUNNEL_SINK
1484 || pa_tagstruct_getu32(t, &bytes) < 0
1485 #endif
1486 )
1487 goto parse_error;
1488
1489 if (u->version >= 9) {
1490 #ifdef TUNNEL_SINK
1491 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1492 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1493 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1494 pa_tagstruct_getu32(t, &u->minreq) < 0)
1495 goto parse_error;
1496 #else
1497 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1498 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1499 goto parse_error;
1500 #endif
1501 }
1502
1503 if (u->version >= 12) {
1504 pa_sample_spec ss;
1505 pa_channel_map cm;
1506 uint32_t device_index;
1507 const char *dn;
1508 pa_bool_t suspended;
1509
1510 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1511 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1512 pa_tagstruct_getu32(t, &device_index) < 0 ||
1513 pa_tagstruct_gets(t, &dn) < 0 ||
1514 pa_tagstruct_get_boolean(t, &suspended) < 0)
1515 goto parse_error;
1516
1517 #ifdef TUNNEL_SINK
1518 pa_xfree(u->sink_name);
1519 u->sink_name = pa_xstrdup(dn);
1520 #else
1521 pa_xfree(u->source_name);
1522 u->source_name = pa_xstrdup(dn);
1523 #endif
1524 }
1525
1526 if (u->version >= 13) {
1527 pa_usec_t usec;
1528
1529 if (pa_tagstruct_get_usec(t, &usec) < 0)
1530 goto parse_error;
1531
1532 /* #ifdef TUNNEL_SINK */
1533 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1534 /* #else */
1535 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1536 /* #endif */
1537 }
1538
1539 if (u->version >= 21) {
1540 pa_format_info format;
1541
1542 if (pa_tagstruct_get_format_info(t, &format) < 0)
1543 goto parse_error;
1544 }
1545
1546 if (!pa_tagstruct_eof(t))
1547 goto parse_error;
1548
1549 start_subscribe(u);
1550 request_info(u);
1551
1552 pa_assert(!u->time_event);
1553 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1554
1555 request_latency(u);
1556
1557 pa_log_debug("Stream created.");
1558
1559 #ifdef TUNNEL_SINK
1560 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1561 #endif
1562
1563 return;
1564
1565 parse_error:
1566 pa_log("Invalid reply. (Create stream)");
1567
1568 fail:
1569 pa_module_unload_request(u->module, TRUE);
1570
1571 }
1572
1573 /* Called from main context */
1574 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1575 struct userdata *u = userdata;
1576 pa_tagstruct *reply;
1577 char name[256], un[128], hn[128];
1578 #ifdef TUNNEL_SINK
1579 pa_cvolume volume;
1580 #endif
1581
1582 pa_assert(pd);
1583 pa_assert(u);
1584 pa_assert(u->pdispatch == pd);
1585
1586 if (command != PA_COMMAND_REPLY ||
1587 pa_tagstruct_getu32(t, &u->version) < 0 ||
1588 !pa_tagstruct_eof(t)) {
1589
1590 if (command == PA_COMMAND_ERROR)
1591 pa_log("Failed to authenticate");
1592 else
1593 pa_log("Protocol error.");
1594
1595 goto fail;
1596 }
1597
1598 /* Minimum supported protocol version */
1599 if (u->version < 8) {
1600 pa_log("Incompatible protocol version");
1601 goto fail;
1602 }
1603
1604 /* Starting with protocol version 13 the MSB of the version tag
1605 reflects if shm is enabled for this connection or not. We don't
1606 support SHM here at all, so we just ignore this. */
1607
1608 if (u->version >= 13)
1609 u->version &= 0x7FFFFFFFU;
1610
1611 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1612
1613 #ifdef TUNNEL_SINK
1614 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1615 pa_sink_update_proplist(u->sink, 0, NULL);
1616
1617 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1618 u->sink_name,
1619 pa_get_user_name(un, sizeof(un)),
1620 pa_get_host_name(hn, sizeof(hn)));
1621 #else
1622 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1623 pa_source_update_proplist(u->source, 0, NULL);
1624
1625 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1626 u->source_name,
1627 pa_get_user_name(un, sizeof(un)),
1628 pa_get_host_name(hn, sizeof(hn)));
1629 #endif
1630
1631 reply = pa_tagstruct_new(NULL, 0);
1632 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1633 pa_tagstruct_putu32(reply, u->ctag++);
1634
1635 if (u->version >= 13) {
1636 pa_proplist *pl;
1637 pl = pa_proplist_new();
1638 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1639 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1640 pa_init_proplist(pl);
1641 pa_tagstruct_put_proplist(reply, pl);
1642 pa_proplist_free(pl);
1643 } else
1644 pa_tagstruct_puts(reply, "PulseAudio");
1645
1646 pa_pstream_send_tagstruct(u->pstream, reply);
1647 /* We ignore the server's reply here */
1648
1649 reply = pa_tagstruct_new(NULL, 0);
1650
1651 if (u->version < 13)
1652 /* Only for older PA versions we need to fill in the maxlength */
1653 u->maxlength = 4*1024*1024;
1654
1655 #ifdef TUNNEL_SINK
1656 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1657 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1658 u->prebuf = u->tlength;
1659 #else
1660 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1661 #endif
1662
1663 #ifdef TUNNEL_SINK
1664 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1665 pa_tagstruct_putu32(reply, tag = u->ctag++);
1666
1667 if (u->version < 13)
1668 pa_tagstruct_puts(reply, name);
1669
1670 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1671 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1672 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1673 pa_tagstruct_puts(reply, u->sink_name);
1674 pa_tagstruct_putu32(reply, u->maxlength);
1675 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1676 pa_tagstruct_putu32(reply, u->tlength);
1677 pa_tagstruct_putu32(reply, u->prebuf);
1678 pa_tagstruct_putu32(reply, u->minreq);
1679 pa_tagstruct_putu32(reply, 0);
1680 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1681 pa_tagstruct_put_cvolume(reply, &volume);
1682 #else
1683 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1684 pa_tagstruct_putu32(reply, tag = u->ctag++);
1685
1686 if (u->version < 13)
1687 pa_tagstruct_puts(reply, name);
1688
1689 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1690 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1691 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1692 pa_tagstruct_puts(reply, u->source_name);
1693 pa_tagstruct_putu32(reply, u->maxlength);
1694 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1695 pa_tagstruct_putu32(reply, u->fragsize);
1696 #endif
1697
1698 if (u->version >= 12) {
1699 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1700 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1701 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1702 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1703 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1704 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1705 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1706 }
1707
1708 if (u->version >= 13) {
1709 pa_proplist *pl;
1710
1711 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1712 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1713
1714 pl = pa_proplist_new();
1715 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1716 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1717 pa_tagstruct_put_proplist(reply, pl);
1718 pa_proplist_free(pl);
1719
1720 #ifndef TUNNEL_SINK
1721 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1722 #endif
1723 }
1724
1725 if (u->version >= 14) {
1726 #ifdef TUNNEL_SINK
1727 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1728 #endif
1729 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1730 }
1731
1732 if (u->version >= 15) {
1733 #ifdef TUNNEL_SINK
1734 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1735 #endif
1736 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1737 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1738 }
1739
1740 #ifdef TUNNEL_SINK
1741 if (u->version >= 17)
1742 pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1743
1744 if (u->version >= 18)
1745 pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1746 #endif
1747
1748 #ifdef TUNNEL_SINK
1749 if (u->version >= 21) {
1750 /* We're not using the extended API, so n_formats = 0 and that's that */
1751 pa_tagstruct_putu8(t, 0);
1752 }
1753 #endif
1754
1755 pa_pstream_send_tagstruct(u->pstream, reply);
1756 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1757
1758 pa_log_debug("Connection authenticated, creating stream ...");
1759
1760 return;
1761
1762 fail:
1763 pa_module_unload_request(u->module, TRUE);
1764 }
1765
1766 /* Called from main context */
1767 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1768 struct userdata *u = userdata;
1769
1770 pa_assert(p);
1771 pa_assert(u);
1772
1773 pa_log_warn("Stream died.");
1774 pa_module_unload_request(u->module, TRUE);
1775 }
1776
1777 /* Called from main context */
1778 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1779 struct userdata *u = userdata;
1780
1781 pa_assert(p);
1782 pa_assert(packet);
1783 pa_assert(u);
1784
1785 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1786 pa_log("Invalid packet");
1787 pa_module_unload_request(u->module, TRUE);
1788 return;
1789 }
1790 }
1791
1792 #ifndef TUNNEL_SINK
1793 /* Called from main context */
1794 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1795 struct userdata *u = userdata;
1796
1797 pa_assert(p);
1798 pa_assert(chunk);
1799 pa_assert(u);
1800
1801 if (channel != u->channel) {
1802 pa_log("Received memory block on bad channel.");
1803 pa_module_unload_request(u->module, TRUE);
1804 return;
1805 }
1806
1807 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1808
1809 u->counter_delta += (int64_t) chunk->length;
1810 }
1811 #endif
1812
1813 /* Called from main context */
1814 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1815 struct userdata *u = userdata;
1816 pa_tagstruct *t;
1817 uint32_t tag;
1818
1819 pa_assert(sc);
1820 pa_assert(u);
1821 pa_assert(u->client == sc);
1822
1823 pa_socket_client_unref(u->client);
1824 u->client = NULL;
1825
1826 if (!io) {
1827 pa_log("Connection failed: %s", pa_cstrerror(errno));
1828 pa_module_unload_request(u->module, TRUE);
1829 return;
1830 }
1831
1832 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1833 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1834
1835 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1836 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1837 #ifndef TUNNEL_SINK
1838 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1839 #endif
1840
1841 t = pa_tagstruct_new(NULL, 0);
1842 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1843 pa_tagstruct_putu32(t, tag = u->ctag++);
1844 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1845
1846 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1847
1848 #ifdef HAVE_CREDS
1849 {
1850 pa_creds ucred;
1851
1852 if (pa_iochannel_creds_supported(io))
1853 pa_iochannel_creds_enable(io);
1854
1855 ucred.uid = getuid();
1856 ucred.gid = getgid();
1857
1858 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1859 }
1860 #else
1861 pa_pstream_send_tagstruct(u->pstream, t);
1862 #endif
1863
1864 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1865
1866 pa_log_debug("Connection established, authenticating ...");
1867 }
1868
1869 #ifdef TUNNEL_SINK
1870
1871 /* Called from main context */
1872 static void sink_set_volume(pa_sink *sink) {
1873 struct userdata *u;
1874 pa_tagstruct *t;
1875
1876 pa_assert(sink);
1877 u = sink->userdata;
1878 pa_assert(u);
1879
1880 t = pa_tagstruct_new(NULL, 0);
1881 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1882 pa_tagstruct_putu32(t, u->ctag++);
1883 pa_tagstruct_putu32(t, u->device_index);
1884 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1885 pa_pstream_send_tagstruct(u->pstream, t);
1886 }
1887
1888 /* Called from main context */
1889 static void sink_set_mute(pa_sink *sink) {
1890 struct userdata *u;
1891 pa_tagstruct *t;
1892
1893 pa_assert(sink);
1894 u = sink->userdata;
1895 pa_assert(u);
1896
1897 if (u->version < 11)
1898 return;
1899
1900 t = pa_tagstruct_new(NULL, 0);
1901 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1902 pa_tagstruct_putu32(t, u->ctag++);
1903 pa_tagstruct_putu32(t, u->device_index);
1904 pa_tagstruct_put_boolean(t, !!sink->muted);
1905 pa_pstream_send_tagstruct(u->pstream, t);
1906 }
1907
1908 #endif
1909
1910 int pa__init(pa_module*m) {
1911 pa_modargs *ma = NULL;
1912 struct userdata *u = NULL;
1913 pa_sample_spec ss;
1914 pa_channel_map map;
1915 char *dn = NULL;
1916 #ifdef TUNNEL_SINK
1917 pa_sink_new_data data;
1918 #else
1919 pa_source_new_data data;
1920 #endif
1921
1922 pa_assert(m);
1923
1924 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1925 pa_log("Failed to parse module arguments");
1926 goto fail;
1927 }
1928
1929 m->userdata = u = pa_xnew0(struct userdata, 1);
1930 u->core = m->core;
1931 u->module = m;
1932 u->client = NULL;
1933 u->pdispatch = NULL;
1934 u->pstream = NULL;
1935 u->server_name = NULL;
1936 #ifdef TUNNEL_SINK
1937 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1938 u->sink = NULL;
1939 u->requested_bytes = 0;
1940 #else
1941 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1942 u->source = NULL;
1943 #endif
1944 u->smoother = pa_smoother_new(
1945 PA_USEC_PER_SEC,
1946 PA_USEC_PER_SEC*2,
1947 TRUE,
1948 TRUE,
1949 10,
1950 pa_rtclock_now(),
1951 FALSE);
1952 u->ctag = 1;
1953 u->device_index = u->channel = PA_INVALID_INDEX;
1954 u->time_event = NULL;
1955 u->ignore_latency_before = 0;
1956 u->transport_usec = u->thread_transport_usec = 0;
1957 u->remote_suspended = u->remote_corked = FALSE;
1958 u->counter = u->counter_delta = 0;
1959
1960 u->rtpoll = pa_rtpoll_new();
1961 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1962
1963 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1964 goto fail;
1965
1966 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1967 pa_log("No server specified.");
1968 goto fail;
1969 }
1970
1971 ss = m->core->default_sample_spec;
1972 map = m->core->default_channel_map;
1973 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1974 pa_log("Invalid sample format specification");
1975 goto fail;
1976 }
1977
1978 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1979 pa_log("Failed to connect to server '%s'", u->server_name);
1980 goto fail;
1981 }
1982
1983 pa_socket_client_set_callback(u->client, on_connection, u);
1984
1985 #ifdef TUNNEL_SINK
1986
1987 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1988 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1989
1990 pa_sink_new_data_init(&data);
1991 data.driver = __FILE__;
1992 data.module = m;
1993 data.namereg_fail = TRUE;
1994 pa_sink_new_data_set_name(&data, dn);
1995 pa_sink_new_data_set_sample_spec(&data, &ss);
1996 pa_sink_new_data_set_channel_map(&data, &map);
1997 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1998 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1999 if (u->sink_name)
2000 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2001
2002 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2003 pa_log("Invalid properties");
2004 pa_sink_new_data_done(&data);
2005 goto fail;
2006 }
2007
2008 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
2009 pa_sink_new_data_done(&data);
2010
2011 if (!u->sink) {
2012 pa_log("Failed to create sink.");
2013 goto fail;
2014 }
2015
2016 u->sink->parent.process_msg = sink_process_msg;
2017 u->sink->userdata = u;
2018 u->sink->set_state = sink_set_state;
2019 u->sink->set_volume = sink_set_volume;
2020 u->sink->set_mute = sink_set_mute;
2021
2022 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
2023
2024 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2025
2026 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2027 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2028
2029 #else
2030
2031 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2032 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2033
2034 pa_source_new_data_init(&data);
2035 data.driver = __FILE__;
2036 data.module = m;
2037 data.namereg_fail = TRUE;
2038 pa_source_new_data_set_name(&data, dn);
2039 pa_source_new_data_set_sample_spec(&data, &ss);
2040 pa_source_new_data_set_channel_map(&data, &map);
2041 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2042 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2043 if (u->source_name)
2044 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2045
2046 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2047 pa_log("Invalid properties");
2048 pa_source_new_data_done(&data);
2049 goto fail;
2050 }
2051
2052 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2053 pa_source_new_data_done(&data);
2054
2055 if (!u->source) {
2056 pa_log("Failed to create source.");
2057 goto fail;
2058 }
2059
2060 u->source->parent.process_msg = source_process_msg;
2061 u->source->set_state = source_set_state;
2062 u->source->userdata = u;
2063
2064 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2065
2066 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2067 pa_source_set_rtpoll(u->source, u->rtpoll);
2068
2069 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2070 #endif
2071
2072 pa_xfree(dn);
2073
2074 u->time_event = NULL;
2075
2076 u->maxlength = (uint32_t) -1;
2077 #ifdef TUNNEL_SINK
2078 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2079 #else
2080 u->fragsize = (uint32_t) -1;
2081 #endif
2082
2083 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2084 pa_log("Failed to create thread.");
2085 goto fail;
2086 }
2087
2088 #ifdef TUNNEL_SINK
2089 pa_sink_put(u->sink);
2090 #else
2091 pa_source_put(u->source);
2092 #endif
2093
2094 pa_modargs_free(ma);
2095
2096 return 0;
2097
2098 fail:
2099 pa__done(m);
2100
2101 if (ma)
2102 pa_modargs_free(ma);
2103
2104 pa_xfree(dn);
2105
2106 return -1;
2107 }
2108
2109 void pa__done(pa_module*m) {
2110 struct userdata* u;
2111
2112 pa_assert(m);
2113
2114 if (!(u = m->userdata))
2115 return;
2116
2117 #ifdef TUNNEL_SINK
2118 if (u->sink)
2119 pa_sink_unlink(u->sink);
2120 #else
2121 if (u->source)
2122 pa_source_unlink(u->source);
2123 #endif
2124
2125 if (u->thread) {
2126 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2127 pa_thread_free(u->thread);
2128 }
2129
2130 pa_thread_mq_done(&u->thread_mq);
2131
2132 #ifdef TUNNEL_SINK
2133 if (u->sink)
2134 pa_sink_unref(u->sink);
2135 #else
2136 if (u->source)
2137 pa_source_unref(u->source);
2138 #endif
2139
2140 if (u->rtpoll)
2141 pa_rtpoll_free(u->rtpoll);
2142
2143 if (u->pstream) {
2144 pa_pstream_unlink(u->pstream);
2145 pa_pstream_unref(u->pstream);
2146 }
2147
2148 if (u->pdispatch)
2149 pa_pdispatch_unref(u->pdispatch);
2150
2151 if (u->client)
2152 pa_socket_client_unref(u->client);
2153
2154 if (u->auth_cookie)
2155 pa_auth_cookie_unref(u->auth_cookie);
2156
2157 if (u->smoother)
2158 pa_smoother_free(u->smoother);
2159
2160 if (u->time_event)
2161 u->core->mainloop->time_free(u->time_event);
2162
2163 #ifndef TUNNEL_SINK
2164 if (u->mcalign)
2165 pa_mcalign_free(u->mcalign);
2166 #endif
2167
2168 #ifdef TUNNEL_SINK
2169 pa_xfree(u->sink_name);
2170 #else
2171 pa_xfree(u->source_name);
2172 #endif
2173 pa_xfree(u->server_name);
2174
2175 pa_xfree(u->device_description);
2176 pa_xfree(u->server_fqdn);
2177 pa_xfree(u->user_name);
2178
2179 pa_xfree(u);
2180 }