]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
41b18ab06c54a522ed6dda4e910dec3278b25b57
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <math.h>
34
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(false);
68 PA_MODULE_USAGE(
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 "latency_msec=<latency in ms> "
72 );
73
74 #define SAP_PORT 9875
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define DEFAULT_LATENCY_MSEC 500
77 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
78 #define MAX_SESSIONS 16
79 #define DEATH_TIMEOUT 20
80 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
81
82 static const char* const valid_modargs[] = {
83 "sink",
84 "sap_address",
85 "latency_msec",
86 NULL
87 };
88
89 struct session {
90 struct userdata *userdata;
91 PA_LLIST_FIELDS(struct session);
92
93 pa_sink_input *sink_input;
94 pa_memblockq *memblockq;
95
96 bool first_packet;
97 uint32_t ssrc;
98 uint32_t offset;
99
100 struct pa_sdp_info sdp_info;
101
102 pa_rtp_context rtp_context;
103
104 pa_rtpoll_item *rtpoll_item;
105
106 pa_atomic_t timestamp;
107
108 pa_usec_t intended_latency;
109 pa_usec_t sink_latency;
110
111 pa_usec_t last_rate_update;
112 pa_usec_t last_latency;
113 double estimated_rate;
114 double avg_estimated_rate;
115 };
116
117 struct userdata {
118 pa_module *module;
119 pa_core *core;
120
121 pa_sap_context sap_context;
122 pa_io_event* sap_event;
123
124 pa_time_event *check_death_event;
125
126 char *sink_name;
127
128 PA_LLIST_HEAD(struct session, sessions);
129 pa_hashmap *by_origin;
130 int n_sessions;
131
132 pa_usec_t latency;
133 };
134
135 static void session_free(struct session *s);
136
137 /* Called from I/O thread context */
138 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
139 struct session *s = PA_SINK_INPUT(o)->userdata;
140
141 switch (code) {
142 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
143 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
144
145 /* Fall through, the default handler will add in the extra
146 * latency added by the resampler */
147 break;
148 }
149
150 return pa_sink_input_process_msg(o, code, data, offset, chunk);
151 }
152
153 /* Called from I/O thread context */
154 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
155 struct session *s;
156 pa_sink_input_assert_ref(i);
157 pa_assert_se(s = i->userdata);
158
159 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
160 return -1;
161
162 pa_memblockq_drop(s->memblockq, chunk->length);
163
164 return 0;
165 }
166
167 /* Called from I/O thread context */
168 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
169 struct session *s;
170
171 pa_sink_input_assert_ref(i);
172 pa_assert_se(s = i->userdata);
173
174 pa_memblockq_rewind(s->memblockq, nbytes);
175 }
176
177 /* Called from I/O thread context */
178 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
179 struct session *s;
180
181 pa_sink_input_assert_ref(i);
182 pa_assert_se(s = i->userdata);
183
184 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
185 }
186
187 /* Called from main context */
188 static void sink_input_kill(pa_sink_input* i) {
189 struct session *s;
190 pa_sink_input_assert_ref(i);
191 pa_assert_se(s = i->userdata);
192
193 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
194 session_free(s);
195 }
196
197 /* Called from IO context */
198 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
199 struct session *s;
200 pa_sink_input_assert_ref(i);
201 pa_assert_se(s = i->userdata);
202
203 if (b)
204 pa_memblockq_flush_read(s->memblockq);
205 else
206 s->first_packet = false;
207 }
208
209 /* Called from I/O thread context */
210 static int rtpoll_work_cb(pa_rtpoll_item *i) {
211 pa_memchunk chunk;
212 int64_t k, j, delta;
213 struct timeval now = { 0, 0 };
214 struct session *s;
215 struct pollfd *p;
216
217 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
218
219 p = pa_rtpoll_item_get_pollfd(i, NULL);
220
221 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
222 pa_log("poll() signalled bad revents.");
223 return -1;
224 }
225
226 if ((p->revents & POLLIN) == 0)
227 return 0;
228
229 p->revents = 0;
230
231 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
232 return 0;
233
234 if (s->sdp_info.payload != s->rtp_context.payload ||
235 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
236 pa_memblock_unref(chunk.memblock);
237 return 0;
238 }
239
240 if (!s->first_packet) {
241 s->first_packet = true;
242
243 s->ssrc = s->rtp_context.ssrc;
244 s->offset = s->rtp_context.timestamp;
245
246 if (s->ssrc == s->userdata->module->core->cookie)
247 pa_log_warn("Detected RTP packet loop!");
248 } else {
249 if (s->ssrc != s->rtp_context.ssrc) {
250 pa_memblock_unref(chunk.memblock);
251 return 0;
252 }
253 }
254
255 /* Check whether there was a timestamp overflow */
256 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
257 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
258
259 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
260 delta = k;
261 else
262 delta = j;
263
264 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
265
266 if (now.tv_sec == 0) {
267 PA_ONCE_BEGIN {
268 pa_log_warn("Using artificial time instead of timestamp");
269 } PA_ONCE_END;
270 pa_rtclock_get(&now);
271 } else
272 pa_rtclock_from_wallclock(&now);
273
274 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
275 pa_log_warn("Queue overrun");
276 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
277 }
278
279 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
280
281 pa_memblock_unref(chunk.memblock);
282
283 /* The next timestamp we expect */
284 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
285
286 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
287
288 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
289 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
290 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
291 uint32_t current_rate = s->sink_input->sample_spec.rate;
292 uint32_t new_rate;
293 double estimated_rate, alpha = 0.02;
294
295 pa_log_debug("Updating sample rate");
296
297 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
298 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
299
300 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
301
302 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
303 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
304
305 if (ri > render_delay+sink_delay)
306 ri -= render_delay+sink_delay;
307 else
308 ri = 0;
309
310 if (wi < ri)
311 latency = 0;
312 else
313 latency = wi - ri;
314
315 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
316
317 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
318 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
319 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
320 * T
321 * R̂ = ─────────────── Rⁿ . (1)
322 * T - (Lⁿ - Lⁿ⁻ⁱ)
323 *
324 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
325 * is correct). But there is also the requirement to keep the buffer at a predefined target
326 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
327 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
328 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
329 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
330 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
331 * ʲ⁼ⁱ R̂ a a
332 * Solving for Rⁿ⁺ⁱ gives
333 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
334 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
335 * T
336 * In the code below a = 7 is used.
337 *
338 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
339 * of the estimated rate R̂ is used. This average R̅ is defined as
340 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
341 * Because it is difficult to find a fixed value for the coefficient α such that the
342 * averaging is without significant lag but oscillations are filtered out, a heuristic is
343 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
344 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
345 */
346 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
347 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
348 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
349 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
350 }
351 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
352 s->estimated_rate = estimated_rate;
353 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
354 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
355 s->last_latency = latency;
356
357 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
358 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
359 new_rate = base_rate;
360 } else {
361 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
362 new_rate = base_rate;
363 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
364 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
365 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
366 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
367 }
368 }
369 s->sink_input->sample_spec.rate = new_rate;
370
371 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
372
373 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
374
375 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
376
377 s->last_rate_update = pa_timeval_load(&now);
378 }
379
380 if (pa_memblockq_is_readable(s->memblockq) &&
381 s->sink_input->thread_info.underrun_for > 0) {
382 pa_log_debug("Requesting rewind due to end of underrun");
383 pa_sink_input_request_rewind(s->sink_input,
384 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
385 false, true, false);
386 }
387
388 return 1;
389 }
390
391 /* Called from I/O thread context */
392 static void sink_input_attach(pa_sink_input *i) {
393 struct session *s;
394 struct pollfd *p;
395
396 pa_sink_input_assert_ref(i);
397 pa_assert_se(s = i->userdata);
398
399 pa_assert(!s->rtpoll_item);
400 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
401
402 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
403 p->fd = s->rtp_context.fd;
404 p->events = POLLIN;
405 p->revents = 0;
406
407 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
408 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
409 }
410
411 /* Called from I/O thread context */
412 static void sink_input_detach(pa_sink_input *i) {
413 struct session *s;
414 pa_sink_input_assert_ref(i);
415 pa_assert_se(s = i->userdata);
416
417 pa_assert(s->rtpoll_item);
418 pa_rtpoll_item_free(s->rtpoll_item);
419 s->rtpoll_item = NULL;
420 }
421
422 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
423 int af, fd = -1, r, one;
424
425 pa_assert(sa);
426 pa_assert(salen > 0);
427
428 af = sa->sa_family;
429 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
430 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
431 goto fail;
432 }
433
434 pa_make_udp_socket_low_delay(fd);
435
436 #ifdef SO_TIMESTAMP
437 one = 1;
438 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
439 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
440 goto fail;
441 }
442 #else
443 pa_log("SO_TIMESTAMP unsupported on this platform");
444 goto fail;
445 #endif
446
447 one = 1;
448 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
449 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
450 goto fail;
451 }
452
453 if (af == AF_INET) {
454 struct ip_mreq mr4;
455 memset(&mr4, 0, sizeof(mr4));
456 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
457 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
458 #ifdef HAVE_IPV6
459 } else if (af == AF_INET6) {
460 struct ipv6_mreq mr6;
461 memset(&mr6, 0, sizeof(mr6));
462 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
463 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
464 #endif
465 } else
466 pa_assert_not_reached();
467
468 if (r < 0) {
469 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
470 goto fail;
471 }
472
473 if (bind(fd, sa, salen) < 0) {
474 pa_log("bind() failed: %s", pa_cstrerror(errno));
475 goto fail;
476 }
477
478 return fd;
479
480 fail:
481 if (fd >= 0)
482 close(fd);
483
484 return -1;
485 }
486
487 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
488 struct session *s = NULL;
489 pa_sink *sink;
490 int fd = -1;
491 pa_memchunk silence;
492 pa_sink_input_new_data data;
493 struct timeval now;
494
495 pa_assert(u);
496 pa_assert(sdp_info);
497
498 if (u->n_sessions >= MAX_SESSIONS) {
499 pa_log("Session limit reached.");
500 goto fail;
501 }
502
503 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
504 pa_log("Sink does not exist.");
505 goto fail;
506 }
507
508 pa_rtclock_get(&now);
509
510 s = pa_xnew0(struct session, 1);
511 s->userdata = u;
512 s->first_packet = false;
513 s->sdp_info = *sdp_info;
514 s->rtpoll_item = NULL;
515 s->intended_latency = u->latency;
516 s->last_rate_update = pa_timeval_load(&now);
517 s->last_latency = u->latency;
518 s->estimated_rate = (double) sink->sample_spec.rate;
519 s->avg_estimated_rate = (double) sink->sample_spec.rate;
520 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
521
522 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
523 goto fail;
524
525 pa_sink_input_new_data_init(&data);
526 pa_sink_input_new_data_set_sink(&data, sink, false);
527 data.driver = __FILE__;
528 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
529 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
530 "RTP Stream%s%s%s",
531 sdp_info->session_name ? " (" : "",
532 sdp_info->session_name ? sdp_info->session_name : "",
533 sdp_info->session_name ? ")" : "");
534
535 if (sdp_info->session_name)
536 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
537 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
538 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
539 data.module = u->module;
540 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
541 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
542
543 pa_sink_input_new(&s->sink_input, u->module->core, &data);
544 pa_sink_input_new_data_done(&data);
545
546 if (!s->sink_input) {
547 pa_log("Failed to create sink input.");
548 goto fail;
549 }
550
551 s->sink_input->userdata = s;
552
553 s->sink_input->parent.process_msg = sink_input_process_msg;
554 s->sink_input->pop = sink_input_pop_cb;
555 s->sink_input->process_rewind = sink_input_process_rewind_cb;
556 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
557 s->sink_input->kill = sink_input_kill;
558 s->sink_input->attach = sink_input_attach;
559 s->sink_input->detach = sink_input_detach;
560 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
561
562 pa_sink_input_get_silence(s->sink_input, &silence);
563
564 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
565
566 if (s->intended_latency < s->sink_latency*2)
567 s->intended_latency = s->sink_latency*2;
568
569 s->memblockq = pa_memblockq_new(
570 "module-rtp-recv memblockq",
571 0,
572 MEMBLOCKQ_MAXLENGTH,
573 MEMBLOCKQ_MAXLENGTH,
574 &s->sink_input->sample_spec,
575 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
576 0,
577 0,
578 &silence);
579
580 pa_memblock_unref(silence.memblock);
581
582 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
583
584 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
585 u->n_sessions++;
586 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
587
588 pa_sink_input_put(s->sink_input);
589
590 pa_log_info("New session '%s'", s->sdp_info.session_name);
591
592 return s;
593
594 fail:
595 pa_xfree(s);
596
597 if (fd >= 0)
598 pa_close(fd);
599
600 return NULL;
601 }
602
603 static void session_free(struct session *s) {
604 pa_assert(s);
605
606 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
607
608 pa_sink_input_unlink(s->sink_input);
609 pa_sink_input_unref(s->sink_input);
610
611 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
612 pa_assert(s->userdata->n_sessions >= 1);
613 s->userdata->n_sessions--;
614
615 pa_memblockq_free(s->memblockq);
616 pa_sdp_info_destroy(&s->sdp_info);
617 pa_rtp_context_destroy(&s->rtp_context);
618
619 pa_xfree(s);
620 }
621
622 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
623 struct userdata *u = userdata;
624 bool goodbye = false;
625 pa_sdp_info info;
626 struct session *s;
627
628 pa_assert(m);
629 pa_assert(e);
630 pa_assert(u);
631 pa_assert(fd == u->sap_context.fd);
632 pa_assert(flags == PA_IO_EVENT_INPUT);
633
634 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
635 return;
636
637 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
638 return;
639
640 if (goodbye) {
641
642 if ((s = pa_hashmap_remove(u->by_origin, info.origin)))
643 session_free(s);
644
645 pa_sdp_info_destroy(&info);
646 } else {
647
648 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
649 if (!session_new(u, &info))
650 pa_sdp_info_destroy(&info);
651
652 } else {
653 struct timeval now;
654 pa_rtclock_get(&now);
655 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
656
657 pa_sdp_info_destroy(&info);
658 }
659 }
660 }
661
662 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
663 struct session *s, *n;
664 struct userdata *u = userdata;
665 struct timeval now;
666
667 pa_assert(m);
668 pa_assert(t);
669 pa_assert(u);
670
671 pa_rtclock_get(&now);
672
673 pa_log_debug("Checking for dead streams ...");
674
675 for (s = u->sessions; s; s = n) {
676 int k;
677 n = s->next;
678
679 k = pa_atomic_load(&s->timestamp);
680
681 if (k + DEATH_TIMEOUT < now.tv_sec) {
682 pa_hashmap_remove(u->by_origin, s->sdp_info.origin);
683 session_free(s);
684 }
685 }
686
687 /* Restart timer */
688 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
689 }
690
691 int pa__init(pa_module*m) {
692 struct userdata *u;
693 pa_modargs *ma = NULL;
694 struct sockaddr_in sa4;
695 #ifdef HAVE_IPV6
696 struct sockaddr_in6 sa6;
697 #endif
698 struct sockaddr *sa;
699 socklen_t salen;
700 const char *sap_address;
701 uint32_t latency_msec;
702 int fd = -1;
703
704 pa_assert(m);
705
706 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
707 pa_log("failed to parse module arguments");
708 goto fail;
709 }
710
711 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
712
713 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
714 sa4.sin_family = AF_INET;
715 sa4.sin_port = htons(SAP_PORT);
716 sa = (struct sockaddr*) &sa4;
717 salen = sizeof(sa4);
718 #ifdef HAVE_IPV6
719 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
720 sa6.sin6_family = AF_INET6;
721 sa6.sin6_port = htons(SAP_PORT);
722 sa = (struct sockaddr*) &sa6;
723 salen = sizeof(sa6);
724 #endif
725 } else {
726 pa_log("Invalid SAP address '%s'", sap_address);
727 goto fail;
728 }
729
730 latency_msec = DEFAULT_LATENCY_MSEC;
731 if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
732 pa_log("Invalid latency specification");
733 goto fail;
734 }
735
736 if ((fd = mcast_socket(sa, salen)) < 0)
737 goto fail;
738
739 m->userdata = u = pa_xnew(struct userdata, 1);
740 u->module = m;
741 u->core = m->core;
742 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
743 u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
744
745 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
746 pa_sap_context_init_recv(&u->sap_context, fd);
747
748 PA_LLIST_HEAD_INIT(struct session, u->sessions);
749 u->n_sessions = 0;
750 u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
751
752 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
753
754 pa_modargs_free(ma);
755
756 return 0;
757
758 fail:
759 if (ma)
760 pa_modargs_free(ma);
761
762 if (fd >= 0)
763 pa_close(fd);
764
765 return -1;
766 }
767
768 void pa__done(pa_module*m) {
769 struct userdata *u;
770
771 pa_assert(m);
772
773 if (!(u = m->userdata))
774 return;
775
776 if (u->sap_event)
777 m->core->mainloop->io_free(u->sap_event);
778
779 if (u->check_death_event)
780 m->core->mainloop->time_free(u->check_death_event);
781
782 pa_sap_context_destroy(&u->sap_context);
783
784 if (u->by_origin)
785 pa_hashmap_free(u->by_origin);
786
787 pa_xfree(u->sink_name);
788 pa_xfree(u);
789 }