]> code.delx.au - pulseaudio/blob - src/modules/module-combine.c
render new data always in the master sink's thread, fixing missing locking
[pulseaudio] / src / modules / module-combine.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as published
10 by the Free Software Foundation; either version 2 of the License,
11 or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdio.h>
29 #include <errno.h>
30
31 #include <pulse/timeval.h>
32 #include <pulse/xmalloc.h>
33
34 #include <pulsecore/macro.h>
35 #include <pulsecore/module.h>
36 #include <pulsecore/llist.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/sink-input.h>
39 #include <pulsecore/memblockq.h>
40 #include <pulsecore/log.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/namereg.h>
44 #include <pulsecore/mutex.h>
45 #include <pulsecore/thread.h>
46 #include <pulsecore/thread-mq.h>
47 #include <pulsecore/rtpoll.h>
48 #include <pulsecore/rtclock.h>
49 #include <pulsecore/core-error.h>
50
51 #include "module-combine-symdef.h"
52
53 PA_MODULE_AUTHOR("Lennart Poettering")
54 PA_MODULE_DESCRIPTION("Combine multiple sinks to one")
55 PA_MODULE_VERSION(PACKAGE_VERSION)
56 PA_MODULE_USAGE(
57 "sink_name=<name for the sink> "
58 "master=<master sink> "
59 "slaves=<slave sinks> "
60 "adjust_time=<seconds> "
61 "resample_method=<method> "
62 "format=<sample format> "
63 "channels=<number of channels> "
64 "rate=<sample rate> "
65 "channel_map=<channel map>")
66
/* Sink name used when the "sink_name" module argument is absent */
#define DEFAULT_SINK_NAME "combined"

/* Size limit handed to pa_memblockq_new() for every output queue */
#define MEMBLOCKQ_MAXLENGTH (1024*170)

/* Default interval between two rate adjustments, in seconds */
#define DEFAULT_ADJUST_TIME 10
71
72 static const char* const valid_modargs[] = {
73 "sink_name",
74 "master",
75 "slaves",
76 "adjust_time",
77 "resample_method",
78 "format",
79 "channels",
80 "rate",
81 "channel_map",
82 NULL
83 };
84
85 struct output {
86 struct userdata *userdata;
87 pa_sink *sink;
88 pa_sink_input *sink_input;
89
90 pa_asyncmsgq *inq, /* Message queue from the master to this sink input */
91 *outq; /* Message queue from this sink input to the master */
92 pa_rtpoll_item *inq_rtpoll_item, *outq_rtpoll_item;
93
94 pa_memblockq *memblockq;
95
96 pa_usec_t total_latency;
97
98 PA_LLIST_FIELDS(struct output);
99 };
100
101 struct userdata {
102 pa_core *core;
103 pa_module *module;
104 pa_sink *sink;
105
106 pa_thread *thread;
107 pa_thread_mq thread_mq;
108 pa_rtpoll *rtpoll;
109
110 struct output *master;
111
112 pa_time_event *time_event;
113 uint32_t adjust_time;
114
115 int automatic;
116 size_t block_size;
117
118 struct timespec timestamp;
119
120 pa_hook_slot *sink_new_slot, *sink_unlink_slot, *sink_state_changed_slot;
121
122 pa_resample_method_t resample_method;
123
124 struct timespec adjust_timestamp;
125
126 pa_idxset* outputs; /* managed in main context */
127
128 struct {
129 PA_LLIST_HEAD(struct output, outputs); /* managed in IO thread context */
130 struct output *master;
131 } thread_info;
132 };
133
134 enum {
135 SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
136 SINK_MESSAGE_REMOVE_OUTPUT,
137 SINK_MESSAGE_NEED
138 };
139
140 enum {
141 SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX
142 };
143
/* Forward declarations for helpers that are used before they are defined */
static int pick_master(struct userdata *u);
static int update_master(struct userdata *u, struct output *o);
static int output_create_sink_input(struct userdata *u, struct output *o);
static void output_free(struct output *o);
148
149 static void adjust_rates(struct userdata *u) {
150 struct output *o;
151 pa_usec_t max_sink_latency = 0, min_total_latency = (pa_usec_t) -1, target_latency;
152 uint32_t base_rate;
153 uint32_t idx;
154
155 pa_assert(u);
156 pa_sink_assert_ref(u->sink);
157
158 if (pa_idxset_size(u->outputs) <= 0)
159 return;
160
161 if (!PA_SINK_OPENED(pa_sink_get_state(u->sink)))
162 return;
163
164 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
165 uint32_t sink_latency;
166
167 if (!o->sink_input || !PA_SINK_OPENED(pa_sink_get_state(o->sink)))
168 continue;
169
170 sink_latency = o->sink_input->sink ? pa_sink_get_latency(o->sink_input->sink) : 0;
171 o->total_latency = sink_latency + pa_sink_input_get_latency(o->sink_input);
172
173 if (sink_latency > max_sink_latency)
174 max_sink_latency = sink_latency;
175
176 if (o->total_latency < min_total_latency)
177 min_total_latency = o->total_latency;
178 }
179
180 if (min_total_latency == (pa_usec_t) -1)
181 return;
182
183 target_latency = max_sink_latency > min_total_latency ? max_sink_latency : min_total_latency;
184
185 pa_log_info("[%s] target latency is %0.0f usec.", u->sink->name, (float) target_latency);
186 pa_log_info("[%s] master is %s", u->sink->name, u->master->sink->description);
187
188 base_rate = u->sink->sample_spec.rate;
189
190 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
191 uint32_t r = base_rate;
192
193 if (!o->sink_input || !PA_SINK_OPENED(pa_sink_get_state(o->sink)))
194 continue;
195
196 if (o->total_latency < target_latency)
197 r -= (uint32_t) (((((double) target_latency - o->total_latency))/u->adjust_time)*r/ 1000000);
198 else if (o->total_latency > target_latency)
199 r += (uint32_t) (((((double) o->total_latency - target_latency))/u->adjust_time)*r/ 1000000);
200
201 if (r < (uint32_t) (base_rate*0.9) || r > (uint32_t) (base_rate*1.1)) {
202 pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", o->sink_input->name, base_rate, r);
203 pa_sink_input_set_rate(o->sink_input, base_rate);
204 } else {
205 pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.0f usec.", o->sink_input->name, r, (double) r / base_rate, (float) o->total_latency);
206 pa_sink_input_set_rate(o->sink_input, r);
207 }
208 }
209 }
210
211 static void time_callback(pa_mainloop_api*a, pa_time_event* e, const struct timeval *tv, void *userdata) {
212 struct userdata *u = userdata;
213 struct timeval n;
214
215 pa_assert(u);
216 pa_assert(a);
217 pa_assert(u->time_event == e);
218
219 adjust_rates(u);
220
221 pa_gettimeofday(&n);
222 n.tv_sec += u->adjust_time;
223 u->sink->core->mainloop->time_restart(e, &n);
224 }
225
226 static void thread_func(void *userdata) {
227 struct userdata *u = userdata;
228
229 pa_assert(u);
230
231 pa_log_debug("Thread starting up");
232
233 pa_thread_mq_install(&u->thread_mq);
234 pa_rtpoll_install(u->rtpoll);
235
236 pa_rtclock_get(&u->timestamp);
237
238 /* This is only run when we are in NULL mode, to make sure that
239 * playback doesn't stop. In all other cases we hook our stuff
240 * into the master sink. */
241
242 for (;;) {
243 int ret;
244
245 /* Render some data and drop it immediately */
246 if (u->sink->thread_info.state == PA_SINK_RUNNING) {
247 struct timespec now;
248
249 pa_rtclock_get(&now);
250
251 if (pa_timespec_cmp(&u->timestamp, &now) <= 0) {
252 pa_sink_skip(u->sink, u->block_size);
253 pa_timespec_add(&u->timestamp, pa_bytes_to_usec(u->block_size, &u->sink->sample_spec));
254 }
255
256 pa_rtpoll_set_timer_absolute(u->rtpoll, &u->timestamp);
257 } else
258 pa_rtpoll_set_timer_disabled(u->rtpoll);
259
260 /* Hmm, nothing to do. Let's sleep */
261 if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
262 goto fail;
263
264 if (ret == 0)
265 goto finish;
266 }
267
268 fail:
269 /* If this was no regular exit from the loop we have to continue
270 * processing messages until we received PA_MESSAGE_SHUTDOWN */
271 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
272 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
273
274 finish:
275 pa_log_debug("Thread shutting down");
276 }
277
278 static void render_memblock(struct userdata *u, struct output *o, size_t length) {
279 pa_assert(u);
280 pa_assert(o);
281
282 if (!PA_SINK_OPENED(u->sink->thread_info.state))
283 return;
284
285 /* We are run by the master output (u->master), possibly on behalf
286 * of another output (o). The other output is waiting for us,
287 * hence it is safe to access its mainblockq directly. */
288
289 /* Maybe there's some data in the requesting output's queue
290 * now? */
291 while (pa_asyncmsgq_process_one(o->inq) > 0)
292 ;
293
294 /* Ok, now let's prepare some data if we really have to */
295 while (!pa_memblockq_is_readable(o->memblockq)) {
296 struct output *j;
297 pa_memchunk chunk;
298
299 /* Render data! */
300 pa_sink_render(u->sink, length, &chunk);
301
302 /* OK, let's send this data to the other threads */
303 for (j = o->userdata->thread_info.outputs; j; j = j->next)
304
305 /* Send to other outputs, which are not the requesting
306 * one, and not the master */
307
308 if (j != o && j != u->master && j->sink_input)
309 pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
310
311 /* Now push it into the master queue */
312 pa_memblockq_push_align(u->master->memblockq, &chunk);
313
314 /* And into the requesting output's queue */
315 if (o != u->master)
316 pa_memblockq_push_align(o->memblockq, &chunk);
317
318 pa_memblock_unref(chunk.memblock);
319 }
320 }
321
322 static void request_memblock(struct output *o, size_t length) {
323 pa_assert(o);
324 pa_sink_input_assert_ref(o->sink_input);
325 pa_sink_assert_ref(o->userdata->sink);
326
327 /* If another thread already prepared some data we received
328 * the data over the asyncmsgq, hence let's first process
329 * it. */
330 while (pa_asyncmsgq_process_one(o->inq) > 0)
331 ;
332
333 /* Check whether we're now readable */
334 if (pa_memblockq_is_readable(o->memblockq))
335 return;
336
337 /* OK, we need to prepare new data */
338
339 if (o == o->userdata->master)
340 /* OK, we're the master, so let's render some data */
341 render_memblock(o->userdata, o, length);
342
343 else
344 /* We're not the master, we need to ask the master to do the
345 * rendering for us */
346
347 pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, length, NULL);
348 }
349
350 /* Called from I/O thread context */
351 static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
352 struct output *o;
353
354 pa_sink_input_assert_ref(i);
355 pa_assert_se(o = i->userdata);
356
357 /* If necessary, get some new data */
358 request_memblock(o, length);
359
360 return pa_memblockq_peek(o->memblockq, chunk);
361 }
362
363 /* Called from I/O thread context */
364 static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
365 struct output *o;
366
367 pa_sink_input_assert_ref(i);
368 pa_assert(length > 0);
369 pa_assert_se(o = i->userdata);
370
371 pa_memblockq_drop(o->memblockq, length);
372 }
373
374 /* Called from I/O thread context */
375 static void sink_input_attach_cb(pa_sink_input *i) {
376 struct output *o;
377
378 pa_sink_input_assert_ref(i);
379 pa_assert_se(o = i->userdata);
380
381 pa_assert(!o->inq_rtpoll_item);
382
383 if (o->userdata->master == o) {
384 struct output *k;
385
386 pa_assert(!o->outq_rtpoll_item);
387
388 /* Set up the queues from the outputs to the master */
389 for (k = o->userdata->thread_info.outputs; k; k = k->next) {
390
391 pa_assert(!k->outq_rtpoll_item);
392
393 if (o == k)
394 continue;
395
396 k->outq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
397 i->sink->rtpoll,
398 PA_RTPOLL_EARLY+1, /* This one has a slightly lower priority than the normal message handling */
399 k->outq);
400 }
401
402 /* Calling these two functions here is safe, because both
403 * threads that might access this sink are known to be
404 * waiting for us. */
405 pa_sink_set_asyncmsgq(o->userdata->sink, i->sink->asyncmsgq);
406 pa_sink_set_rtpoll(o->userdata->sink, i->sink->rtpoll);
407 pa_sink_attach_within_thread(o->userdata->sink);
408 }
409
410 /* Set up the queues from the inputs to the master */
411 o->inq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
412 i->sink->rtpoll,
413 PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */
414 o->inq);
415 }
416
417 /* Called from I/O thread context */
418 static void sink_input_detach_cb(pa_sink_input *i) {
419 struct output *o;
420
421 pa_sink_input_assert_ref(i);
422 pa_assert_se(o = i->userdata);
423
424 pa_assert(o->inq_rtpoll_item);
425 pa_rtpoll_item_free(o->inq_rtpoll_item);
426 o->inq_rtpoll_item = NULL;
427
428 if (o->userdata->master == o) {
429 struct output *k;
430
431 pa_sink_detach_within_thread(o->userdata->sink);
432
433 for (k = o->userdata->thread_info.outputs; k; k = k->next) {
434
435 if (o == k)
436 continue;
437
438 pa_assert(k->outq_rtpoll_item);
439 pa_rtpoll_item_free(k->outq_rtpoll_item);
440 k->outq_rtpoll_item = NULL;
441 }
442 }
443 }
444
445 /* Called from main context */
446 static void sink_input_kill_cb(pa_sink_input *i) {
447 struct output *o;
448
449 pa_sink_input_assert_ref(i);
450 o = i->userdata;
451 pa_assert(o);
452
453 pa_sink_input_unlink(o->sink_input);
454 pa_sink_input_unref(o->sink_input);
455 o->sink_input = NULL;
456
457 pa_module_unload_request(o->userdata->module);
458 }
459
460 /* Called from thread context */
461 static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
462 struct output *o = PA_SINK_INPUT(obj)->userdata;
463
464 switch (code) {
465
466 case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
467 pa_usec_t *r = data;
468
469 *r = pa_bytes_to_usec(pa_memblockq_get_length(o->memblockq), &o->sink_input->sample_spec);
470
471 /* Fall through, the default handler will add in the extra
472 * latency added by the resampler */
473 break;
474 }
475
476 case SINK_INPUT_MESSAGE_POST: {
477
478 if (PA_SINK_OPENED(o->sink_input->sink->thread_info.state))
479 pa_memblockq_push_align(o->memblockq, chunk);
480 else
481 pa_memblockq_flush(o->memblockq);
482
483 break;
484 }
485 }
486
487 return pa_sink_input_process_msg(obj, code, data, offset, chunk);
488 }
489
490 /* Called from main context */
491 static int suspend(struct userdata *u) {
492 struct output *o;
493 uint32_t idx;
494
495 pa_assert(u);
496
497 /* Let's suspend by unlinking all streams */
498
499 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
500
501 if (o->sink_input) {
502 pa_sink_input_unlink(o->sink_input);
503 pa_sink_input_unref(o->sink_input);
504 o->sink_input = NULL;
505 }
506 }
507
508 if (pick_master(u) < 0)
509 pa_module_unload_request(u->module);
510
511 pa_log_info("Device suspended...");
512
513 return 0;
514 }
515
516 /* Called from main context */
517 static int unsuspend(struct userdata *u) {
518 struct output *o;
519 uint32_t idx;
520
521 pa_assert(u);
522
523 /* Let's resume */
524
525 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
526
527 pa_sink_suspend(o->sink, 0);
528
529 if (PA_SINK_OPENED(pa_sink_get_state(o->sink))) {
530 if (output_create_sink_input(u, o) < 0)
531 output_free(o);
532 else
533 pa_sink_input_put(o->sink_input);
534 }
535 }
536
537 if (pick_master(u) < 0)
538 pa_module_unload_request(u->module);
539
540 pa_log_info("Resumed successfully...");
541 return 0;
542 }
543
544 /* Called from main context */
545 static int sink_set_state(pa_sink *sink, pa_sink_state_t state) {
546 struct userdata *u;
547
548 pa_sink_assert_ref(sink);
549 pa_assert_se(u = sink->userdata);
550
551 /* Please note that in contrast to the ALSA modules we call
552 * suspend/unsuspend from main context here! */
553
554 switch (state) {
555 case PA_SINK_SUSPENDED:
556 pa_assert(PA_SINK_OPENED(pa_sink_get_state(u->sink)));
557
558 if (suspend(u) < 0)
559 return -1;
560
561 break;
562
563 case PA_SINK_IDLE:
564 case PA_SINK_RUNNING:
565
566 if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED) {
567 if (unsuspend(u) < 0)
568 return -1;
569 }
570
571 break;
572
573 case PA_SINK_UNLINKED:
574 case PA_SINK_INIT:
575 ;
576 }
577
578 return 0;
579 }
580
581 /* Called from thread context of the master */
582 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
583 struct userdata *u = PA_SINK(o)->userdata;
584
585 switch (code) {
586
587 case PA_SINK_MESSAGE_SET_STATE:
588
589 if ((pa_sink_state_t) PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) {
590 /* Only useful when running in NULL mode, i.e. when no
591 * master sink is attached */
592 pa_rtclock_get(&u->timestamp);
593 }
594
595 break;
596
597 case PA_SINK_MESSAGE_GET_LATENCY: {
598 struct timespec now;
599
600 /* This code will only be called when running in NULL
601 * mode, i.e. when no master sink is attached. See
602 * sink_get_latency_cb() below */
603 pa_rtclock_get(&now);
604
605 if (pa_timespec_cmp(&u->timestamp, &now) > 0)
606 *((pa_usec_t*) data) = 0;
607 else
608 *((pa_usec_t*) data) = pa_timespec_diff(&u->timestamp, &now);
609 break;
610 }
611
612 case PA_SINK_MESSAGE_DETACH:
613
614 /* We're detaching all our input streams artificially, so
615 * that we can drive our sink from a different sink */
616
617 u->thread_info.master = NULL;
618 break;
619
620 case PA_SINK_MESSAGE_ATTACH:
621
622 /* We're attached all our input streams artificially again */
623
624 u->thread_info.master = data;
625 break;
626
627 case SINK_MESSAGE_ADD_OUTPUT:
628 PA_LLIST_PREPEND(struct output, u->thread_info.outputs, (struct output*) data);
629 break;
630
631 case SINK_MESSAGE_REMOVE_OUTPUT:
632 PA_LLIST_REMOVE(struct output, u->thread_info.outputs, (struct output*) data);
633 break;
634
635 case SINK_MESSAGE_NEED:
636 render_memblock(u, data, (size_t) offset);
637 break;
638 }
639
640 return pa_sink_process_msg(o, code, data, offset, chunk);
641 }
642
643 /* Called from main context */
644 static pa_usec_t sink_get_latency_cb(pa_sink *s) {
645 struct userdata *u;
646
647 pa_sink_assert_ref(s);
648 u = s->userdata;
649 pa_assert(u);
650
651 if (u->master) {
652 /* If we have a master sink, we just return the latency of it
653 * and add our own buffering on top */
654
655 if (!u->master->sink_input)
656 return 0;
657
658 return
659 pa_sink_input_get_latency(u->master->sink_input) +
660 pa_sink_get_latency(u->master->sink_input->sink);
661
662 } else {
663 pa_usec_t usec;
664
665 /* We have no master, hence let's ask our own thread which
666 * implements the NULL sink */
667
668 if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
669 return 0;
670
671 return usec;
672 }
673 }
674
675 static void update_description(struct userdata *u) {
676 int first = 1;
677 char *t;
678 struct output *o;
679 uint32_t idx;
680
681 pa_assert(u);
682
683 if (pa_idxset_isempty(u->outputs)) {
684 pa_sink_set_description(u->sink, "Simultaneous output");
685 return;
686 }
687
688 t = pa_xstrdup("Simultaneous output to");
689
690 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
691 char *e;
692
693 if (first) {
694 e = pa_sprintf_malloc("%s %s", t, o->sink->description);
695 first = 0;
696 } else
697 e = pa_sprintf_malloc("%s, %s", t, o->sink->description);
698
699 pa_xfree(t);
700 t = e;
701 }
702
703 pa_sink_set_description(u->sink, t);
704 pa_xfree(t);
705 }
706
707 static int update_master(struct userdata *u, struct output *o) {
708 pa_assert(u);
709
710 /* Make sure everything is detached from the old thread before we move our stuff to a new thread */
711 if (u->sink && PA_SINK_LINKED(pa_sink_get_state(u->sink)))
712 pa_sink_detach(u->sink);
713
714 if (o) {
715 /* If we have a master sink we run our own sink in its thread */
716
717 pa_assert(o->sink_input);
718 pa_assert(PA_SINK_OPENED(pa_sink_get_state(o->sink)));
719
720 if (u->thread) {
721 /* If we previously were in NULL mode, let's kill the thread */
722 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
723 pa_thread_free(u->thread);
724 u->thread = NULL;
725
726 pa_assert(u->rtpoll);
727 pa_rtpoll_free(u->rtpoll);
728 u->rtpoll = NULL;
729 }
730
731 pa_sink_set_asyncmsgq(u->sink, o->sink->asyncmsgq);
732 pa_sink_set_rtpoll(u->sink, o->sink->rtpoll);
733 u->master = o;
734
735 pa_log_info("Master sink is now '%s'", o->sink_input->sink->name);
736
737 } else {
738
739 /* We have no master sink, let's create our own thread */
740
741 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
742 u->master = NULL;
743
744 if (!u->thread) {
745 pa_assert(!u->rtpoll);
746
747 u->rtpoll = pa_rtpoll_new();
748 pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
749
750 pa_sink_set_rtpoll(u->sink, u->rtpoll);
751
752 if (!(u->thread = pa_thread_new(thread_func, u))) {
753 pa_log("Failed to create thread.");
754 return -1;
755 }
756 }
757
758 pa_log_info("No suitable master sink found, going to NULL mode\n");
759 }
760
761 /* Now attach everything again */
762 if (u->sink && PA_SINK_LINKED(pa_sink_get_state(u->sink)))
763 pa_sink_attach(u->sink);
764
765 return 0;
766 }
767
768 static int pick_master(struct userdata *u) {
769 struct output *o;
770 uint32_t idx;
771 pa_assert(u);
772
773 if (u->master && u->master->sink_input && PA_SINK_OPENED(pa_sink_get_state(u->master->sink)))
774 return update_master(u, u->master);
775
776 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
777 if (o->sink_input && PA_SINK_OPENED(pa_sink_get_state(o->sink)))
778 return update_master(u, o);
779
780 return update_master(u, NULL);
781 }
782
783 static int output_create_sink_input(struct userdata *u, struct output *o) {
784 pa_sink_input_new_data data;
785 char *t;
786
787 pa_assert(u);
788 pa_assert(!o->sink_input);
789
790 t = pa_sprintf_malloc("Simultaneous output on %s", o->sink->description);
791
792 pa_sink_input_new_data_init(&data);
793 data.sink = o->sink;
794 data.driver = __FILE__;
795 data.name = t;
796 pa_sink_input_new_data_set_sample_spec(&data, &u->sink->sample_spec);
797 pa_sink_input_new_data_set_channel_map(&data, &u->sink->channel_map);
798 data.module = u->module;
799 data.resample_method = u->resample_method;
800
801 o->sink_input = pa_sink_input_new(u->core, &data, PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE);
802
803 pa_xfree(t);
804
805 if (!o->sink_input)
806 return -1;
807
808 o->sink_input->parent.process_msg = sink_input_process_msg;
809 o->sink_input->peek = sink_input_peek_cb;
810 o->sink_input->drop = sink_input_drop_cb;
811 o->sink_input->attach = sink_input_attach_cb;
812 o->sink_input->detach = sink_input_detach_cb;
813 o->sink_input->kill = sink_input_kill_cb;
814 o->sink_input->userdata = o;
815
816 return 0;
817 }
818
819 static struct output *output_new(struct userdata *u, pa_sink *sink) {
820 struct output *o;
821
822 pa_assert(u);
823 pa_assert(sink);
824 pa_assert(u->sink);
825
826 o = pa_xnew(struct output, 1);
827 o->userdata = u;
828 o->inq = pa_asyncmsgq_new(0);
829 o->outq = pa_asyncmsgq_new(0);
830 o->inq_rtpoll_item = NULL;
831 o->outq_rtpoll_item = NULL;
832 o->sink = sink;
833 o->sink_input = NULL;
834 o->memblockq = pa_memblockq_new(
835 0,
836 MEMBLOCKQ_MAXLENGTH,
837 MEMBLOCKQ_MAXLENGTH,
838 pa_frame_size(&u->sink->sample_spec),
839 1,
840 0,
841 NULL);
842
843
844 pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
845
846 update_description(u);
847
848 if (u->sink && PA_SINK_LINKED(pa_sink_get_state(u->sink)))
849 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
850 else
851 PA_LLIST_PREPEND(struct output, u->thread_info.outputs, o);
852
853 if (PA_SINK_OPENED(pa_sink_get_state(u->sink)) || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
854 pa_sink_suspend(sink, 0);
855
856 if (PA_SINK_OPENED(pa_sink_get_state(sink)))
857 if (output_create_sink_input(u, o) < 0)
858 goto fail;
859 }
860
861 return o;
862
863 fail:
864
865 if (o) {
866 if (o->sink_input) {
867 pa_sink_input_unlink(o->sink_input);
868 pa_sink_input_unref(o->sink_input);
869 }
870
871 if (o->memblockq)
872 pa_memblockq_free(o->memblockq);
873
874 if (o->inq)
875 pa_asyncmsgq_unref(o->inq);
876
877 if (o->outq)
878 pa_asyncmsgq_unref(o->outq);
879
880 pa_xfree(o);
881 }
882
883 return NULL;
884 }
885
886 static pa_hook_result_t sink_new_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
887 struct output *o;
888
889 pa_core_assert_ref(c);
890 pa_sink_assert_ref(s);
891 pa_assert(u);
892 pa_assert(u->automatic);
893
894 if (!(s->flags & PA_SINK_HARDWARE) || s == u->sink)
895 return PA_HOOK_OK;
896
897 pa_log_info("Configuring new sink: %s", s->name);
898
899 if (!(o = output_new(u, s))) {
900 pa_log("Failed to create sink input on sink '%s'.", s->name);
901 return PA_HOOK_OK;
902 }
903
904 if (pick_master(u) < 0)
905 pa_module_unload_request(u->module);
906
907 if (o->sink_input)
908 pa_sink_input_put(o->sink_input);
909
910 return PA_HOOK_OK;
911 }
912
913 static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
914 struct output *o;
915 uint32_t idx;
916
917 pa_assert(c);
918 pa_sink_assert_ref(s);
919 pa_assert(u);
920
921 if (s == u->sink)
922 return PA_HOOK_OK;
923
924 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
925 if (o->sink == s)
926 break;
927
928 if (!o)
929 return PA_HOOK_OK;
930
931 pa_log_info("Unconfiguring sink: %s", s->name);
932
933 output_free(o);
934
935 if (pick_master(u) < 0)
936 pa_module_unload_request(u->module);
937
938 return PA_HOOK_OK;
939 }
940
941 static pa_hook_result_t sink_state_changed_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
942 struct output *o;
943 uint32_t idx;
944 pa_sink_state_t state;
945
946 if (s == u->sink)
947 return PA_HOOK_OK;
948
949 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
950 if (o->sink == s)
951 break;
952
953 if (!o)
954 return PA_HOOK_OK;
955
956 state = pa_sink_get_state(s);
957
958 if (PA_SINK_OPENED(state) && PA_SINK_OPENED(pa_sink_get_state(u->sink)) && !o->sink_input) {
959 output_create_sink_input(u, o);
960
961 if (pick_master(u) < 0)
962 pa_module_unload_request(u->module);
963
964 if (o->sink_input)
965 pa_sink_input_put(o->sink_input);
966 }
967
968 if (state == PA_SINK_SUSPENDED && o->sink_input) {
969 pa_sink_input_unlink(o->sink_input);
970 pa_sink_input_unref(o->sink_input);
971 o->sink_input = NULL;
972
973 pa_memblockq_flush(o->memblockq);
974
975 if (pick_master(u) < 0)
976 pa_module_unload_request(u->module);
977 }
978
979 return PA_HOOK_OK;
980 }
981
982 int pa__init(pa_module*m) {
983 struct userdata *u;
984 pa_modargs *ma = NULL;
985 const char *master_name, *slaves, *rm;
986 pa_sink *master_sink = NULL;
987 int resample_method = PA_RESAMPLER_TRIVIAL;
988 pa_sample_spec ss;
989 pa_channel_map map;
990 struct output *o;
991 uint32_t idx;
992
993 pa_assert(m);
994
995 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
996 pa_log("failed to parse module arguments");
997 goto fail;
998 }
999
1000 if ((rm = pa_modargs_get_value(ma, "resample_method", NULL))) {
1001 if ((resample_method = pa_parse_resample_method(rm)) < 0) {
1002 pa_log("invalid resample method '%s'", rm);
1003 goto fail;
1004 }
1005 }
1006
1007 u = pa_xnew(struct userdata, 1);
1008 u->core = m->core;
1009 u->module = m;
1010 m->userdata = u;
1011 u->sink = NULL;
1012 u->thread_info.master = u->master = NULL;
1013 u->time_event = NULL;
1014 u->adjust_time = DEFAULT_ADJUST_TIME;
1015 pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
1016 u->rtpoll = NULL;
1017 u->thread = NULL;
1018 PA_LLIST_HEAD_INIT(struct output, u->thread_info.outputs);
1019 u->resample_method = resample_method;
1020 u->outputs = pa_idxset_new(NULL, NULL);
1021 pa_timespec_reset(&u->adjust_timestamp);
1022
1023 if (pa_modargs_get_value_u32(ma, "adjust_time", &u->adjust_time) < 0) {
1024 pa_log("Failed to parse adjust_time value");
1025 goto fail;
1026 }
1027
1028 master_name = pa_modargs_get_value(ma, "master", NULL);
1029 slaves = pa_modargs_get_value(ma, "slaves", NULL);
1030 if (!master_name != !slaves) {
1031 pa_log("No master or slave sinks specified");
1032 goto fail;
1033 }
1034
1035 if (master_name) {
1036 if (!(master_sink = pa_namereg_get(m->core, master_name, PA_NAMEREG_SINK, 1))) {
1037 pa_log("Invalid master sink '%s'", master_name);
1038 goto fail;
1039 }
1040
1041 ss = master_sink->sample_spec;
1042 u->automatic = 0;
1043 } else {
1044 master_sink = NULL;
1045 ss = m->core->default_sample_spec;
1046 u->automatic = 1;
1047 }
1048
1049 if ((pa_modargs_get_sample_spec(ma, &ss) < 0)) {
1050 pa_log("Invalid sample specification.");
1051 goto fail;
1052 }
1053
1054 if (master_sink && ss.channels == master_sink->sample_spec.channels)
1055 map = master_sink->channel_map;
1056 else
1057 pa_channel_map_init_auto(&map, ss.channels, PA_CHANNEL_MAP_DEFAULT);
1058
1059 if ((pa_modargs_get_channel_map(ma, NULL, &map) < 0)) {
1060 pa_log("Invalid channel map.");
1061 goto fail;
1062 }
1063
1064 if (ss.channels != map.channels) {
1065 pa_log("Channel map and sample specification don't match.");
1066 goto fail;
1067 }
1068
1069 if (!(u->sink = pa_sink_new(m->core, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
1070 pa_log("Failed to create sink");
1071 goto fail;
1072 }
1073
1074 u->sink->parent.process_msg = sink_process_msg;
1075 u->sink->get_latency = sink_get_latency_cb;
1076 u->sink->set_state = sink_set_state;
1077 u->sink->userdata = u;
1078
1079 u->sink->flags = PA_SINK_CAN_SUSPEND|PA_SINK_LATENCY;
1080 pa_sink_set_module(u->sink, m);
1081 pa_sink_set_description(u->sink, "Simultaneous output");
1082
1083 u->block_size = pa_bytes_per_second(&ss) / 20; /* 50 ms */
1084 if (u->block_size <= 0)
1085 u->block_size = pa_frame_size(&ss);
1086
1087 if (!u->automatic) {
1088 const char*split_state;
1089 char *n = NULL;
1090 pa_assert(slaves);
1091
1092 /* The master and slaves have been specified manually */
1093
1094 if (!(u->master = output_new(u, master_sink))) {
1095 pa_log("Failed to create master sink input on sink '%s'.", master_sink->name);
1096 goto fail;
1097 }
1098
1099 split_state = NULL;
1100 while ((n = pa_split(slaves, ",", &split_state))) {
1101 pa_sink *slave_sink;
1102
1103 if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK, 1)) || slave_sink == u->sink) {
1104 pa_log("Invalid slave sink '%s'", n);
1105 pa_xfree(n);
1106 goto fail;
1107 }
1108
1109 pa_xfree(n);
1110
1111 if (!output_new(u, slave_sink)) {
1112 pa_log("Failed to create slave sink input on sink '%s'.", slave_sink->name);
1113 goto fail;
1114 }
1115 }
1116
1117 if (pa_idxset_size(u->outputs) <= 1)
1118 pa_log_warn("No slave sinks specified.");
1119
1120 u->sink_new_slot = NULL;
1121
1122 } else {
1123 pa_sink *s;
1124
1125 /* We're in automatic mode, we elect one hw sink to the master
1126 * and attach all other hw sinks as slaves to it */
1127
1128 for (s = pa_idxset_first(m->core->sinks, &idx); s; s = pa_idxset_next(m->core->sinks, &idx)) {
1129
1130 if (!(s->flags & PA_SINK_HARDWARE) || s == u->sink)
1131 continue;
1132
1133 if (!output_new(u, s)) {
1134 pa_log("Failed to create sink input on sink '%s'.", s->name);
1135 goto fail;
1136 }
1137 }
1138
1139 u->sink_new_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_NEW_POST], (pa_hook_cb_t) sink_new_hook_cb, u);
1140 }
1141
1142 u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], (pa_hook_cb_t) sink_unlink_hook_cb, u);
1143 u->sink_state_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], (pa_hook_cb_t) sink_state_changed_hook_cb, u);
1144
1145 if (pick_master(u) < 0)
1146 goto fail;
1147
1148 /* Activate the sink and the sink inputs */
1149 pa_sink_put(u->sink);
1150
1151 for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
1152 if (o->sink_input)
1153 pa_sink_input_put(o->sink_input);
1154
1155 if (u->adjust_time > 0) {
1156 struct timeval tv;
1157 pa_gettimeofday(&tv);
1158 tv.tv_sec += u->adjust_time;
1159 u->time_event = m->core->mainloop->time_new(m->core->mainloop, &tv, time_callback, u);
1160 }
1161
1162 pa_modargs_free(ma);
1163
1164 return 0;
1165
1166 fail:
1167
1168 if (ma)
1169 pa_modargs_free(ma);
1170
1171 pa__done(m);
1172
1173 return -1;
1174 }
1175
1176 static void output_free(struct output *o) {
1177 pa_assert(o);
1178
1179 if (o->userdata) {
1180 if (o->userdata->sink && PA_SINK_LINKED(pa_sink_get_state(o->userdata->sink)))
1181 pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
1182 else
1183 PA_LLIST_REMOVE(struct output, o->userdata->thread_info.outputs, o);
1184 }
1185
1186 pa_assert_se(pa_idxset_remove_by_data(o->userdata->outputs, o, NULL));
1187
1188 if (o->userdata->master == o) {
1189 /* Make sure the master points to a different output */
1190 o->userdata->master = NULL;
1191 pick_master(o->userdata);
1192 }
1193
1194 update_description(o->userdata);
1195
1196 if (o->sink_input) {
1197 pa_sink_input_unlink(o->sink_input);
1198 pa_sink_input_unref(o->sink_input);
1199 }
1200
1201 if (o->inq_rtpoll_item)
1202 pa_rtpoll_item_free(o->inq_rtpoll_item);
1203
1204 if (o->outq_rtpoll_item)
1205 pa_rtpoll_item_free(o->outq_rtpoll_item);
1206
1207 if (o->inq)
1208 pa_asyncmsgq_unref(o->inq);
1209
1210 if (o->outq)
1211 pa_asyncmsgq_unref(o->outq);
1212
1213 if (o->memblockq)
1214 pa_memblockq_free(o->memblockq);
1215
1216 pa_xfree(o);
1217 }
1218
1219 void pa__done(pa_module*m) {
1220 struct userdata *u;
1221 struct output *o;
1222
1223 pa_assert(m);
1224
1225 if (!(u = m->userdata))
1226 return;
1227
1228 if (u->sink_new_slot)
1229 pa_hook_slot_free(u->sink_new_slot);
1230
1231 if (u->sink_unlink_slot)
1232 pa_hook_slot_free(u->sink_unlink_slot);
1233
1234 if (u->sink_state_changed_slot)
1235 pa_hook_slot_free(u->sink_state_changed_slot);
1236
1237 if (u->sink)
1238 pa_sink_unlink(u->sink);
1239
1240 if (u->outputs) {
1241 while ((o = pa_idxset_first(u->outputs, NULL)))
1242 output_free(o);
1243
1244 pa_idxset_free(u->outputs, NULL, NULL);
1245 }
1246
1247 if (u->thread) {
1248 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1249 pa_thread_free(u->thread);
1250 }
1251
1252 pa_thread_mq_done(&u->thread_mq);
1253
1254 if (u->sink)
1255 pa_sink_unref(u->sink);
1256
1257 if (u->rtpoll)
1258 pa_rtpoll_free(u->rtpoll);
1259
1260 if (u->time_event)
1261 u->core->mainloop->time_free(u->time_event);
1262
1263 pa_xfree(u);
1264 }
1265
1266