]> code.delx.au - pulseaudio/blob - src/modules/echo-cancel/module-echo-cancel.c
echo-cancel: Clarify function call contexts.
[pulseaudio] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6 Based on module-virtual-sink.c
7 module-virtual-source.c
8 module-loopback.c
9
10 Copyright 2010 Intel Corporation
11 Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13 PulseAudio is free software; you can redistribute it and/or modify
14 it under the terms of the GNU Lesser General Public License as published
15 by the Free Software Foundation; either version 2.1 of the License,
16 or (at your option) any later version.
17
18 PulseAudio is distributed in the hope that it will be useful, but
19 WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 General Public License for more details.
22
23 You should have received a copy of the GNU Lesser General Public License
24 along with PulseAudio; if not, write to the Free Software
25 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
58 PA_MODULE_AUTHOR("Wim Taymans");
59 PA_MODULE_DESCRIPTION("Echo Cancellation");
60 PA_MODULE_VERSION(PACKAGE_VERSION);
61 PA_MODULE_LOAD_ONCE(FALSE);
62 PA_MODULE_USAGE(
63 _("source_name=<name for the source> "
64 "source_properties=<properties for the source> "
65 "source_master=<name of source to filter> "
66 "sink_name=<name for the sink> "
67 "sink_properties=<properties for the sink> "
68 "sink_master=<name of sink to filter> "
69 "adjust_time=<how often to readjust rates in s> "
70 "adjust_threshold=<how much drift to readjust after in ms> "
71 "format=<sample format> "
72 "rate=<sample rate> "
73 "channels=<number of channels> "
74 "channel_map=<channel map> "
75 "aec_method=<implementation to use> "
76 "aec_args=<parameters for the AEC engine> "
77 "save_aec=<save AEC data in /tmp> "
78 "autoloaded=<set if this module is being loaded automatically> "
79 "use_volume_sharing=<yes or no> "
80 ));
81
/* NOTE: Make sure the enum and ec_table are maintained in the correct order */
typedef enum {
    PA_ECHO_CANCELLER_INVALID = -1,
#ifdef HAVE_SPEEX
    PA_ECHO_CANCELLER_SPEEX,    /* speex DSP echo canceller */
#endif
#ifdef HAVE_ADRIAN_EC
    PA_ECHO_CANCELLER_ADRIAN,   /* Adrian Andre's NLMS implementation */
#endif
#ifdef HAVE_WEBRTC
    PA_ECHO_CANCELLER_WEBRTC,   /* WebRTC audio processing engine */
#endif
} pa_echo_canceller_method_t;
95
96 #ifdef HAVE_WEBRTC
97 #define DEFAULT_ECHO_CANCELLER "webrtc"
98 #else
99 #define DEFAULT_ECHO_CANCELLER "speex"
100 #endif
101
/* Table of available canceller implementations, indexed by
 * pa_echo_canceller_method_t — it MUST stay in the same order as that enum.
 * Entries that leave .play/.record/.set_drift unset only support the plain
 * run() interface (no drift compensation). */
static const pa_echo_canceller ec_table[] = {
#ifdef HAVE_SPEEX
    {
        /* Speex */
        .init                   = pa_speex_ec_init,
        .run                    = pa_speex_ec_run,
        .done                   = pa_speex_ec_done,
    },
#endif
#ifdef HAVE_ADRIAN_EC
    {
        /* Adrian Andre's NLMS implementation */
        .init                   = pa_adrian_ec_init,
        .run                    = pa_adrian_ec_run,
        .done                   = pa_adrian_ec_done,
    },
#endif
#ifdef HAVE_WEBRTC
    {
        /* WebRTC's audio processing engine */
        .init                   = pa_webrtc_ec_init,
        .play                   = pa_webrtc_ec_play,
        .record                 = pa_webrtc_ec_record,
        .set_drift              = pa_webrtc_ec_set_drift,
        .run                    = pa_webrtc_ec_run,
        .done                   = pa_webrtc_ec_done,
    },
#endif
};
131
132 #define DEFAULT_RATE 32000
133 #define DEFAULT_CHANNELS 1
134 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
135 #define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
136 #define DEFAULT_SAVE_AEC FALSE
137 #define DEFAULT_AUTOLOADED FALSE
138
139 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
140
141 /* Can only be used in main context */
142 #define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
143 (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
144
145 /* This module creates a new (virtual) source and sink.
146 *
147 * The data sent to the new sink is kept in a memblockq before being
148 * forwarded to the real sink_master.
149 *
150 * Data read from source_master is matched against the saved sink data and
151 * echo canceled data is then pushed onto the new source.
152 *
153 * Both source and sink masters have their own threads to push/pull data
154 * respectively. We however perform all our actions in the source IO thread.
155 * To do this we send all played samples to the source IO thread where they
156 * are then pushed into the memblockq.
157 *
158 * Alignment is performed in two steps:
159 *
160 * 1) when something happens that requires quick adjustment of the alignment of
161 * capture and playback samples, we perform a resync. This adjusts the
162 * position in the playback memblock to the requested sample. Quick
163 * adjustments include moving the playback samples before the capture
164 * samples (because else the echo canceler does not work) or when the
165 * playback pointer drifts too far away.
166 *
167 * 2) periodically check the difference between capture and playback. We use a
168 * low and high watermark for adjusting the alignment. Playback should always
169 * be before capture and the difference should not be bigger than one frame
170 * size. We would ideally like to resample the sink_input but most driver
171 * don't give enough accuracy to be able to do that right now.
172 */
173
174 struct userdata;
175
/* Small msgobject wrapper used to route messages (e.g. volume updates)
 * back to this module instance's userdata. */
struct pa_echo_canceller_msg {
    pa_msgobject parent;
    struct userdata *userdata;
};

PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
#define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
183
/* Combined latency snapshot. The sink-side fields are filled in by the sink
 * I/O thread (SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT) and the source-side
 * fields by the source I/O thread (source_output_snapshot_within_thread);
 * the result is consumed by calc_diff(). */
struct snapshot {
    pa_usec_t sink_now;         /* sink-side timestamp of the snapshot */
    pa_usec_t sink_latency;
    size_t sink_delay;
    int64_t send_counter;

    pa_usec_t source_now;       /* source-side timestamp of the snapshot */
    pa_usec_t source_latency;
    size_t source_delay;
    int64_t recv_counter;
    size_t rlen;                /* capture queue length, bytes */
    size_t plen;                /* playback queue length, bytes */
};
197
/* Per-instance module state. Most fields are set up in the main thread at
 * load time; see the per-field comments for which thread touches what. */
struct userdata {
    pa_core *core;
    pa_module *module;

    pa_bool_t autoloaded;           /* loaded automatically rather than by the user */
    pa_bool_t dead;
    pa_bool_t save_aec;             /* dump AEC debug data to files */

    pa_echo_canceller *ec;          /* selected canceller implementation */
    uint32_t blocksize;             /* fixed processing chunk size, in bytes */

    pa_bool_t need_realign;

    /* to wakeup the source I/O thread */
    pa_asyncmsgq *asyncmsgq;
    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;

    pa_source *source;
    pa_bool_t source_auto_desc;
    pa_source_output *source_output;
    pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
    size_t source_skip;             /* capture bytes still to drop for realignment */

    pa_sink *sink;
    pa_bool_t sink_auto_desc;
    pa_sink_input *sink_input;
    pa_memblockq *sink_memblockq;
    int64_t send_counter;           /* updated in sink IO thread */
    int64_t recv_counter;
    size_t sink_skip;               /* playback bytes still to drop for realignment */

    /* Bytes left over from previous iteration */
    size_t sink_rem;
    size_t source_rem;

    pa_atomic_t request_resync;     /* flag: force a resync on the next push */

    pa_time_event *time_event;      /* periodic drift-adjustment timer */
    pa_usec_t adjust_time;          /* timer interval */
    int adjust_threshold;           /* drift above which we hard-resync (compared against a usec diff) */

    /* debug dump files; only used when save_aec is set */
    FILE *captured_file;
    FILE *played_file;
    FILE *canceled_file;
    FILE *drift_file;

    pa_bool_t use_volume_sharing;

    struct {
        pa_cvolume current_volume;  /* source I/O thread's view of the volume */
    } thread_info;
};
250
251 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
252
/* Module arguments accepted by this module; see PA_MODULE_USAGE above for
 * their meanings. NULL-terminated for pa_modargs_new(). */
static const char* const valid_modargs[] = {
    "source_name",
    "source_properties",
    "source_master",
    "sink_name",
    "sink_properties",
    "sink_master",
    "adjust_time",
    "adjust_threshold",
    "format",
    "rate",
    "channels",
    "channel_map",
    "aec_method",
    "aec_args",
    "save_aec",
    "autoloaded",
    "use_volume_sharing",
    NULL
};
273
/* Custom messages for our source output, numbered after the built-in ones. */
enum {
    SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
    SOURCE_OUTPUT_MESSAGE_REWIND,
    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
    SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
};

/* Custom messages for our sink input. */
enum {
    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
};

/* Messages handled by the pa_echo_canceller_msg object. */
enum {
    ECHO_CANCELLER_MESSAGE_SET_VOLUME,
};
288
289 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
290 int64_t buffer, diff_time, buffer_latency;
291
292 /* get the number of samples between capture and playback */
293 if (snapshot->plen > snapshot->rlen)
294 buffer = snapshot->plen - snapshot->rlen;
295 else
296 buffer = 0;
297
298 buffer += snapshot->source_delay + snapshot->sink_delay;
299
300 /* add the amount of samples not yet transferred to the source context */
301 if (snapshot->recv_counter <= snapshot->send_counter)
302 buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
303 else
304 buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
305
306 /* convert to time */
307 buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
308
309 /* capture and playback samples are perfectly aligned when diff_time is 0 */
310 diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
311 (snapshot->source_now - snapshot->source_latency);
312
313 pa_log_debug("Diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
314 (long long) snapshot->sink_latency,
315 (long long) buffer_latency, (long long) snapshot->source_latency,
316 (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
317 (long long) (snapshot->send_counter - snapshot->recv_counter),
318 (long long) (snapshot->sink_now - snapshot->source_now));
319
320 return diff_time;
321 }
322
/* Called from main context.
 *
 * Periodic drift-adjustment timer: take latency snapshots from both I/O
 * threads, compute the capture/playback drift, request a hard resync when
 * it is too large, and re-arm the timer. */
static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
    struct userdata *u = userdata;
    uint32_t old_rate, base_rate, new_rate;
    int64_t diff_time;
    /*size_t fs*/
    struct snapshot latency_snapshot;

    pa_assert(u);
    pa_assert(a);
    pa_assert(u->time_event == e);
    pa_assert_ctl_context();

    /* nothing to adjust unless both source and sink are running */
    if (!IS_ACTIVE(u))
        return;

    /* update our snapshots: each synchronous send fills in its own half of
     * the struct (source fields from the source I/O thread, sink fields
     * from the sink I/O thread) */
    pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
    pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);

    /* calculate drift between capture and playback */
    diff_time = calc_diff(u, &latency_snapshot);

    /*fs = pa_frame_size(&u->source_output->sample_spec);*/
    old_rate = u->sink_input->sample_spec.rate;
    base_rate = u->source_output->sample_spec.rate;

    if (diff_time < 0) {
        /* recording before playback, we need to adjust quickly. The echo
         * canceler does not work in this case. */
        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
            NULL, diff_time, NULL, NULL);
        /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
        new_rate = base_rate;
    }
    else {
        if (diff_time > u->adjust_threshold) {
            /* diff too big, quickly adjust */
            pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
                NULL, diff_time, NULL, NULL);
        }

        /* recording behind playback, we need to slowly adjust the rate to match */
        /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/

        /* assume equal samplerates for now */
        new_rate = base_rate;
    }

    /* make sure we don't make too big adjustments because that sounds horrible */
    if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
        new_rate = base_rate;

    if (new_rate != old_rate) {
        pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);

        pa_sink_input_set_rate(u->sink_input, new_rate);
    }

    /* re-arm for the next periodic adjustment */
    pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
}
384
385 /* Called from source I/O thread context */
386 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
387 struct userdata *u = PA_SOURCE(o)->userdata;
388
389 switch (code) {
390
391 case PA_SOURCE_MESSAGE_GET_LATENCY:
392
393 /* The source is _put() before the source output is, so let's
394 * make sure we don't access it in that time. Also, the
395 * source output is first shut down, the source second. */
396 if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
397 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
398 *((pa_usec_t*) data) = 0;
399 return 0;
400 }
401
402 *((pa_usec_t*) data) =
403
404 /* Get the latency of the master source */
405 pa_source_get_latency_within_thread(u->source_output->source) +
406 /* Add the latency internal to our source output on top */
407 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
408 /* and the buffering we do on the source */
409 pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);
410
411 return 0;
412
413 case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
414 u->thread_info.current_volume = u->source->reference_volume;
415 break;
416 }
417
418 return pa_source_process_msg(o, code, data, offset, chunk);
419 }
420
421 /* Called from sink I/O thread context */
422 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
423 struct userdata *u = PA_SINK(o)->userdata;
424
425 switch (code) {
426
427 case PA_SINK_MESSAGE_GET_LATENCY:
428
429 /* The sink is _put() before the sink input is, so let's
430 * make sure we don't access it in that time. Also, the
431 * sink input is first shut down, the sink second. */
432 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
433 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
434 *((pa_usec_t*) data) = 0;
435 return 0;
436 }
437
438 *((pa_usec_t*) data) =
439
440 /* Get the latency of the master sink */
441 pa_sink_get_latency_within_thread(u->sink_input->sink) +
442
443 /* Add the latency internal to our sink input on top */
444 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
445
446 return 0;
447 }
448
449 return pa_sink_process_msg(o, code, data, offset, chunk);
450 }
451
452
453 /* Called from main context */
454 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
455 struct userdata *u;
456
457 pa_source_assert_ref(s);
458 pa_assert_se(u = s->userdata);
459
460 if (!PA_SOURCE_IS_LINKED(state) ||
461 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
462 return 0;
463
464 if (state == PA_SOURCE_RUNNING) {
465 /* restart timer when both sink and source are active */
466 if (IS_ACTIVE(u) && u->adjust_time)
467 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
468
469 pa_atomic_store(&u->request_resync, 1);
470 pa_source_output_cork(u->source_output, FALSE);
471 } else if (state == PA_SOURCE_SUSPENDED) {
472 pa_source_output_cork(u->source_output, TRUE);
473 }
474
475 return 0;
476 }
477
478 /* Called from main context */
479 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
480 struct userdata *u;
481
482 pa_sink_assert_ref(s);
483 pa_assert_se(u = s->userdata);
484
485 if (!PA_SINK_IS_LINKED(state) ||
486 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
487 return 0;
488
489 if (state == PA_SINK_RUNNING) {
490 /* restart timer when both sink and source are active */
491 if (IS_ACTIVE(u) && u->adjust_time)
492 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
493
494 pa_atomic_store(&u->request_resync, 1);
495 pa_sink_input_cork(u->sink_input, FALSE);
496 } else if (state == PA_SINK_SUSPENDED) {
497 pa_sink_input_cork(u->sink_input, TRUE);
498 }
499
500 return 0;
501 }
502
503 /* Called from source I/O thread context */
504 static void source_update_requested_latency_cb(pa_source *s) {
505 struct userdata *u;
506
507 pa_source_assert_ref(s);
508 pa_assert_se(u = s->userdata);
509
510 if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
511 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
512 return;
513
514 pa_log_debug("Source update requested latency");
515
516 /* Just hand this one over to the master source */
517 pa_source_output_set_requested_latency_within_thread(
518 u->source_output,
519 pa_source_get_requested_latency_within_thread(s));
520 }
521
522 /* Called from sink I/O thread context */
523 static void sink_update_requested_latency_cb(pa_sink *s) {
524 struct userdata *u;
525
526 pa_sink_assert_ref(s);
527 pa_assert_se(u = s->userdata);
528
529 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
530 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
531 return;
532
533 pa_log_debug("Sink update requested latency");
534
535 /* Just hand this one over to the master sink */
536 pa_sink_input_set_requested_latency_within_thread(
537 u->sink_input,
538 pa_sink_get_requested_latency_within_thread(s));
539 }
540
541 /* Called from sink I/O thread context */
542 static void sink_request_rewind_cb(pa_sink *s) {
543 struct userdata *u;
544
545 pa_sink_assert_ref(s);
546 pa_assert_se(u = s->userdata);
547
548 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
549 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
550 return;
551
552 pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
553
554 /* Just hand this one over to the master sink */
555 pa_sink_input_request_rewind(u->sink_input,
556 s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
557 }
558
559 /* Called from main context */
560 static void source_set_volume_cb(pa_source *s) {
561 struct userdata *u;
562
563 pa_source_assert_ref(s);
564 pa_assert_se(u = s->userdata);
565
566 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
567 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
568 return;
569
570 pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
571 }
572
573 /* Called from main context */
574 static void sink_set_volume_cb(pa_sink *s) {
575 struct userdata *u;
576
577 pa_sink_assert_ref(s);
578 pa_assert_se(u = s->userdata);
579
580 if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
581 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
582 return;
583
584 pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
585 }
586
587 /* Called from main context. */
588 static void source_get_volume_cb(pa_source *s) {
589 struct userdata *u;
590 pa_cvolume v;
591
592 pa_source_assert_ref(s);
593 pa_assert_se(u = s->userdata);
594
595 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
596 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
597 return;
598
599 pa_source_output_get_volume(u->source_output, &v, TRUE);
600
601 if (pa_cvolume_equal(&s->real_volume, &v))
602 /* no change */
603 return;
604
605 s->real_volume = v;
606 pa_source_set_soft_volume(s, NULL);
607 }
608
609 /* Called from main context */
610 static void source_set_mute_cb(pa_source *s) {
611 struct userdata *u;
612
613 pa_source_assert_ref(s);
614 pa_assert_se(u = s->userdata);
615
616 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
617 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
618 return;
619
620 pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
621 }
622
623 /* Called from main context */
624 static void sink_set_mute_cb(pa_sink *s) {
625 struct userdata *u;
626
627 pa_sink_assert_ref(s);
628 pa_assert_se(u = s->userdata);
629
630 if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
631 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
632 return;
633
634 pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
635 }
636
637 /* Called from main context */
638 static void source_get_mute_cb(pa_source *s) {
639 struct userdata *u;
640
641 pa_source_assert_ref(s);
642 pa_assert_se(u = s->userdata);
643
644 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
645 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
646 return;
647
648 pa_source_output_get_mute(u->source_output);
649 }
650
651 /* Called from source I/O thread context. */
652 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
653 int64_t diff;
654
655 if (diff_time < 0) {
656 diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
657
658 if (diff > 0) {
659 /* add some extra safety samples to compensate for jitter in the
660 * timings */
661 diff += 10 * pa_frame_size (&u->source_output->sample_spec);
662
663 pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
664
665 u->sink_skip = diff;
666 u->source_skip = 0;
667 }
668 } else if (diff_time > 0) {
669 diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
670
671 if (diff > 0) {
672 pa_log("Playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
673
674 u->source_skip = diff;
675 u->sink_skip = 0;
676 }
677 }
678 }
679
680 /* Called from source I/O thread context. */
681 static void do_resync(struct userdata *u) {
682 int64_t diff_time;
683 struct snapshot latency_snapshot;
684
685 pa_log("Doing resync");
686
687 /* update our snapshot */
688 source_output_snapshot_within_thread(u, &latency_snapshot);
689 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
690
691 /* calculate drift between capture and playback */
692 diff_time = calc_diff(u, &latency_snapshot);
693
694 /* and adjust for the drift */
695 apply_diff_time(u, diff_time);
696 }
697
/* 1. Calculate drift at this point, pass to canceller
 * 2. Push out playback samples in blocksize chunks
 * 3. Push out capture samples in blocksize chunks
 * 4. ???
 * 5. Profit
 *
 * Called from source I/O thread context.
 */
static void do_push_drift_comp(struct userdata *u) {
    size_t rlen, plen;
    pa_memchunk rchunk, pchunk, cchunk;
    uint8_t *rdata, *pdata, *cdata;
    float drift;
    int unused PA_GCC_UNUSED;

    rlen = pa_memblockq_get_length(u->source_memblockq);
    plen = pa_memblockq_get_length(u->sink_memblockq);

    /* Estimate snapshot drift as follows:
     *   pd: amount of playback data consumed since last time
     *   rd: amount of record (capture) data consumed since last time
     *
     *   drift = (pd - rd) / rd;
     *
     * We calculate pd and rd as the memblockq length less the number of
     * samples left from the last iteration (to avoid double counting
     * those remainder samples.
     */
    drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
    /* remember the sub-blocksize remainders for the next iteration */
    u->sink_rem = plen % u->blocksize;
    u->source_rem = rlen % u->blocksize;

    /* Now let the canceller work its drift compensation magic */
    u->ec->set_drift(u->ec, drift);

    if (u->save_aec) {
        if (u->drift_file)
            fprintf(u->drift_file, "d %a\n", drift);
    }

    /* Send in the playback samples first */
    while (plen >= u->blocksize) {
        pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
        pdata = pa_memblock_acquire(pchunk.memblock);
        pdata += pchunk.index;

        u->ec->play(u->ec, pdata);

        if (u->save_aec) {
            if (u->drift_file)
                fprintf(u->drift_file, "p %d\n", u->blocksize);
            if (u->played_file)
                unused = fwrite(pdata, 1, u->blocksize, u->played_file);
        }

        /* release the mapping before dropping/unreffing the block */
        pa_memblock_release(pchunk.memblock);
        pa_memblockq_drop(u->sink_memblockq, u->blocksize);
        pa_memblock_unref(pchunk.memblock);

        plen -= u->blocksize;
    }

    /* And now the capture samples */
    while (rlen >= u->blocksize) {
        pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);

        rdata = pa_memblock_acquire(rchunk.memblock);
        rdata += rchunk.index;

        /* fresh block to receive the echo-canceled output */
        cchunk.index = 0;
        cchunk.length = u->blocksize;
        cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
        cdata = pa_memblock_acquire(cchunk.memblock);

        u->ec->record(u->ec, rdata, cdata);

        if (u->save_aec) {
            if (u->drift_file)
                fprintf(u->drift_file, "c %d\n", u->blocksize);
            if (u->captured_file)
                unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
            if (u->canceled_file)
                unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
        }

        pa_memblock_release(cchunk.memblock);
        pa_memblock_release(rchunk.memblock);

        pa_memblock_unref(rchunk.memblock);

        /* hand the canceled chunk to our virtual source */
        pa_source_post(u->source, &cchunk);
        pa_memblock_unref(cchunk.memblock);

        pa_memblockq_drop(u->source_memblockq, u->blocksize);
        rlen -= u->blocksize;
    }
}
795
/* This one's simpler than the drift compensation case -- we just iterate over
 * the capture buffer, and pass the canceller blocksize bytes of playback and
 * capture data.
 *
 * Called from source I/O thread context. */
static void do_push(struct userdata *u) {
    size_t rlen, plen;
    pa_memchunk rchunk, pchunk, cchunk;
    uint8_t *rdata, *pdata, *cdata;
    int unused PA_GCC_UNUSED;

    rlen = pa_memblockq_get_length(u->source_memblockq);
    plen = pa_memblockq_get_length(u->sink_memblockq);

    while (rlen >= u->blocksize) {
        /* take fixed block from recorded samples */
        pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);

        /* if we have no matching playback block, the raw capture chunk is
         * posted unprocessed below */
        if (plen >= u->blocksize) {
            /* take fixed block from played samples */
            pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);

            rdata = pa_memblock_acquire(rchunk.memblock);
            rdata += rchunk.index;
            pdata = pa_memblock_acquire(pchunk.memblock);
            pdata += pchunk.index;

            /* fresh block to receive the echo-canceled output */
            cchunk.index = 0;
            cchunk.length = u->blocksize;
            cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
            cdata = pa_memblock_acquire(cchunk.memblock);

            if (u->save_aec) {
                if (u->captured_file)
                    unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
                if (u->played_file)
                    unused = fwrite(pdata, 1, u->blocksize, u->played_file);
            }

            /* perform echo cancellation */
            u->ec->run(u->ec, rdata, pdata, cdata);

            if (u->save_aec) {
                if (u->canceled_file)
                    unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
            }

            pa_memblock_release(cchunk.memblock);
            pa_memblock_release(pchunk.memblock);
            pa_memblock_release(rchunk.memblock);

            /* drop consumed sink samples */
            pa_memblockq_drop(u->sink_memblockq, u->blocksize);
            pa_memblock_unref(pchunk.memblock);

            pa_memblock_unref(rchunk.memblock);
            /* the filtered samples now become the samples from our
             * source */
            rchunk = cchunk;

            plen -= u->blocksize;
        }

        /* forward the (echo-canceled) data to the virtual source */
        pa_source_post(u->source, &rchunk);
        pa_memblock_unref(rchunk.memblock);

        pa_memblockq_drop(u->source_memblockq, u->blocksize);
        rlen -= u->blocksize;
    }
}
867
/* Called from source I/O thread context.
 *
 * Main capture entry point: queue the new capture chunk, apply any pending
 * resync/skip bookkeeping, then run the canceller via do_push*(). */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
    struct userdata *u;
    size_t rlen, plen, to_skip;
    pa_memchunk rchunk;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
        pa_log("Push when no link?");
        return;
    }

    /* if either side is not running, bypass cancellation entirely and pass
     * the capture data through untouched */
    if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
                    u->sink->thread_info.state != PA_SINK_RUNNING)) {
        pa_source_post(u->source, chunk);
        return;
    }

    /* handle queued messages, do any message sending of our own */
    while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
        ;

    pa_memblockq_push_align(u->source_memblockq, chunk);

    rlen = pa_memblockq_get_length(u->source_memblockq);
    plen = pa_memblockq_get_length(u->sink_memblockq);

    /* Let's not do anything else till we have enough data to process */
    if (rlen < u->blocksize)
        return;

    /* See if we need to drop samples in order to sync */
    if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
        do_resync(u);
    }

    /* Okay, skip cancellation for skipped source samples if needed. */
    if (PA_UNLIKELY(u->source_skip)) {
        /* The slightly tricky bit here is that we drop all but modulo
         * blocksize bytes and then adjust for that last bit on the sink side.
         * We do this because the source data is coming at a fixed rate, which
         * means the only way to try to catch up is drop sink samples and let
         * the canceller cope up with this. */
        to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
        to_skip -= to_skip % u->blocksize;

        if (to_skip) {
            /* post the skipped capture data unprocessed so no audio is lost */
            pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
            pa_source_post(u->source, &rchunk);

            pa_memblock_unref(rchunk.memblock);
            pa_memblockq_drop(u->source_memblockq, to_skip);

            rlen -= to_skip;
            u->source_skip -= to_skip;
        }

        /* move the sub-blocksize remainder of the skip over to the sink side */
        if (rlen && u->source_skip % u->blocksize) {
            u->sink_skip += u->blocksize - (u->source_skip % u->blocksize);
            u->source_skip -= (u->source_skip % u->blocksize);
        }
    }

    /* And for the sink, these samples have been played back already, so we can
     * just drop them and get on with it. */
    if (PA_UNLIKELY(u->sink_skip)) {
        to_skip = plen >= u->sink_skip ? u->sink_skip : plen;

        pa_memblockq_drop(u->sink_memblockq, to_skip);

        plen -= to_skip;
        u->sink_skip -= to_skip;
    }

    /* process and push out samples */
    if (u->ec->params.drift_compensation)
        do_push_drift_comp(u);
    else
        do_push(u);
}
951
/* Called from sink I/O thread context.
 *
 * Render playback data for the master sink and mirror a copy of it to the
 * source I/O thread so it can be queued as echo reference data. */
static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert(chunk);
    pa_assert_se(u = i->userdata);

    /* handle any pending rewind before rendering */
    if (u->sink->thread_info.rewind_requested)
        pa_sink_process_rewind(u->sink, 0);

    pa_sink_render_full(u->sink, nbytes, chunk);

    /* after an underrun the alignment is unknown, so force a resync */
    if (i->thread_info.underrun_for > 0) {
        pa_log_debug("Handling end of underrun.");
        pa_atomic_store(&u->request_resync, 1);
    }

    /* let source thread handle the chunk. pass the sample count as well so that
     * the source IO thread can update the right variables. */
    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
        NULL, 0, chunk, NULL);
    u->send_counter += chunk->length;

    return 0;
}
978
/* Called from source I/O thread context.
 *
 * Rewind our virtual source and roll both memblockqs back so capture and
 * playback data stay aligned after the rewind. */
static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_source_process_rewind(u->source, nbytes);

    /* go back on read side, we need to use older sink data for this */
    pa_memblockq_rewind(u->sink_memblockq, nbytes);

    /* manipulate write index */
    pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);

    pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
        (long long) pa_memblockq_get_length (u->source_memblockq));
}
998
/* Called from sink I/O thread context.
 *
 * Propagates a rewind on the master sink to our virtual sink and tells the
 * source I/O thread to drop the corresponding amount of queued playback
 * reference data. */
static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink process rewind %lld", (long long) nbytes);

    pa_sink_process_rewind(u->sink, nbytes);

    /* send_counter tracks the net number of bytes posted to the source
     * thread, so undo the rewound amount here as well. */
    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
    u->send_counter -= nbytes;
}
1013
/* Called from source I/O thread context.
 *
 * Fills in the source-side half of a latency snapshot: current time, source
 * latency, resampler-adjusted delay, byte counters and queue lengths. */
static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
    size_t delay, rlen, plen;
    pa_usec_t now, latency;

    now = pa_rtclock_now();
    latency = pa_source_get_latency_within_thread(u->source_output->source);
    delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);

    /* If a resampler is in use, convert the queued length back into
     * pre-resampling bytes so the figure matches the master's sample spec. */
    delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
    rlen = pa_memblockq_get_length(u->source_memblockq);
    plen = pa_memblockq_get_length(u->sink_memblockq);

    snapshot->source_now = now;
    snapshot->source_latency = latency;
    snapshot->source_delay = delay;
    snapshot->recv_counter = u->recv_counter;
    /* NOTE(review): rlen (source queue) is summed with sink_skip while plen
     * (sink queue) is summed with source_skip -- the pairing looks crossed.
     * Confirm against the skip accounting in the streaming path before
     * changing anything here. */
    snapshot->rlen = rlen + u->sink_skip;
    snapshot->plen = plen + u->source_skip;
}
1034
/* Called from source I/O thread context.
 *
 * Dispatches messages delivered to the source output, mostly posted by the
 * sink I/O thread through u->asyncmsgq. */
static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;

    switch (code) {

        case SOURCE_OUTPUT_MESSAGE_POST:

            pa_source_output_assert_io_context(u->source_output);

            /* Queue the played-back chunk as canceller reference data while
             * the source is running; otherwise drop whatever is buffered. */
            if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
                pa_memblockq_push_align(u->sink_memblockq, chunk);
            else
                pa_memblockq_flush_write(u->sink_memblockq, TRUE);

            u->recv_counter += (int64_t) chunk->length;

            return 0;

        case SOURCE_OUTPUT_MESSAGE_REWIND:
            pa_source_output_assert_io_context(u->source_output);

            /* manipulate write index, never go past what we have */
            if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
                pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
            else
                pa_memblockq_flush_write(u->sink_memblockq, TRUE);

            pa_log_debug("Sink rewind (%lld)", (long long) offset);

            u->recv_counter -= offset;

            return 0;

        case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
            struct snapshot *snapshot = (struct snapshot *) data;

            source_output_snapshot_within_thread(u, snapshot);
            return 0;
        }

        case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
            /* offset carries the drift computed in main context; apply it
             * here, in I/O context, where the queues are owned. */
            apply_diff_time(u, offset);
            return 0;

    }

    /* Anything else goes to the default source output handler. */
    return pa_source_output_process_msg(obj, code, data, offset, chunk);
}
1084
/* Called from sink I/O thread context.
 *
 * Handles the sink-side latency snapshot request; everything else is passed
 * to the default sink input message handler. */
static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK_INPUT(obj)->userdata;

    switch (code) {

        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
            size_t delay;
            pa_usec_t now, latency;
            struct snapshot *snapshot = (struct snapshot *) data;

            pa_sink_input_assert_io_context(u->sink_input);

            now = pa_rtclock_now();
            latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
            delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);

            /* Convert the queued length back to pre-resampling bytes so it
             * is comparable with our own sample spec. */
            delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);

            snapshot->sink_now = now;
            snapshot->sink_latency = latency;
            snapshot->sink_delay = delay;
            snapshot->send_counter = u->send_counter;
            return 0;
        }
    }

    return pa_sink_input_process_msg(obj, code, data, offset, chunk);
}
1114
1115 /* Called from sink I/O thread context. */
1116 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1117 struct userdata *u;
1118
1119 pa_sink_input_assert_ref(i);
1120 pa_assert_se(u = i->userdata);
1121
1122 pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1123
1124 pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1125 pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1126 }
1127
/* Called from source I/O thread context.
 *
 * Forwards the master source's new max-rewind limit to our virtual source. */
static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_se(u = o->userdata);

    pa_log_debug("Source output update max rewind %lld", (long long) nbytes);

    pa_source_set_max_rewind_within_thread(u->source, nbytes);
}
1139
/* Called from sink I/O thread context.
 *
 * Forwards the master sink's new max-request size to our virtual sink. */
static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink input update max request %lld", (long long) nbytes);

    pa_sink_set_max_request_within_thread(u->sink, nbytes);
}
1151
1152 /* Called from sink I/O thread context. */
1153 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1154 struct userdata *u;
1155 pa_usec_t latency;
1156
1157 pa_sink_input_assert_ref(i);
1158 pa_assert_se(u = i->userdata);
1159
1160 latency = pa_sink_get_requested_latency_within_thread(i->sink);
1161
1162 pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1163 }
1164
1165 /* Called from source I/O thread context. */
1166 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1167 struct userdata *u;
1168 pa_usec_t latency;
1169
1170 pa_source_output_assert_ref(o);
1171 pa_assert_se(u = o->userdata);
1172
1173 latency = pa_source_get_requested_latency_within_thread(o->source);
1174
1175 pa_log_debug("Source output update requested latency %lld", (long long) latency);
1176 }
1177
1178 /* Called from sink I/O thread context. */
1179 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1180 struct userdata *u;
1181
1182 pa_sink_input_assert_ref(i);
1183 pa_assert_se(u = i->userdata);
1184
1185 pa_log_debug("Sink input update latency range %lld %lld",
1186 (long long) i->sink->thread_info.min_latency,
1187 (long long) i->sink->thread_info.max_latency);
1188
1189 pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1190 }
1191
1192 /* Called from source I/O thread context. */
1193 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1194 struct userdata *u;
1195
1196 pa_source_output_assert_ref(o);
1197 pa_assert_se(u = o->userdata);
1198
1199 pa_log_debug("Source output update latency range %lld %lld",
1200 (long long) o->source->thread_info.min_latency,
1201 (long long) o->source->thread_info.max_latency);
1202
1203 pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1204 }
1205
1206 /* Called from sink I/O thread context. */
1207 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1208 struct userdata *u;
1209
1210 pa_sink_input_assert_ref(i);
1211 pa_assert_se(u = i->userdata);
1212
1213 pa_log_debug("Sink input update fixed latency %lld",
1214 (long long) i->sink->thread_info.fixed_latency);
1215
1216 pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1217 }
1218
1219 /* Called from source I/O thread context. */
1220 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1221 struct userdata *u;
1222
1223 pa_source_output_assert_ref(o);
1224 pa_assert_se(u = o->userdata);
1225
1226 pa_log_debug("Source output update fixed latency %lld",
1227 (long long) o->source->thread_info.fixed_latency);
1228
1229 pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1230 }
1231
/* Called from source I/O thread context.
 *
 * Attaches our virtual source to the master source's I/O loop. The rtpoll,
 * latency and rewind configuration must be in place before the source is
 * attached. */
static void source_output_attach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
    pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
    pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
    pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));

    pa_log_debug("Source output %d attach", o->index);

    pa_source_attach_within_thread(u->source);

    /* Poll the inter-thread queue from this I/O loop so we receive the
     * playback chunks posted by the sink I/O thread. */
    u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
            o->source->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);
}
1254
/* Called from sink I/O thread context.
 *
 * Attaches our virtual sink to the master sink's I/O loop, after mirroring
 * the master's rtpoll, latency and request/rewind configuration. */
static void sink_input_attach_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
    pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);

    /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
     * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);

    /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
     * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
     * HERE. SEE (6) */
    pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
    pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));

    pa_log_debug("Sink input %d attach", i->index);

    /* Create the write end of the inter-thread queue before the sink is
     * attached and can start rendering. */
    u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
            i->sink->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);

    pa_sink_attach_within_thread(u->sink);
}
1284
1285
/* Called from source I/O thread context.
 *
 * Detaches our virtual source from the master's I/O loop and stops polling
 * the inter-thread queue; the rtpoll item is re-created on attach. */
static void source_output_detach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_source_detach_within_thread(u->source);
    pa_source_set_rtpoll(u->source, NULL);

    pa_log_debug("Source output %d detach", o->index);

    if (u->rtpoll_item_read) {
        pa_rtpoll_item_free(u->rtpoll_item_read);
        u->rtpoll_item_read = NULL;
    }
}
1304
/* Called from sink I/O thread context.
 *
 * Detaches our virtual sink from the master's I/O loop and releases the
 * write end of the inter-thread queue; it is re-created on attach. */
static void sink_input_detach_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_detach_within_thread(u->sink);

    pa_sink_set_rtpoll(u->sink, NULL);

    pa_log_debug("Sink input %d detach", i->index);

    if (u->rtpoll_item_write) {
        pa_rtpoll_item_free(u->rtpoll_item_write);
        u->rtpoll_item_write = NULL;
    }
}
1323
/* Called from source I/O thread context.
 *
 * No state needs updating on our side; the transition is only logged. */
static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_log_debug("Source output %d state %d", o->index, state);
}
1334
/* Called from sink I/O thread context.
 *
 * Logs the state transition and, on the very first link, requests a rewind
 * so our output becomes audible immediately. */
static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink input %d state %d", i->index, state);

    /* If we are added for the first time, ask for a rewinding so that
     * we are heard right-away. */
    if (PA_SINK_INPUT_IS_LINKED(state) &&
        i->thread_info.state == PA_SINK_INPUT_INIT) {
        pa_log_debug("Requesting rewind due to state change.");
        pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
    }
}
1352
/* Called from main context.
 *
 * Tears down the capture half of the module and requests module unload.
 * Marks the module dead first so move callbacks refuse further moves. */
static void source_output_kill_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    u->dead = TRUE;

    /* The order here matters! We first kill the source output, followed
     * by the source. That means the source callbacks must be protected
     * against an unconnected source output! */
    pa_source_output_unlink(u->source_output);
    pa_source_unlink(u->source);

    pa_source_output_unref(u->source_output);
    u->source_output = NULL;

    pa_source_unref(u->source);
    u->source = NULL;

    pa_log_debug("Source output kill %d", o->index);

    pa_module_unload_request(u->module, TRUE);
}
1379
/* Called from main context.
 *
 * Tears down the playback half of the module and requests module unload.
 * Marks the module dead first so move callbacks refuse further moves. */
static void sink_input_kill_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    u->dead = TRUE;

    /* The order here matters! We first kill the sink input, followed
     * by the sink. That means the sink callbacks must be protected
     * against an unconnected sink input! */
    pa_sink_input_unlink(u->sink_input);
    pa_sink_unlink(u->sink);

    pa_sink_input_unref(u->sink_input);
    u->sink_input = NULL;

    pa_sink_unref(u->sink);
    u->sink = NULL;

    pa_log_debug("Sink input kill %d", i->index);

    pa_module_unload_request(u->module, TRUE);
}
1405
1406 /* Called from main context. */
1407 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1408 struct userdata *u;
1409
1410 pa_source_output_assert_ref(o);
1411 pa_assert_ctl_context();
1412 pa_assert_se(u = o->userdata);
1413
1414 if (u->dead || u->autoloaded)
1415 return FALSE;
1416
1417 return (u->source != dest) && (u->sink != dest->monitor_of);
1418 }
1419
1420 /* Called from main context */
1421 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1422 struct userdata *u;
1423
1424 pa_sink_input_assert_ref(i);
1425 pa_assert_se(u = i->userdata);
1426
1427 if (u->dead || u->autoloaded)
1428 return FALSE;
1429
1430 return u->sink != dest;
1431 }
1432
1433 /* Called from main context. */
1434 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1435 struct userdata *u;
1436
1437 pa_source_output_assert_ref(o);
1438 pa_assert_ctl_context();
1439 pa_assert_se(u = o->userdata);
1440
1441 if (dest) {
1442 pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1443 pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1444 } else
1445 pa_source_set_asyncmsgq(u->source, NULL);
1446
1447 if (u->source_auto_desc && dest) {
1448 const char *y, *z;
1449 pa_proplist *pl;
1450
1451 pl = pa_proplist_new();
1452 y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1453 z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1454 pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1455 y ? y : u->sink_input->sink->name);
1456
1457 pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1458 pa_proplist_free(pl);
1459 }
1460 }
1461
1462 /* Called from main context */
1463 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1464 struct userdata *u;
1465
1466 pa_sink_input_assert_ref(i);
1467 pa_assert_se(u = i->userdata);
1468
1469 if (dest) {
1470 pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1471 pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1472 } else
1473 pa_sink_set_asyncmsgq(u->sink, NULL);
1474
1475 if (u->sink_auto_desc && dest) {
1476 const char *y, *z;
1477 pa_proplist *pl;
1478
1479 pl = pa_proplist_new();
1480 y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1481 z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1482 pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1483 y ? y : u->source_output->source->name);
1484
1485 pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1486 pa_proplist_free(pl);
1487 }
1488 }
1489
/* Called from main context.
 *
 * Propagates a volume change on the sink input to our virtual sink. */
static void sink_input_volume_changed_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_volume_changed(u->sink, &i->volume);
}
1499
/* Called from main context.
 *
 * Propagates a mute change on the sink input to our virtual sink. */
static void sink_input_mute_changed_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_mute_changed(u->sink, i->muted);
}
1509
/* Called from main context.
 *
 * Handles messages posted by the canceller from the source I/O thread
 * (see pa_echo_canceller_set_capture_volume() below). */
static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
    struct pa_echo_canceller_msg *msg;
    struct userdata *u;

    pa_assert(o);

    msg = PA_ECHO_CANCELLER_MSG(o);
    u = msg->userdata;

    switch (code) {
        case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
            /* userdata carries a heap-allocated pa_cvolume; it is freed by
             * the pa_xfree free_cb given to pa_asyncmsgq_post(). */
            pa_cvolume *v = (pa_cvolume *) userdata;

            /* With shared volume the source carries the volume directly;
             * otherwise it lives on the source output stream. */
            if (u->use_volume_sharing)
                pa_source_set_volume(u->source, v, TRUE, FALSE);
            else
                pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);

            break;
        }

        default:
            pa_assert_not_reached();
            break;
    }

    return 0;
}
1539
/* Called by the canceller, so source I/O thread context.
 *
 * Copies the capture volume last cached in thread_info into *v. */
void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
    *v = ec->msg->userdata->thread_info.current_volume;
}
1544
1545 /* Called by the canceller, so source I/O thread context. */
1546 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1547 if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1548 pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1549
1550 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1551 pa_xfree);
1552 }
1553 }
1554
1555 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1556 #ifdef HAVE_SPEEX
1557 if (pa_streq(method, "speex"))
1558 return PA_ECHO_CANCELLER_SPEEX;
1559 #endif
1560 #ifdef HAVE_ADRIAN_EC
1561 if (pa_streq(method, "adrian"))
1562 return PA_ECHO_CANCELLER_ADRIAN;
1563 #endif
1564 #ifdef HAVE_WEBRTC
1565 if (pa_streq(method, "webrtc"))
1566 return PA_ECHO_CANCELLER_WEBRTC;
1567 #endif
1568 return PA_ECHO_CANCELLER_INVALID;
1569 }
1570
1571 /* Common initialisation bits between module-echo-cancel and the standalone
1572 * test program.
1573 *
1574 * Called from main context. */
1575 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1576 pa_echo_canceller_method_t ec_method;
1577
1578 if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1579 pa_log("Invalid sample format specification or channel map");
1580 goto fail;
1581 }
1582
1583 u->ec = pa_xnew0(pa_echo_canceller, 1);
1584 if (!u->ec) {
1585 pa_log("Failed to alloc echo canceller");
1586 goto fail;
1587 }
1588
1589 if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1590 pa_log("Invalid echo canceller implementation");
1591 goto fail;
1592 }
1593
1594 u->ec->init = ec_table[ec_method].init;
1595 u->ec->play = ec_table[ec_method].play;
1596 u->ec->record = ec_table[ec_method].record;
1597 u->ec->set_drift = ec_table[ec_method].set_drift;
1598 u->ec->run = ec_table[ec_method].run;
1599 u->ec->done = ec_table[ec_method].done;
1600
1601 return 0;
1602
1603 fail:
1604 return -1;
1605 }
1606
1607 /* Called from main context. */
1608 int pa__init(pa_module*m) {
1609 struct userdata *u;
1610 pa_sample_spec source_ss, sink_ss;
1611 pa_channel_map source_map, sink_map;
1612 pa_modargs *ma;
1613 pa_source *source_master=NULL;
1614 pa_sink *sink_master=NULL;
1615 pa_source_output_new_data source_output_data;
1616 pa_sink_input_new_data sink_input_data;
1617 pa_source_new_data source_data;
1618 pa_sink_new_data sink_data;
1619 pa_memchunk silence;
1620 uint32_t temp;
1621
1622 pa_assert(m);
1623
1624 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1625 pa_log("Failed to parse module arguments.");
1626 goto fail;
1627 }
1628
1629 if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1630 pa_log("Master source not found");
1631 goto fail;
1632 }
1633 pa_assert(source_master);
1634
1635 if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1636 pa_log("Master sink not found");
1637 goto fail;
1638 }
1639 pa_assert(sink_master);
1640
1641 if (source_master->monitor_of == sink_master) {
1642 pa_log("Can't cancel echo between a sink and its monitor");
1643 goto fail;
1644 }
1645
1646 source_ss = source_master->sample_spec;
1647 source_ss.rate = DEFAULT_RATE;
1648 source_ss.channels = DEFAULT_CHANNELS;
1649 pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1650
1651 sink_ss = sink_master->sample_spec;
1652 sink_map = sink_master->channel_map;
1653
1654 u = pa_xnew0(struct userdata, 1);
1655 if (!u) {
1656 pa_log("Failed to alloc userdata");
1657 goto fail;
1658 }
1659 u->core = m->core;
1660 u->module = m;
1661 m->userdata = u;
1662 u->dead = FALSE;
1663
1664 u->use_volume_sharing = TRUE;
1665 if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1666 pa_log("use_volume_sharing= expects a boolean argument");
1667 goto fail;
1668 }
1669
1670 temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1671 if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1672 pa_log("Failed to parse adjust_time value");
1673 goto fail;
1674 }
1675
1676 if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1677 u->adjust_time = temp * PA_USEC_PER_SEC;
1678 else
1679 u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1680
1681 temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1682 if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1683 pa_log("Failed to parse adjust_threshold value");
1684 goto fail;
1685 }
1686
1687 if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1688 u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1689 else
1690 u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1691
1692 u->save_aec = DEFAULT_SAVE_AEC;
1693 if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1694 pa_log("Failed to parse save_aec value");
1695 goto fail;
1696 }
1697
1698 u->autoloaded = DEFAULT_AUTOLOADED;
1699 if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1700 pa_log("Failed to parse autoloaded value");
1701 goto fail;
1702 }
1703
1704 if (init_common(ma, u, &source_ss, &source_map) < 0)
1705 goto fail;
1706
1707 u->asyncmsgq = pa_asyncmsgq_new(0);
1708 u->need_realign = TRUE;
1709
1710 if (u->ec->init) {
1711 if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1712 pa_log("Failed to init AEC engine");
1713 goto fail;
1714 }
1715 }
1716
1717 if (u->ec->params.drift_compensation)
1718 pa_assert(u->ec->set_drift);
1719
1720 /* Create source */
1721 pa_source_new_data_init(&source_data);
1722 source_data.driver = __FILE__;
1723 source_data.module = m;
1724 if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1725 source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1726 pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1727 pa_source_new_data_set_channel_map(&source_data, &source_map);
1728 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1729 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1730 if (!u->autoloaded)
1731 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1732
1733 if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1734 pa_log("Invalid properties");
1735 pa_source_new_data_done(&source_data);
1736 goto fail;
1737 }
1738
1739 if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1740 const char *y, *z;
1741
1742 y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1743 z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1744 pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1745 z ? z : source_master->name, y ? y : sink_master->name);
1746 }
1747
1748 u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1749 | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1750 pa_source_new_data_done(&source_data);
1751
1752 if (!u->source) {
1753 pa_log("Failed to create source.");
1754 goto fail;
1755 }
1756
1757 u->source->parent.process_msg = source_process_msg_cb;
1758 u->source->set_state = source_set_state_cb;
1759 u->source->update_requested_latency = source_update_requested_latency_cb;
1760 pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1761 pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1762 if (!u->use_volume_sharing) {
1763 pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1764 pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1765 pa_source_enable_decibel_volume(u->source, TRUE);
1766 }
1767 u->source->userdata = u;
1768
1769 pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1770
1771 /* Create sink */
1772 pa_sink_new_data_init(&sink_data);
1773 sink_data.driver = __FILE__;
1774 sink_data.module = m;
1775 if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1776 sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1777 pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1778 pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1779 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1780 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1781 if (!u->autoloaded)
1782 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1783
1784 if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1785 pa_log("Invalid properties");
1786 pa_sink_new_data_done(&sink_data);
1787 goto fail;
1788 }
1789
1790 if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1791 const char *y, *z;
1792
1793 y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1794 z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1795 pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1796 z ? z : sink_master->name, y ? y : source_master->name);
1797 }
1798
1799 u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1800 | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1801 pa_sink_new_data_done(&sink_data);
1802
1803 if (!u->sink) {
1804 pa_log("Failed to create sink.");
1805 goto fail;
1806 }
1807
1808 u->sink->parent.process_msg = sink_process_msg_cb;
1809 u->sink->set_state = sink_set_state_cb;
1810 u->sink->update_requested_latency = sink_update_requested_latency_cb;
1811 u->sink->request_rewind = sink_request_rewind_cb;
1812 pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1813 if (!u->use_volume_sharing) {
1814 pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1815 pa_sink_enable_decibel_volume(u->sink, TRUE);
1816 }
1817 u->sink->userdata = u;
1818
1819 pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1820
1821 /* Create source output */
1822 pa_source_output_new_data_init(&source_output_data);
1823 source_output_data.driver = __FILE__;
1824 source_output_data.module = m;
1825 pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1826 source_output_data.destination_source = u->source;
1827 /* FIXME
1828 source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1829
1830 pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1831 pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1832 pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1833 pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1834
1835 pa_source_output_new(&u->source_output, m->core, &source_output_data);
1836 pa_source_output_new_data_done(&source_output_data);
1837
1838 if (!u->source_output)
1839 goto fail;
1840
1841 u->source_output->parent.process_msg = source_output_process_msg_cb;
1842 u->source_output->push = source_output_push_cb;
1843 u->source_output->process_rewind = source_output_process_rewind_cb;
1844 u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1845 u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1846 u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1847 u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1848 u->source_output->kill = source_output_kill_cb;
1849 u->source_output->attach = source_output_attach_cb;
1850 u->source_output->detach = source_output_detach_cb;
1851 u->source_output->state_change = source_output_state_change_cb;
1852 u->source_output->may_move_to = source_output_may_move_to_cb;
1853 u->source_output->moving = source_output_moving_cb;
1854 u->source_output->userdata = u;
1855
1856 u->source->output_from_master = u->source_output;
1857
1858 /* Create sink input */
1859 pa_sink_input_new_data_init(&sink_input_data);
1860 sink_input_data.driver = __FILE__;
1861 sink_input_data.module = m;
1862 pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1863 sink_input_data.origin_sink = u->sink;
1864 pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1865 pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1866 pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1867 pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1868 sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1869
1870 pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1871 pa_sink_input_new_data_done(&sink_input_data);
1872
1873 if (!u->sink_input)
1874 goto fail;
1875
1876 u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1877 u->sink_input->pop = sink_input_pop_cb;
1878 u->sink_input->process_rewind = sink_input_process_rewind_cb;
1879 u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1880 u->sink_input->update_max_request = sink_input_update_max_request_cb;
1881 u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1882 u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1883 u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1884 u->sink_input->kill = sink_input_kill_cb;
1885 u->sink_input->attach = sink_input_attach_cb;
1886 u->sink_input->detach = sink_input_detach_cb;
1887 u->sink_input->state_change = sink_input_state_change_cb;
1888 u->sink_input->may_move_to = sink_input_may_move_to_cb;
1889 u->sink_input->moving = sink_input_moving_cb;
1890 if (!u->use_volume_sharing)
1891 u->sink_input->volume_changed = sink_input_volume_changed_cb;
1892 u->sink_input->mute_changed = sink_input_mute_changed_cb;
1893 u->sink_input->userdata = u;
1894
1895 u->sink->input_to_master = u->sink_input;
1896
1897 pa_sink_input_get_silence(u->sink_input, &silence);
1898
1899 u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1900 &source_ss, 1, 1, 0, &silence);
1901 u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1902 &sink_ss, 1, 1, 0, &silence);
1903
1904 pa_memblock_unref(silence.memblock);
1905
1906 if (!u->source_memblockq || !u->sink_memblockq) {
1907 pa_log("Failed to create memblockq.");
1908 goto fail;
1909 }
1910
1911 if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1912 u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1913 else if (u->ec->params.drift_compensation) {
1914 pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1915 u->adjust_time = 0;
1916 /* Perform resync just once to give the canceller a leg up */
1917 pa_atomic_store(&u->request_resync, 1);
1918 }
1919
1920 if (u->save_aec) {
1921 pa_log("Creating AEC files in /tmp");
1922 u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1923 if (u->captured_file == NULL)
1924 perror ("fopen failed");
1925 u->played_file = fopen("/tmp/aec_play.sw", "wb");
1926 if (u->played_file == NULL)
1927 perror ("fopen failed");
1928 u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1929 if (u->canceled_file == NULL)
1930 perror ("fopen failed");
1931 if (u->ec->params.drift_compensation) {
1932 u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1933 if (u->drift_file == NULL)
1934 perror ("fopen failed");
1935 }
1936 }
1937
1938 u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1939 u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1940 u->ec->msg->userdata = u;
1941
1942 u->thread_info.current_volume = u->source->reference_volume;
1943
1944 pa_sink_put(u->sink);
1945 pa_source_put(u->source);
1946
1947 pa_sink_input_put(u->sink_input);
1948 pa_source_output_put(u->source_output);
1949 pa_modargs_free(ma);
1950
1951 return 0;
1952
1953 fail:
1954 if (ma)
1955 pa_modargs_free(ma);
1956
1957 pa__done(m);
1958
1959 return -1;
1960 }
1961
1962 /* Called from main context. */
1963 int pa__get_n_used(pa_module *m) {
1964 struct userdata *u;
1965
1966 pa_assert(m);
1967 pa_assert_se(u = m->userdata);
1968
1969 return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1970 }
1971
/* Called from main context. */
void pa__done(pa_module*m) {
    struct userdata *u;

    pa_assert(m);

    /* pa__done() is also reached from pa__init()'s fail path, so the
     * userdata (and any member of it) may only be partially set up --
     * hence all the NULL checks below. */
    if (!(u = m->userdata))
        return;

    /* Mark the module as dying so that callbacks fired during teardown
     * (e.g. moving/kill) know not to do any further work. */
    u->dead = TRUE;

    /* See comments in source_output_kill_cb() above regarding
     * destruction order! */

    if (u->time_event)
        u->core->mainloop->time_free(u->time_event);

    /* First unlink the streams from their masters... */
    if (u->source_output)
        pa_source_output_unlink(u->source_output);
    if (u->sink_input)
        pa_sink_input_unlink(u->sink_input);

    /* ...then unlink our virtual source and sink... */
    if (u->source)
        pa_source_unlink(u->source);
    if (u->sink)
        pa_sink_unlink(u->sink);

    /* ...and only then drop our references. The objects are freed once the
     * last reference is gone. */
    if (u->source_output)
        pa_source_output_unref(u->source_output);
    if (u->sink_input)
        pa_sink_input_unref(u->sink_input);

    if (u->source)
        pa_source_unref(u->source);
    if (u->sink)
        pa_sink_unref(u->sink);

    if (u->source_memblockq)
        pa_memblockq_free(u->source_memblockq);
    if (u->sink_memblockq)
        pa_memblockq_free(u->sink_memblockq);

    /* Give the canceller implementation a chance to release its own state
     * before we free the wrapper struct. */
    if (u->ec) {
        if (u->ec->done)
            u->ec->done(u->ec);

        pa_xfree(u->ec);
    }

    if (u->asyncmsgq)
        pa_asyncmsgq_unref(u->asyncmsgq);

    /* Close the AEC debug dump files; these are only ever opened when
     * save_aec is set (drift_file additionally requires drift
     * compensation), see pa__init(). */
    if (u->save_aec) {
        if (u->played_file)
            fclose(u->played_file);
        if (u->captured_file)
            fclose(u->captured_file);
        if (u->canceled_file)
            fclose(u->canceled_file);
        if (u->drift_file)
            fclose(u->drift_file);
    }

    pa_xfree(u);
}
2037
2038 #ifdef ECHO_CANCEL_TEST
2039 /*
2040 * Stand-alone test program for running in the canceller on pre-recorded files.
2041 */
2042 int main(int argc, char* argv[]) {
2043 struct userdata u;
2044 pa_sample_spec source_ss, sink_ss;
2045 pa_channel_map source_map, sink_map;
2046 pa_modargs *ma = NULL;
2047 uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2048 int unused PA_GCC_UNUSED;
2049 int ret = 0, i;
2050 char c;
2051 float drift;
2052
2053 pa_memzero(&u, sizeof(u));
2054
2055 if (argc < 4 || argc > 7) {
2056 goto usage;
2057 }
2058
2059 u.captured_file = fopen(argv[2], "rb");
2060 if (u.captured_file == NULL) {
2061 perror ("fopen failed");
2062 goto fail;
2063 }
2064 u.played_file = fopen(argv[1], "rb");
2065 if (u.played_file == NULL) {
2066 perror ("fopen failed");
2067 goto fail;
2068 }
2069 u.canceled_file = fopen(argv[3], "wb");
2070 if (u.canceled_file == NULL) {
2071 perror ("fopen failed");
2072 goto fail;
2073 }
2074
2075 u.core = pa_xnew0(pa_core, 1);
2076 u.core->cpu_info.cpu_type = PA_CPU_X86;
2077 u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2078
2079 if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2080 pa_log("Failed to parse module arguments.");
2081 goto fail;
2082 }
2083
2084 source_ss.format = PA_SAMPLE_S16LE;
2085 source_ss.rate = DEFAULT_RATE;
2086 source_ss.channels = DEFAULT_CHANNELS;
2087 pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2088
2089 if (init_common(ma, &u, &source_ss, &source_map) < 0)
2090 goto fail;
2091
2092 if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &u.blocksize,
2093 (argc > 4) ? argv[5] : NULL )) {
2094 pa_log("Failed to init AEC engine");
2095 goto fail;
2096 }
2097
2098 if (u.ec->params.drift_compensation) {
2099 if (argc < 7) {
2100 pa_log("Drift compensation enabled but drift file not specified");
2101 goto fail;
2102 }
2103
2104 u.drift_file = fopen(argv[6], "rt");
2105
2106 if (u.drift_file == NULL) {
2107 perror ("fopen failed");
2108 goto fail;
2109 }
2110 }
2111
2112 rdata = pa_xmalloc(u.blocksize);
2113 pdata = pa_xmalloc(u.blocksize);
2114 cdata = pa_xmalloc(u.blocksize);
2115
2116 if (!u.ec->params.drift_compensation) {
2117 while (fread(rdata, u.blocksize, 1, u.captured_file) > 0) {
2118 if (fread(pdata, u.blocksize, 1, u.played_file) == 0) {
2119 perror("Played file ended before captured file");
2120 goto fail;
2121 }
2122
2123 u.ec->run(u.ec, rdata, pdata, cdata);
2124
2125 unused = fwrite(cdata, u.blocksize, 1, u.canceled_file);
2126 }
2127 } else {
2128 while (fscanf(u.drift_file, "%c", &c) > 0) {
2129 switch (c) {
2130 case 'd':
2131 if (!fscanf(u.drift_file, "%a", &drift)) {
2132 perror("Drift file incomplete");
2133 goto fail;
2134 }
2135
2136 u.ec->set_drift(u.ec, drift);
2137
2138 break;
2139
2140 case 'c':
2141 if (!fscanf(u.drift_file, "%d", &i)) {
2142 perror("Drift file incomplete");
2143 goto fail;
2144 }
2145
2146 if (fread(rdata, i, 1, u.captured_file) <= 0) {
2147 perror("Captured file ended prematurely");
2148 goto fail;
2149 }
2150
2151 u.ec->record(u.ec, rdata, cdata);
2152
2153 unused = fwrite(cdata, i, 1, u.canceled_file);
2154
2155 break;
2156
2157 case 'p':
2158 if (!fscanf(u.drift_file, "%d", &i)) {
2159 perror("Drift file incomplete");
2160 goto fail;
2161 }
2162
2163 if (fread(pdata, i, 1, u.played_file) <= 0) {
2164 perror("Played file ended prematurely");
2165 goto fail;
2166 }
2167
2168 u.ec->play(u.ec, pdata);
2169
2170 break;
2171 }
2172 }
2173
2174 if (fread(rdata, i, 1, u.captured_file) > 0)
2175 pa_log("All capture data was not consumed");
2176 if (fread(pdata, i, 1, u.played_file) > 0)
2177 pa_log("All playback data was not consumed");
2178 }
2179
2180 u.ec->done(u.ec);
2181
2182 out:
2183 if (u.captured_file)
2184 fclose(u.captured_file);
2185 if (u.played_file)
2186 fclose(u.played_file);
2187 if (u.canceled_file)
2188 fclose(u.canceled_file);
2189 if (u.drift_file)
2190 fclose(u.drift_file);
2191
2192 pa_xfree(rdata);
2193 pa_xfree(pdata);
2194 pa_xfree(cdata);
2195
2196 pa_xfree(u.ec);
2197 pa_xfree(u.core);
2198
2199 if (ma)
2200 pa_modargs_free(ma);
2201
2202 return ret;
2203
2204 usage:
2205 pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);
2206
2207 fail:
2208 ret = -1;
2209 goto out;
2210 }
2211 #endif /* ECHO_CANCEL_TEST */