]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
More spelling fixes
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
69 "server=<address> "
70 "sink=<remote sink name> "
71 "cookie=<filename> "
72 "format=<sample format> "
73 "channels=<number of channels> "
74 "rate=<sample rate> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
81 "server=<address> "
82 "source=<remote source name> "
83 "cookie=<filename> "
84 "format=<sample format> "
85 "channels=<number of channels> "
86 "rate=<sample rate> "
87 "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE);
93
/* NULL-terminated list of the module argument names this module accepts.
 * The device-specific names depend on whether the sink or the source
 * flavour is being built. */
static const char* const valid_modargs[] = {
    /* Arguments common to both flavours */
    "server",
    "cookie",
    "format",
    "channels",
    "rate",

#ifdef TUNNEL_SINK
    /* Sink tunnel only */
    "sink_name",
    "sink_properties",
    "sink",
#else
    /* Source tunnel only */
    "source_name",
    "source_properties",
    "source",
#endif

    "channel_map",
    NULL,
};
112
113 #define DEFAULT_TIMEOUT 5
114
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
116
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
121 enum {
122 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123 SINK_MESSAGE_REMOTE_SUSPEND,
124 SINK_MESSAGE_UPDATE_LATENCY,
125 SINK_MESSAGE_POST
126 };
127
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
133 enum {
134 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135 SOURCE_MESSAGE_REMOTE_SUSPEND,
136 SOURCE_MESSAGE_UPDATE_LATENCY
137 };
138
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157 [PA_COMMAND_REQUEST] = command_request,
158 [PA_COMMAND_STARTED] = command_started,
159 #endif
160 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
176 struct userdata {
177 pa_core *core;
178 pa_module *module;
179
180 pa_thread_mq thread_mq;
181 pa_rtpoll *rtpoll;
182 pa_thread *thread;
183
184 pa_socket_client *client;
185 pa_pstream *pstream;
186 pa_pdispatch *pdispatch;
187
188 char *server_name;
189 #ifdef TUNNEL_SINK
190 char *sink_name;
191 pa_sink *sink;
192 size_t requested_bytes;
193 #else
194 char *source_name;
195 pa_source *source;
196 pa_mcalign *mcalign;
197 #endif
198
199 pa_auth_cookie *auth_cookie;
200
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
205
206 int64_t counter, counter_delta;
207
208 pa_bool_t remote_corked:1;
209 pa_bool_t remote_suspended:1;
210
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214 uint32_t ignore_latency_before;
215
216 pa_time_event *time_event;
217
218 pa_smoother *smoother;
219
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
223
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
232 };
233
/* Forward declaration: the command handlers below call request_latency()
 * before its definition. */
static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
244
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
249
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
257
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
262
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 pa_bool_t suspended;
272
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
277
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
281
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, TRUE);
284 return;
285 }
286
287 pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295 request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 pa_bool_t suspended;
304
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
309
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, TRUE);
317 return;
318 }
319
320 pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328 request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
334 pa_usec_t usec;
335
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
340
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, TRUE);
346 return;
347 }
348
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, TRUE);
355 return;
356 }
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
362
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, TRUE);
365 return;
366 }
367 }
368
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373 request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
386
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395 pa_usec_t x;
396
397 pa_assert(u);
398
399 x = pa_rtclock_now();
400
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
404
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
409
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418 pa_assert(u);
419
420 if (u->remote_corked == cork)
421 return;
422
423 u->remote_corked = cork;
424 check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
431
432 if (!u->pstream)
433 return;
434
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
445
446 request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451 pa_assert(u);
452
453 if (u->remote_suspended == suspend)
454 return;
455
456 u->remote_suspended = suspend;
457 check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
465
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
468
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
472
473 u->requested_bytes -= memchunk.length;
474
475 u->counter += (int64_t) memchunk.length;
476 }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
482
483 switch (code) {
484
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
487
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493 if (PA_SINK_IS_OPENED(u->sink->state))
494 send_data(u);
495 }
496
497 return r;
498 }
499
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
502
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
508 }
509
510 case SINK_MESSAGE_REQUEST:
511
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
514
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
517
518 return 0;
519
520
521 case SINK_MESSAGE_REMOTE_SUSPEND:
522
523 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524 return 0;
525
526
527 case SINK_MESSAGE_UPDATE_LATENCY: {
528 pa_usec_t y;
529
530 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532 if (y > (pa_usec_t) offset)
533 y -= (pa_usec_t) offset;
534 else
535 y = 0;
536
537 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539 /* We can access this freely here, since the main thread is waiting for us */
540 u->thread_transport_usec = u->transport_usec;
541
542 return 0;
543 }
544
545 case SINK_MESSAGE_POST:
546
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554 u->counter_delta += (int64_t) chunk->length;
555
556 return 0;
557 }
558
559 return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564 struct userdata *u;
565 pa_sink_assert_ref(s);
566 u = s->userdata;
567
568 switch ((pa_sink_state_t) state) {
569
570 case PA_SINK_SUSPENDED:
571 pa_assert(PA_SINK_IS_OPENED(s->state));
572 stream_cork(u, TRUE);
573 break;
574
575 case PA_SINK_IDLE:
576 case PA_SINK_RUNNING:
577 if (s->state == PA_SINK_SUSPENDED)
578 stream_cork(u, FALSE);
579 break;
580
581 case PA_SINK_UNLINKED:
582 case PA_SINK_INIT:
583 case PA_SINK_INVALID_STATE:
584 ;
585 }
586
587 return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594 struct userdata *u = PA_SOURCE(o)->userdata;
595
596 switch (code) {
597
598 case PA_SOURCE_MESSAGE_SET_STATE: {
599 int r;
600
601 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604 return r;
605 }
606
607 case PA_SOURCE_MESSAGE_GET_LATENCY: {
608 pa_usec_t yr, yl, *usec = data;
609
610 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613 *usec = yr > yl ? yr - yl : 0;
614 return 0;
615 }
616
617 case SOURCE_MESSAGE_POST: {
618 pa_memchunk c;
619
620 pa_mcalign_push(u->mcalign, chunk);
621
622 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
623
624 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
625 pa_source_post(u->source, &c);
626
627 pa_memblock_unref(c.memblock);
628
629 u->counter += (int64_t) c.length;
630 }
631
632 return 0;
633 }
634
635 case SOURCE_MESSAGE_REMOTE_SUSPEND:
636
637 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
638 return 0;
639
640 case SOURCE_MESSAGE_UPDATE_LATENCY: {
641 pa_usec_t y;
642
643 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
644 y += (pa_usec_t) offset;
645
646 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
647
648 /* We can access this freely here, since the main thread is waiting for us */
649 u->thread_transport_usec = u->transport_usec;
650
651 return 0;
652 }
653 }
654
655 return pa_source_process_msg(o, code, data, offset, chunk);
656 }
657
658 /* Called from main context */
659 static int source_set_state(pa_source *s, pa_source_state_t state) {
660 struct userdata *u;
661 pa_source_assert_ref(s);
662 u = s->userdata;
663
664 switch ((pa_source_state_t) state) {
665
666 case PA_SOURCE_SUSPENDED:
667 pa_assert(PA_SOURCE_IS_OPENED(s->state));
668 stream_cork(u, TRUE);
669 break;
670
671 case PA_SOURCE_IDLE:
672 case PA_SOURCE_RUNNING:
673 if (s->state == PA_SOURCE_SUSPENDED)
674 stream_cork(u, FALSE);
675 break;
676
677 case PA_SOURCE_UNLINKED:
678 case PA_SOURCE_INIT:
679 case PA_SINK_INVALID_STATE:
680 ;
681 }
682
683 return 0;
684 }
685
686 #endif
687
688 static void thread_func(void *userdata) {
689 struct userdata *u = userdata;
690
691 pa_assert(u);
692
693 pa_log_debug("Thread starting up");
694
695 pa_thread_mq_install(&u->thread_mq);
696
697 for (;;) {
698 int ret;
699
700 #ifdef TUNNEL_SINK
701 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
702 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
703 pa_sink_process_rewind(u->sink, 0);
704 #endif
705
706 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
707 goto fail;
708
709 if (ret == 0)
710 goto finish;
711 }
712
713 fail:
714 /* If this was no regular exit from the loop we have to continue
715 * processing messages until we received PA_MESSAGE_SHUTDOWN */
716 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
717 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
718
719 finish:
720 pa_log_debug("Thread shutting down");
721 }
722
723 #ifdef TUNNEL_SINK
724 /* Called from main context */
725 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
726 struct userdata *u = userdata;
727 uint32_t bytes, channel;
728
729 pa_assert(pd);
730 pa_assert(command == PA_COMMAND_REQUEST);
731 pa_assert(t);
732 pa_assert(u);
733 pa_assert(u->pdispatch == pd);
734
735 if (pa_tagstruct_getu32(t, &channel) < 0 ||
736 pa_tagstruct_getu32(t, &bytes) < 0) {
737 pa_log("Invalid protocol reply");
738 goto fail;
739 }
740
741 if (channel != u->channel) {
742 pa_log("Received data for invalid channel");
743 goto fail;
744 }
745
746 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
747 return;
748
749 fail:
750 pa_module_unload_request(u->module, TRUE);
751 }
752
753 #endif
754
755 /* Called from main context */
756 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 struct userdata *u = userdata;
758 pa_usec_t sink_usec, source_usec;
759 pa_bool_t playing;
760 int64_t write_index, read_index;
761 struct timeval local, remote, now;
762 pa_sample_spec *ss;
763 int64_t delay;
764
765 pa_assert(pd);
766 pa_assert(u);
767
768 if (command != PA_COMMAND_REPLY) {
769 if (command == PA_COMMAND_ERROR)
770 pa_log("Failed to get latency.");
771 else
772 pa_log("Protocol error.");
773 goto fail;
774 }
775
776 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
777 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
778 pa_tagstruct_get_boolean(t, &playing) < 0 ||
779 pa_tagstruct_get_timeval(t, &local) < 0 ||
780 pa_tagstruct_get_timeval(t, &remote) < 0 ||
781 pa_tagstruct_gets64(t, &write_index) < 0 ||
782 pa_tagstruct_gets64(t, &read_index) < 0) {
783 pa_log("Invalid reply.");
784 goto fail;
785 }
786
787 #ifdef TUNNEL_SINK
788 if (u->version >= 13) {
789 uint64_t underrun_for = 0, playing_for = 0;
790
791 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
792 pa_tagstruct_getu64(t, &playing_for) < 0) {
793 pa_log("Invalid reply.");
794 goto fail;
795 }
796 }
797 #endif
798
799 if (!pa_tagstruct_eof(t)) {
800 pa_log("Invalid reply.");
801 goto fail;
802 }
803
804 if (tag < u->ignore_latency_before) {
805 return;
806 }
807
808 pa_gettimeofday(&now);
809
810 /* Calculate transport usec */
811 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
812 /* local and remote seem to have synchronized clocks */
813 #ifdef TUNNEL_SINK
814 u->transport_usec = pa_timeval_diff(&remote, &local);
815 #else
816 u->transport_usec = pa_timeval_diff(&now, &remote);
817 #endif
818 } else
819 u->transport_usec = pa_timeval_diff(&now, &local)/2;
820
821 /* First, take the device's delay */
822 #ifdef TUNNEL_SINK
823 delay = (int64_t) sink_usec;
824 ss = &u->sink->sample_spec;
825 #else
826 delay = (int64_t) source_usec;
827 ss = &u->source->sample_spec;
828 #endif
829
830 /* Add the length of our server-side buffer */
831 if (write_index >= read_index)
832 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
833 else
834 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
835
836 /* Our measurements are already out of date, hence correct by the *
837 * transport latency */
838 #ifdef TUNNEL_SINK
839 delay -= (int64_t) u->transport_usec;
840 #else
841 delay += (int64_t) u->transport_usec;
842 #endif
843
844 /* Now correct by what we have have read/written since we requested the update */
845 #ifdef TUNNEL_SINK
846 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
847 #else
848 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #endif
850
851 #ifdef TUNNEL_SINK
852 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
853 #else
854 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #endif
856
857 return;
858
859 fail:
860
861 pa_module_unload_request(u->module, TRUE);
862 }
863
864 /* Called from main context */
865 static void request_latency(struct userdata *u) {
866 pa_tagstruct *t;
867 struct timeval now;
868 uint32_t tag;
869 pa_assert(u);
870
871 t = pa_tagstruct_new(NULL, 0);
872 #ifdef TUNNEL_SINK
873 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
874 #else
875 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
876 #endif
877 pa_tagstruct_putu32(t, tag = u->ctag++);
878 pa_tagstruct_putu32(t, u->channel);
879
880 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
881
882 pa_pstream_send_tagstruct(u->pstream, t);
883 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
884
885 u->ignore_latency_before = tag;
886 u->counter_delta = 0;
887 }
888
889 /* Called from main context */
890 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
891 struct userdata *u = userdata;
892
893 pa_assert(m);
894 pa_assert(e);
895 pa_assert(u);
896
897 request_latency(u);
898
899 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
900 }
901
902 /* Called from main context */
903 static void update_description(struct userdata *u) {
904 char *d;
905 char un[128], hn[128];
906 pa_tagstruct *t;
907
908 pa_assert(u);
909
910 if (!u->server_fqdn || !u->user_name || !u->device_description)
911 return;
912
913 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
914
915 #ifdef TUNNEL_SINK
916 pa_sink_set_description(u->sink, d);
917 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
918 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
919 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
920 #else
921 pa_source_set_description(u->source, d);
922 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
923 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
924 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
925 #endif
926
927 pa_xfree(d);
928
929 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
930 pa_get_user_name(un, sizeof(un)),
931 pa_get_host_name(hn, sizeof(hn)));
932
933 t = pa_tagstruct_new(NULL, 0);
934 #ifdef TUNNEL_SINK
935 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
936 #else
937 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
938 #endif
939 pa_tagstruct_putu32(t, u->ctag++);
940 pa_tagstruct_putu32(t, u->channel);
941 pa_tagstruct_puts(t, d);
942 pa_pstream_send_tagstruct(u->pstream, t);
943
944 pa_xfree(d);
945 }
946
947 /* Called from main context */
948 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
949 struct userdata *u = userdata;
950 pa_sample_spec ss;
951 pa_channel_map cm;
952 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
953 uint32_t cookie;
954
955 pa_assert(pd);
956 pa_assert(u);
957
958 if (command != PA_COMMAND_REPLY) {
959 if (command == PA_COMMAND_ERROR)
960 pa_log("Failed to get info.");
961 else
962 pa_log("Protocol error.");
963 goto fail;
964 }
965
966 if (pa_tagstruct_gets(t, &server_name) < 0 ||
967 pa_tagstruct_gets(t, &server_version) < 0 ||
968 pa_tagstruct_gets(t, &user_name) < 0 ||
969 pa_tagstruct_gets(t, &host_name) < 0 ||
970 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
971 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
972 pa_tagstruct_gets(t, &default_source_name) < 0 ||
973 pa_tagstruct_getu32(t, &cookie) < 0 ||
974 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
975
976 pa_log("Parse failure");
977 goto fail;
978 }
979
980 if (!pa_tagstruct_eof(t)) {
981 pa_log("Packet too long");
982 goto fail;
983 }
984
985 pa_xfree(u->server_fqdn);
986 u->server_fqdn = pa_xstrdup(host_name);
987
988 pa_xfree(u->user_name);
989 u->user_name = pa_xstrdup(user_name);
990
991 update_description(u);
992
993 return;
994
995 fail:
996 pa_module_unload_request(u->module, TRUE);
997 }
998
999 #ifdef TUNNEL_SINK
1000
1001 /* Called from main context */
1002 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1003 struct userdata *u = userdata;
1004 uint32_t idx, owner_module, monitor_source, flags;
1005 const char *name, *description, *monitor_source_name, *driver;
1006 pa_sample_spec ss;
1007 pa_channel_map cm;
1008 pa_cvolume volume;
1009 pa_bool_t mute;
1010 pa_usec_t latency;
1011 pa_proplist *pl;
1012
1013 pa_assert(pd);
1014 pa_assert(u);
1015
1016 pl = pa_proplist_new();
1017
1018 if (command != PA_COMMAND_REPLY) {
1019 if (command == PA_COMMAND_ERROR)
1020 pa_log("Failed to get info.");
1021 else
1022 pa_log("Protocol error.");
1023 goto fail;
1024 }
1025
1026 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1027 pa_tagstruct_gets(t, &name) < 0 ||
1028 pa_tagstruct_gets(t, &description) < 0 ||
1029 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1030 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1031 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1032 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1033 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1034 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1035 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1036 pa_tagstruct_get_usec(t, &latency) < 0 ||
1037 pa_tagstruct_gets(t, &driver) < 0 ||
1038 pa_tagstruct_getu32(t, &flags) < 0) {
1039
1040 pa_log("Parse failure");
1041 goto fail;
1042 }
1043
1044 if (u->version >= 13) {
1045 pa_usec_t configured_latency;
1046
1047 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1048 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1049
1050 pa_log("Parse failure");
1051 goto fail;
1052 }
1053 }
1054
1055 if (u->version >= 15) {
1056 pa_volume_t base_volume;
1057 uint32_t state, n_volume_steps, card;
1058
1059 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1060 pa_tagstruct_getu32(t, &state) < 0 ||
1061 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1062 pa_tagstruct_getu32(t, &card) < 0) {
1063
1064 pa_log("Parse failure");
1065 goto fail;
1066 }
1067 }
1068
1069 if (u->version >= 16) {
1070 uint32_t n_ports;
1071 const char *s;
1072
1073 if (pa_tagstruct_getu32(t, &n_ports)) {
1074 pa_log("Parse failure");
1075 goto fail;
1076 }
1077
1078 for (uint32_t j = 0; j < n_ports; j++) {
1079 uint32_t priority;
1080
1081 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1082 pa_tagstruct_gets(t, &s) < 0 || /* description */
1083 pa_tagstruct_getu32(t, &priority) < 0) {
1084
1085 pa_log("Parse failure");
1086 goto fail;
1087 }
1088 }
1089
1090 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1091 pa_log("Parse failure");
1092 goto fail;
1093 }
1094 }
1095
1096 if (u->version >= 21) {
1097 uint8_t n_formats;
1098 pa_format_info *format;
1099
1100 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1101 pa_log("Parse failure");
1102 goto fail;
1103 }
1104
1105 for (uint8_t j = 0; j < n_formats; j++) {
1106 format = pa_format_info_new();
1107 if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1108 pa_format_info_free(format);
1109 pa_log("Parse failure");
1110 goto fail;
1111 }
1112 pa_format_info_free(format);
1113 }
1114 }
1115
1116 if (!pa_tagstruct_eof(t)) {
1117 pa_log("Packet too long");
1118 goto fail;
1119 }
1120
1121 pa_proplist_free(pl);
1122
1123 if (!u->sink_name || strcmp(name, u->sink_name))
1124 return;
1125
1126 pa_xfree(u->device_description);
1127 u->device_description = pa_xstrdup(description);
1128
1129 update_description(u);
1130
1131 return;
1132
1133 fail:
1134 pa_module_unload_request(u->module, TRUE);
1135 pa_proplist_free(pl);
1136 }
1137
1138 /* Called from main context */
1139 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1140 struct userdata *u = userdata;
1141 uint32_t idx, owner_module, client, sink;
1142 pa_usec_t buffer_usec, sink_usec;
1143 const char *name, *driver, *resample_method;
1144 pa_bool_t mute = FALSE;
1145 pa_sample_spec sample_spec;
1146 pa_channel_map channel_map;
1147 pa_cvolume volume;
1148 pa_proplist *pl;
1149 pa_bool_t b;
1150
1151 pa_assert(pd);
1152 pa_assert(u);
1153
1154 pl = pa_proplist_new();
1155
1156 if (command != PA_COMMAND_REPLY) {
1157 if (command == PA_COMMAND_ERROR)
1158 pa_log("Failed to get info.");
1159 else
1160 pa_log("Protocol error.");
1161 goto fail;
1162 }
1163
1164 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1165 pa_tagstruct_gets(t, &name) < 0 ||
1166 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1167 pa_tagstruct_getu32(t, &client) < 0 ||
1168 pa_tagstruct_getu32(t, &sink) < 0 ||
1169 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1170 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1171 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1172 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1173 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1174 pa_tagstruct_gets(t, &resample_method) < 0 ||
1175 pa_tagstruct_gets(t, &driver) < 0) {
1176
1177 pa_log("Parse failure");
1178 goto fail;
1179 }
1180
1181 if (u->version >= 11) {
1182 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1183
1184 pa_log("Parse failure");
1185 goto fail;
1186 }
1187 }
1188
1189 if (u->version >= 13) {
1190 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1191
1192 pa_log("Parse failure");
1193 goto fail;
1194 }
1195 }
1196
1197 if (u->version >= 19) {
1198 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1199
1200 pa_log("Parse failure");
1201 goto fail;
1202 }
1203 }
1204
1205 if (u->version >= 20) {
1206 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1207 pa_tagstruct_get_boolean(t, &b) < 0) {
1208
1209 pa_log("Parse failure");
1210 goto fail;
1211 }
1212 }
1213
1214 if (u->version >= 21) {
1215 pa_format_info *format = pa_format_info_new();
1216
1217 if (pa_tagstruct_get_format_info(t, format) < 0) {
1218 pa_format_info_free(format);
1219 pa_log("Parse failure");
1220 goto fail;
1221 }
1222 pa_format_info_free(format);
1223 }
1224
1225 if (!pa_tagstruct_eof(t)) {
1226 pa_log("Packet too long");
1227 goto fail;
1228 }
1229
1230 pa_proplist_free(pl);
1231
1232 if (idx != u->device_index)
1233 return;
1234
1235 pa_assert(u->sink);
1236
1237 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1238 pa_cvolume_equal(&volume, &u->sink->real_volume))
1239 return;
1240
1241 pa_sink_volume_changed(u->sink, &volume);
1242
1243 if (u->version >= 11)
1244 pa_sink_mute_changed(u->sink, mute);
1245
1246 return;
1247
1248 fail:
1249 pa_module_unload_request(u->module, TRUE);
1250 pa_proplist_free(pl);
1251 }
1252
1253 #else
1254
1255 /* Called from main context */
1256 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1257 struct userdata *u = userdata;
1258 uint32_t idx, owner_module, monitor_of_sink, flags;
1259 const char *name, *description, *monitor_of_sink_name, *driver;
1260 pa_sample_spec ss;
1261 pa_channel_map cm;
1262 pa_cvolume volume;
1263 pa_bool_t mute;
1264 pa_usec_t latency, configured_latency;
1265 pa_proplist *pl;
1266
1267 pa_assert(pd);
1268 pa_assert(u);
1269
1270 pl = pa_proplist_new();
1271
1272 if (command != PA_COMMAND_REPLY) {
1273 if (command == PA_COMMAND_ERROR)
1274 pa_log("Failed to get info.");
1275 else
1276 pa_log("Protocol error.");
1277 goto fail;
1278 }
1279
1280 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1281 pa_tagstruct_gets(t, &name) < 0 ||
1282 pa_tagstruct_gets(t, &description) < 0 ||
1283 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1284 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1285 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1286 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1287 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1288 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1289 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1290 pa_tagstruct_get_usec(t, &latency) < 0 ||
1291 pa_tagstruct_gets(t, &driver) < 0 ||
1292 pa_tagstruct_getu32(t, &flags) < 0) {
1293
1294 pa_log("Parse failure");
1295 goto fail;
1296 }
1297
1298 if (u->version >= 13) {
1299 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1300 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1301
1302 pa_log("Parse failure");
1303 goto fail;
1304 }
1305 }
1306
1307 if (u->version >= 15) {
1308 pa_volume_t base_volume;
1309 uint32_t state, n_volume_steps, card;
1310
1311 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1312 pa_tagstruct_getu32(t, &state) < 0 ||
1313 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1314 pa_tagstruct_getu32(t, &card) < 0) {
1315
1316 pa_log("Parse failure");
1317 goto fail;
1318 }
1319 }
1320
1321 if (u->version >= 16) {
1322 uint32_t n_ports;
1323 const char *s;
1324
1325 if (pa_tagstruct_getu32(t, &n_ports)) {
1326 pa_log("Parse failure");
1327 goto fail;
1328 }
1329
1330 for (uint32_t j = 0; j < n_ports; j++) {
1331 uint32_t priority;
1332
1333 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1334 pa_tagstruct_gets(t, &s) < 0 || /* description */
1335 pa_tagstruct_getu32(t, &priority) < 0) {
1336
1337 pa_log("Parse failure");
1338 goto fail;
1339 }
1340 }
1341
1342 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1343 pa_log("Parse failure");
1344 goto fail;
1345 }
1346 }
1347
1348 if (!pa_tagstruct_eof(t)) {
1349 pa_log("Packet too long");
1350 goto fail;
1351 }
1352
1353 pa_proplist_free(pl);
1354
1355 if (!u->source_name || strcmp(name, u->source_name))
1356 return;
1357
1358 pa_xfree(u->device_description);
1359 u->device_description = pa_xstrdup(description);
1360
1361 update_description(u);
1362
1363 return;
1364
1365 fail:
1366 pa_module_unload_request(u->module, TRUE);
1367 pa_proplist_free(pl);
1368 }
1369
1370 #endif
1371
1372 /* Called from main context */
1373 static void request_info(struct userdata *u) {
1374 pa_tagstruct *t;
1375 uint32_t tag;
1376 pa_assert(u);
1377
1378 t = pa_tagstruct_new(NULL, 0);
1379 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1380 pa_tagstruct_putu32(t, tag = u->ctag++);
1381 pa_pstream_send_tagstruct(u->pstream, t);
1382 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1383
1384 #ifdef TUNNEL_SINK
1385 t = pa_tagstruct_new(NULL, 0);
1386 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1387 pa_tagstruct_putu32(t, tag = u->ctag++);
1388 pa_tagstruct_putu32(t, u->device_index);
1389 pa_pstream_send_tagstruct(u->pstream, t);
1390 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1391
1392 if (u->sink_name) {
1393 t = pa_tagstruct_new(NULL, 0);
1394 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1395 pa_tagstruct_putu32(t, tag = u->ctag++);
1396 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1397 pa_tagstruct_puts(t, u->sink_name);
1398 pa_pstream_send_tagstruct(u->pstream, t);
1399 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1400 }
1401 #else
1402 if (u->source_name) {
1403 t = pa_tagstruct_new(NULL, 0);
1404 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1405 pa_tagstruct_putu32(t, tag = u->ctag++);
1406 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1407 pa_tagstruct_puts(t, u->source_name);
1408 pa_pstream_send_tagstruct(u->pstream, t);
1409 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1410 }
1411 #endif
1412 }
1413
1414 /* Called from main context */
1415 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1416 struct userdata *u = userdata;
1417 pa_subscription_event_type_t e;
1418 uint32_t idx;
1419
1420 pa_assert(pd);
1421 pa_assert(t);
1422 pa_assert(u);
1423 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1424
1425 if (pa_tagstruct_getu32(t, &e) < 0 ||
1426 pa_tagstruct_getu32(t, &idx) < 0) {
1427 pa_log("Invalid protocol reply");
1428 pa_module_unload_request(u->module, TRUE);
1429 return;
1430 }
1431
1432 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1433 #ifdef TUNNEL_SINK
1434 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1435 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1436 #else
1437 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1438 #endif
1439 )
1440 return;
1441
1442 request_info(u);
1443 }
1444
1445 /* Called from main context */
1446 static void start_subscribe(struct userdata *u) {
1447 pa_tagstruct *t;
1448 pa_assert(u);
1449
1450 t = pa_tagstruct_new(NULL, 0);
1451 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1452 pa_tagstruct_putu32(t, u->ctag++);
1453 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1454 #ifdef TUNNEL_SINK
1455 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1456 #else
1457 PA_SUBSCRIPTION_MASK_SOURCE
1458 #endif
1459 );
1460
1461 pa_pstream_send_tagstruct(u->pstream, t);
1462 }
1463
1464 /* Called from main context */
1465 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1466 struct userdata *u = userdata;
1467 #ifdef TUNNEL_SINK
1468 uint32_t bytes;
1469 #endif
1470
1471 pa_assert(pd);
1472 pa_assert(u);
1473 pa_assert(u->pdispatch == pd);
1474
1475 if (command != PA_COMMAND_REPLY) {
1476 if (command == PA_COMMAND_ERROR)
1477 pa_log("Failed to create stream.");
1478 else
1479 pa_log("Protocol error.");
1480 goto fail;
1481 }
1482
1483 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1484 pa_tagstruct_getu32(t, &u->device_index) < 0
1485 #ifdef TUNNEL_SINK
1486 || pa_tagstruct_getu32(t, &bytes) < 0
1487 #endif
1488 )
1489 goto parse_error;
1490
1491 if (u->version >= 9) {
1492 #ifdef TUNNEL_SINK
1493 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1494 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1495 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1496 pa_tagstruct_getu32(t, &u->minreq) < 0)
1497 goto parse_error;
1498 #else
1499 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1500 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1501 goto parse_error;
1502 #endif
1503 }
1504
1505 if (u->version >= 12) {
1506 pa_sample_spec ss;
1507 pa_channel_map cm;
1508 uint32_t device_index;
1509 const char *dn;
1510 pa_bool_t suspended;
1511
1512 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1513 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1514 pa_tagstruct_getu32(t, &device_index) < 0 ||
1515 pa_tagstruct_gets(t, &dn) < 0 ||
1516 pa_tagstruct_get_boolean(t, &suspended) < 0)
1517 goto parse_error;
1518
1519 #ifdef TUNNEL_SINK
1520 pa_xfree(u->sink_name);
1521 u->sink_name = pa_xstrdup(dn);
1522 #else
1523 pa_xfree(u->source_name);
1524 u->source_name = pa_xstrdup(dn);
1525 #endif
1526 }
1527
1528 if (u->version >= 13) {
1529 pa_usec_t usec;
1530
1531 if (pa_tagstruct_get_usec(t, &usec) < 0)
1532 goto parse_error;
1533
1534 /* #ifdef TUNNEL_SINK */
1535 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1536 /* #else */
1537 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1538 /* #endif */
1539 }
1540
1541 if (u->version >= 21) {
1542 pa_format_info *format = pa_format_info_new();
1543
1544 if (pa_tagstruct_get_format_info(t, format) < 0) {
1545 pa_format_info_free(format);
1546 goto parse_error;
1547 }
1548
1549 pa_format_info_free(format);
1550 }
1551
1552 if (!pa_tagstruct_eof(t))
1553 goto parse_error;
1554
1555 start_subscribe(u);
1556 request_info(u);
1557
1558 pa_assert(!u->time_event);
1559 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1560
1561 request_latency(u);
1562
1563 pa_log_debug("Stream created.");
1564
1565 #ifdef TUNNEL_SINK
1566 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1567 #endif
1568
1569 return;
1570
1571 parse_error:
1572 pa_log("Invalid reply. (Create stream)");
1573
1574 fail:
1575 pa_module_unload_request(u->module, TRUE);
1576
1577 }
1578
1579 /* Called from main context */
1580 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1581 struct userdata *u = userdata;
1582 pa_tagstruct *reply;
1583 char name[256], un[128], hn[128];
1584 #ifdef TUNNEL_SINK
1585 pa_cvolume volume;
1586 #endif
1587
1588 pa_assert(pd);
1589 pa_assert(u);
1590 pa_assert(u->pdispatch == pd);
1591
1592 if (command != PA_COMMAND_REPLY ||
1593 pa_tagstruct_getu32(t, &u->version) < 0 ||
1594 !pa_tagstruct_eof(t)) {
1595
1596 if (command == PA_COMMAND_ERROR)
1597 pa_log("Failed to authenticate");
1598 else
1599 pa_log("Protocol error.");
1600
1601 goto fail;
1602 }
1603
1604 /* Minimum supported protocol version */
1605 if (u->version < 8) {
1606 pa_log("Incompatible protocol version");
1607 goto fail;
1608 }
1609
1610 /* Starting with protocol version 13 the MSB of the version tag
1611 reflects if shm is enabled for this connection or not. We don't
1612 support SHM here at all, so we just ignore this. */
1613
1614 if (u->version >= 13)
1615 u->version &= 0x7FFFFFFFU;
1616
1617 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1618
1619 #ifdef TUNNEL_SINK
1620 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1621 pa_sink_update_proplist(u->sink, 0, NULL);
1622
1623 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1624 u->sink_name,
1625 pa_get_user_name(un, sizeof(un)),
1626 pa_get_host_name(hn, sizeof(hn)));
1627 #else
1628 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1629 pa_source_update_proplist(u->source, 0, NULL);
1630
1631 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1632 u->source_name,
1633 pa_get_user_name(un, sizeof(un)),
1634 pa_get_host_name(hn, sizeof(hn)));
1635 #endif
1636
1637 reply = pa_tagstruct_new(NULL, 0);
1638 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1639 pa_tagstruct_putu32(reply, u->ctag++);
1640
1641 if (u->version >= 13) {
1642 pa_proplist *pl;
1643 pl = pa_proplist_new();
1644 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1645 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1646 pa_init_proplist(pl);
1647 pa_tagstruct_put_proplist(reply, pl);
1648 pa_proplist_free(pl);
1649 } else
1650 pa_tagstruct_puts(reply, "PulseAudio");
1651
1652 pa_pstream_send_tagstruct(u->pstream, reply);
1653 /* We ignore the server's reply here */
1654
1655 reply = pa_tagstruct_new(NULL, 0);
1656
1657 if (u->version < 13)
1658 /* Only for older PA versions we need to fill in the maxlength */
1659 u->maxlength = 4*1024*1024;
1660
1661 #ifdef TUNNEL_SINK
1662 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1663 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1664 u->prebuf = u->tlength;
1665 #else
1666 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1667 #endif
1668
1669 #ifdef TUNNEL_SINK
1670 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1671 pa_tagstruct_putu32(reply, tag = u->ctag++);
1672
1673 if (u->version < 13)
1674 pa_tagstruct_puts(reply, name);
1675
1676 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1677 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1678 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1679 pa_tagstruct_puts(reply, u->sink_name);
1680 pa_tagstruct_putu32(reply, u->maxlength);
1681 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1682 pa_tagstruct_putu32(reply, u->tlength);
1683 pa_tagstruct_putu32(reply, u->prebuf);
1684 pa_tagstruct_putu32(reply, u->minreq);
1685 pa_tagstruct_putu32(reply, 0);
1686 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1687 pa_tagstruct_put_cvolume(reply, &volume);
1688 #else
1689 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1690 pa_tagstruct_putu32(reply, tag = u->ctag++);
1691
1692 if (u->version < 13)
1693 pa_tagstruct_puts(reply, name);
1694
1695 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1696 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1697 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1698 pa_tagstruct_puts(reply, u->source_name);
1699 pa_tagstruct_putu32(reply, u->maxlength);
1700 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1701 pa_tagstruct_putu32(reply, u->fragsize);
1702 #endif
1703
1704 if (u->version >= 12) {
1705 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1706 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1707 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1708 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1709 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1710 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1711 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1712 }
1713
1714 if (u->version >= 13) {
1715 pa_proplist *pl;
1716
1717 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1718 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1719
1720 pl = pa_proplist_new();
1721 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1722 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1723 pa_tagstruct_put_proplist(reply, pl);
1724 pa_proplist_free(pl);
1725
1726 #ifndef TUNNEL_SINK
1727 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1728 #endif
1729 }
1730
1731 if (u->version >= 14) {
1732 #ifdef TUNNEL_SINK
1733 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1734 #endif
1735 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1736 }
1737
1738 if (u->version >= 15) {
1739 #ifdef TUNNEL_SINK
1740 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1741 #endif
1742 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1743 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1744 }
1745
1746 #ifdef TUNNEL_SINK
1747 if (u->version >= 17)
1748 pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1749
1750 if (u->version >= 18)
1751 pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1752 #endif
1753
1754 #ifdef TUNNEL_SINK
1755 if (u->version >= 21) {
1756 /* We're not using the extended API, so n_formats = 0 and that's that */
1757 pa_tagstruct_putu8(reply, 0);
1758 }
1759 #endif
1760
1761 pa_pstream_send_tagstruct(u->pstream, reply);
1762 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1763
1764 pa_log_debug("Connection authenticated, creating stream ...");
1765
1766 return;
1767
1768 fail:
1769 pa_module_unload_request(u->module, TRUE);
1770 }
1771
1772 /* Called from main context */
1773 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1774 struct userdata *u = userdata;
1775
1776 pa_assert(p);
1777 pa_assert(u);
1778
1779 pa_log_warn("Stream died.");
1780 pa_module_unload_request(u->module, TRUE);
1781 }
1782
1783 /* Called from main context */
1784 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1785 struct userdata *u = userdata;
1786
1787 pa_assert(p);
1788 pa_assert(packet);
1789 pa_assert(u);
1790
1791 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1792 pa_log("Invalid packet");
1793 pa_module_unload_request(u->module, TRUE);
1794 return;
1795 }
1796 }
1797
1798 #ifndef TUNNEL_SINK
1799 /* Called from main context */
1800 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1801 struct userdata *u = userdata;
1802
1803 pa_assert(p);
1804 pa_assert(chunk);
1805 pa_assert(u);
1806
1807 if (channel != u->channel) {
1808 pa_log("Received memory block on bad channel.");
1809 pa_module_unload_request(u->module, TRUE);
1810 return;
1811 }
1812
1813 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1814
1815 u->counter_delta += (int64_t) chunk->length;
1816 }
1817 #endif
1818
1819 /* Called from main context */
1820 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1821 struct userdata *u = userdata;
1822 pa_tagstruct *t;
1823 uint32_t tag;
1824
1825 pa_assert(sc);
1826 pa_assert(u);
1827 pa_assert(u->client == sc);
1828
1829 pa_socket_client_unref(u->client);
1830 u->client = NULL;
1831
1832 if (!io) {
1833 pa_log("Connection failed: %s", pa_cstrerror(errno));
1834 pa_module_unload_request(u->module, TRUE);
1835 return;
1836 }
1837
1838 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1839 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1840
1841 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1842 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1843 #ifndef TUNNEL_SINK
1844 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1845 #endif
1846
1847 t = pa_tagstruct_new(NULL, 0);
1848 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1849 pa_tagstruct_putu32(t, tag = u->ctag++);
1850 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1851
1852 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1853
1854 #ifdef HAVE_CREDS
1855 {
1856 pa_creds ucred;
1857
1858 if (pa_iochannel_creds_supported(io))
1859 pa_iochannel_creds_enable(io);
1860
1861 ucred.uid = getuid();
1862 ucred.gid = getgid();
1863
1864 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1865 }
1866 #else
1867 pa_pstream_send_tagstruct(u->pstream, t);
1868 #endif
1869
1870 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1871
1872 pa_log_debug("Connection established, authenticating ...");
1873 }
1874
1875 #ifdef TUNNEL_SINK
1876
1877 /* Called from main context */
1878 static void sink_set_volume(pa_sink *sink) {
1879 struct userdata *u;
1880 pa_tagstruct *t;
1881
1882 pa_assert(sink);
1883 u = sink->userdata;
1884 pa_assert(u);
1885
1886 t = pa_tagstruct_new(NULL, 0);
1887 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1888 pa_tagstruct_putu32(t, u->ctag++);
1889 pa_tagstruct_putu32(t, u->device_index);
1890 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1891 pa_pstream_send_tagstruct(u->pstream, t);
1892 }
1893
1894 /* Called from main context */
1895 static void sink_set_mute(pa_sink *sink) {
1896 struct userdata *u;
1897 pa_tagstruct *t;
1898
1899 pa_assert(sink);
1900 u = sink->userdata;
1901 pa_assert(u);
1902
1903 if (u->version < 11)
1904 return;
1905
1906 t = pa_tagstruct_new(NULL, 0);
1907 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1908 pa_tagstruct_putu32(t, u->ctag++);
1909 pa_tagstruct_putu32(t, u->device_index);
1910 pa_tagstruct_put_boolean(t, !!sink->muted);
1911 pa_pstream_send_tagstruct(u->pstream, t);
1912 }
1913
1914 #endif
1915
1916 int pa__init(pa_module*m) {
1917 pa_modargs *ma = NULL;
1918 struct userdata *u = NULL;
1919 pa_sample_spec ss;
1920 pa_channel_map map;
1921 char *dn = NULL;
1922 #ifdef TUNNEL_SINK
1923 pa_sink_new_data data;
1924 #else
1925 pa_source_new_data data;
1926 #endif
1927
1928 pa_assert(m);
1929
1930 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1931 pa_log("Failed to parse module arguments");
1932 goto fail;
1933 }
1934
1935 m->userdata = u = pa_xnew0(struct userdata, 1);
1936 u->core = m->core;
1937 u->module = m;
1938 u->client = NULL;
1939 u->pdispatch = NULL;
1940 u->pstream = NULL;
1941 u->server_name = NULL;
1942 #ifdef TUNNEL_SINK
1943 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1944 u->sink = NULL;
1945 u->requested_bytes = 0;
1946 #else
1947 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1948 u->source = NULL;
1949 #endif
1950 u->smoother = pa_smoother_new(
1951 PA_USEC_PER_SEC,
1952 PA_USEC_PER_SEC*2,
1953 TRUE,
1954 TRUE,
1955 10,
1956 pa_rtclock_now(),
1957 FALSE);
1958 u->ctag = 1;
1959 u->device_index = u->channel = PA_INVALID_INDEX;
1960 u->time_event = NULL;
1961 u->ignore_latency_before = 0;
1962 u->transport_usec = u->thread_transport_usec = 0;
1963 u->remote_suspended = u->remote_corked = FALSE;
1964 u->counter = u->counter_delta = 0;
1965
1966 u->rtpoll = pa_rtpoll_new();
1967 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1968
1969 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1970 goto fail;
1971
1972 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1973 pa_log("No server specified.");
1974 goto fail;
1975 }
1976
1977 ss = m->core->default_sample_spec;
1978 map = m->core->default_channel_map;
1979 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1980 pa_log("Invalid sample format specification");
1981 goto fail;
1982 }
1983
1984 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1985 pa_log("Failed to connect to server '%s'", u->server_name);
1986 goto fail;
1987 }
1988
1989 pa_socket_client_set_callback(u->client, on_connection, u);
1990
1991 #ifdef TUNNEL_SINK
1992
1993 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1994 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1995
1996 pa_sink_new_data_init(&data);
1997 data.driver = __FILE__;
1998 data.module = m;
1999 data.namereg_fail = TRUE;
2000 pa_sink_new_data_set_name(&data, dn);
2001 pa_sink_new_data_set_sample_spec(&data, &ss);
2002 pa_sink_new_data_set_channel_map(&data, &map);
2003 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
2004 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2005 if (u->sink_name)
2006 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2007
2008 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2009 pa_log("Invalid properties");
2010 pa_sink_new_data_done(&data);
2011 goto fail;
2012 }
2013
2014 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2015 pa_sink_new_data_done(&data);
2016
2017 if (!u->sink) {
2018 pa_log("Failed to create sink.");
2019 goto fail;
2020 }
2021
2022 u->sink->parent.process_msg = sink_process_msg;
2023 u->sink->userdata = u;
2024 u->sink->set_state = sink_set_state;
2025 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2026 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2027
2028 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
2029
2030 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2031
2032 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2033 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2034
2035 #else
2036
2037 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2038 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2039
2040 pa_source_new_data_init(&data);
2041 data.driver = __FILE__;
2042 data.module = m;
2043 data.namereg_fail = TRUE;
2044 pa_source_new_data_set_name(&data, dn);
2045 pa_source_new_data_set_sample_spec(&data, &ss);
2046 pa_source_new_data_set_channel_map(&data, &map);
2047 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2048 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2049 if (u->source_name)
2050 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2051
2052 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2053 pa_log("Invalid properties");
2054 pa_source_new_data_done(&data);
2055 goto fail;
2056 }
2057
2058 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2059 pa_source_new_data_done(&data);
2060
2061 if (!u->source) {
2062 pa_log("Failed to create source.");
2063 goto fail;
2064 }
2065
2066 u->source->parent.process_msg = source_process_msg;
2067 u->source->set_state = source_set_state;
2068 u->source->userdata = u;
2069
2070 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2071
2072 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2073 pa_source_set_rtpoll(u->source, u->rtpoll);
2074
2075 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2076 #endif
2077
2078 pa_xfree(dn);
2079
2080 u->time_event = NULL;
2081
2082 u->maxlength = (uint32_t) -1;
2083 #ifdef TUNNEL_SINK
2084 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2085 #else
2086 u->fragsize = (uint32_t) -1;
2087 #endif
2088
2089 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2090 pa_log("Failed to create thread.");
2091 goto fail;
2092 }
2093
2094 #ifdef TUNNEL_SINK
2095 pa_sink_put(u->sink);
2096 #else
2097 pa_source_put(u->source);
2098 #endif
2099
2100 pa_modargs_free(ma);
2101
2102 return 0;
2103
2104 fail:
2105 pa__done(m);
2106
2107 if (ma)
2108 pa_modargs_free(ma);
2109
2110 pa_xfree(dn);
2111
2112 return -1;
2113 }
2114
2115 void pa__done(pa_module*m) {
2116 struct userdata* u;
2117
2118 pa_assert(m);
2119
2120 if (!(u = m->userdata))
2121 return;
2122
2123 #ifdef TUNNEL_SINK
2124 if (u->sink)
2125 pa_sink_unlink(u->sink);
2126 #else
2127 if (u->source)
2128 pa_source_unlink(u->source);
2129 #endif
2130
2131 if (u->thread) {
2132 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2133 pa_thread_free(u->thread);
2134 }
2135
2136 pa_thread_mq_done(&u->thread_mq);
2137
2138 #ifdef TUNNEL_SINK
2139 if (u->sink)
2140 pa_sink_unref(u->sink);
2141 #else
2142 if (u->source)
2143 pa_source_unref(u->source);
2144 #endif
2145
2146 if (u->rtpoll)
2147 pa_rtpoll_free(u->rtpoll);
2148
2149 if (u->pstream) {
2150 pa_pstream_unlink(u->pstream);
2151 pa_pstream_unref(u->pstream);
2152 }
2153
2154 if (u->pdispatch)
2155 pa_pdispatch_unref(u->pdispatch);
2156
2157 if (u->client)
2158 pa_socket_client_unref(u->client);
2159
2160 if (u->auth_cookie)
2161 pa_auth_cookie_unref(u->auth_cookie);
2162
2163 if (u->smoother)
2164 pa_smoother_free(u->smoother);
2165
2166 if (u->time_event)
2167 u->core->mainloop->time_free(u->time_event);
2168
2169 #ifndef TUNNEL_SINK
2170 if (u->mcalign)
2171 pa_mcalign_free(u->mcalign);
2172 #endif
2173
2174 #ifdef TUNNEL_SINK
2175 pa_xfree(u->sink_name);
2176 #else
2177 pa_xfree(u->source_name);
2178 #endif
2179 pa_xfree(u->server_name);
2180
2181 pa_xfree(u->device_description);
2182 pa_xfree(u->server_fqdn);
2183 pa_xfree(u->user_name);
2184
2185 pa_xfree(u);
2186 }