]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
module-rtp-recv: Remove smoother from write index
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/socket-util.h>
56 #include <pulsecore/once.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(FALSE);
68 PA_MODULE_USAGE(
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 );
72
73 #define SAP_PORT 9875
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
76 #define MAX_SESSIONS 16
77 #define DEATH_TIMEOUT 20
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
80
81 static const char* const valid_modargs[] = {
82 "sink",
83 "sap_address",
84 NULL
85 };
86
87 struct session {
88 struct userdata *userdata;
89 PA_LLIST_FIELDS(struct session);
90
91 pa_sink_input *sink_input;
92 pa_memblockq *memblockq;
93
94 pa_bool_t first_packet;
95 uint32_t ssrc;
96 uint32_t offset;
97
98 struct pa_sdp_info sdp_info;
99
100 pa_rtp_context rtp_context;
101
102 pa_rtpoll_item *rtpoll_item;
103
104 pa_atomic_t timestamp;
105
106 pa_usec_t intended_latency;
107 pa_usec_t sink_latency;
108
109 pa_usec_t last_rate_update;
110 pa_usec_t last_latency;
111 double estimated_rate;
112 double avg_estimated_rate;
113 };
114
115 struct userdata {
116 pa_module *module;
117 pa_core *core;
118
119 pa_sap_context sap_context;
120 pa_io_event* sap_event;
121
122 pa_time_event *check_death_event;
123
124 char *sink_name;
125
126 PA_LLIST_HEAD(struct session, sessions);
127 pa_hashmap *by_origin;
128 int n_sessions;
129 };
130
131 static void session_free(struct session *s);
132
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135 struct session *s = PA_SINK_INPUT(o)->userdata;
136
137 switch (code) {
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
143 break;
144 }
145
146 return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151 struct session *s;
152 pa_sink_input_assert_ref(i);
153 pa_assert_se(s = i->userdata);
154
155 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156 return -1;
157
158 pa_memblockq_drop(s->memblockq, chunk->length);
159
160 return 0;
161 }
162
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165 struct session *s;
166
167 pa_sink_input_assert_ref(i);
168 pa_assert_se(s = i->userdata);
169
170 pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175 struct session *s;
176
177 pa_sink_input_assert_ref(i);
178 pa_assert_se(s = i->userdata);
179
180 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input* i) {
185 struct session *s;
186 pa_sink_input_assert_ref(i);
187 pa_assert_se(s = i->userdata);
188
189 session_free(s);
190 }
191
192 /* Called from IO context */
193 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
194 struct session *s;
195 pa_sink_input_assert_ref(i);
196 pa_assert_se(s = i->userdata);
197
198 if (b)
199 pa_memblockq_flush_read(s->memblockq);
200 else
201 s->first_packet = FALSE;
202 }
203
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206 pa_memchunk chunk;
207 int64_t k, j, delta;
208 struct timeval now = { 0, 0 };
209 struct session *s;
210 struct pollfd *p;
211
212 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
213
214 p = pa_rtpoll_item_get_pollfd(i, NULL);
215
216 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
217 pa_log("poll() signalled bad revents.");
218 return -1;
219 }
220
221 if ((p->revents & POLLIN) == 0)
222 return 0;
223
224 p->revents = 0;
225
226 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
227 return 0;
228
229 if (s->sdp_info.payload != s->rtp_context.payload ||
230 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231 pa_memblock_unref(chunk.memblock);
232 return 0;
233 }
234
235 if (!s->first_packet) {
236 s->first_packet = TRUE;
237
238 s->ssrc = s->rtp_context.ssrc;
239 s->offset = s->rtp_context.timestamp;
240
241 if (s->ssrc == s->userdata->module->core->cookie)
242 pa_log_warn("Detected RTP packet loop!");
243 } else {
244 if (s->ssrc != s->rtp_context.ssrc) {
245 pa_memblock_unref(chunk.memblock);
246 return 0;
247 }
248 }
249
250 /* Check whether there was a timestamp overflow */
251 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
252 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
253
254 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
255 delta = k;
256 else
257 delta = j;
258
259 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
260
261 if (now.tv_sec == 0) {
262 PA_ONCE_BEGIN {
263 pa_log_warn("Using artificial time instead of timestamp");
264 } PA_ONCE_END;
265 pa_rtclock_get(&now);
266 } else
267 pa_rtclock_from_wallclock(&now);
268
269 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
270 pa_log_warn("Queue overrun");
271 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
272 }
273
274 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
275
276 pa_memblock_unref(chunk.memblock);
277
278 /* The next timestamp we expect */
279 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
280
281 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
282
283 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
284 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
285 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
286 uint32_t current_rate = s->sink_input->sample_spec.rate;
287 uint32_t new_rate;
288 double estimated_rate, alpha = 0.02;
289
290 pa_log_debug("Updating sample rate");
291
292 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
293 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
294
295 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
296
297 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
298 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
299
300 if (ri > render_delay+sink_delay)
301 ri -= render_delay+sink_delay;
302 else
303 ri = 0;
304
305 if (wi < ri)
306 latency = 0;
307 else
308 latency = wi - ri;
309
310 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
311
312 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
313 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
314 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
315 * T
316 * R̂ = ─────────────── Rⁿ . (1)
317 * T - (Lⁿ - Lⁿ⁻ⁱ)
318 *
319 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
320 * is correct). But there is also the requirement to keep the buffer at a predefined target
321 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
322 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
323 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
324 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
325 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
326 * ʲ⁼ⁱ R̂ a a
327 * Solving for Rⁿ⁺ⁱ gives
328 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
329 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
330 * T
331 * In the code below a = 7 is used.
332 *
333 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
334 * of the estimated rate R̂ is used. This average R̅ is defined as
335 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
336 * Because it is difficult to find a fixed value for the coefficient α such that the
337 * averaging is without significant lag but oscillations are filtered out, a heuristic is
338 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
339 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
340 */
341 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
342 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
343 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
344 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
345 }
346 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
347 s->estimated_rate = estimated_rate;
348 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
349 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
350 s->last_latency = latency;
351
352 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
353 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
354 new_rate = base_rate;
355 } else {
356 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
357 new_rate = base_rate;
358 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
359 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
360 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
361 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
362 }
363 }
364 s->sink_input->sample_spec.rate = new_rate;
365
366 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
367
368 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
369
370 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
371
372 s->last_rate_update = pa_timeval_load(&now);
373 }
374
375 if (pa_memblockq_is_readable(s->memblockq) &&
376 s->sink_input->thread_info.underrun_for > 0) {
377 pa_log_debug("Requesting rewind due to end of underrun");
378 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
379 }
380
381 return 1;
382 }
383
384 /* Called from I/O thread context */
385 static void sink_input_attach(pa_sink_input *i) {
386 struct session *s;
387 struct pollfd *p;
388
389 pa_sink_input_assert_ref(i);
390 pa_assert_se(s = i->userdata);
391
392 pa_assert(!s->rtpoll_item);
393 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
394
395 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
396 p->fd = s->rtp_context.fd;
397 p->events = POLLIN;
398 p->revents = 0;
399
400 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
401 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
402 }
403
404 /* Called from I/O thread context */
405 static void sink_input_detach(pa_sink_input *i) {
406 struct session *s;
407 pa_sink_input_assert_ref(i);
408 pa_assert_se(s = i->userdata);
409
410 pa_assert(s->rtpoll_item);
411 pa_rtpoll_item_free(s->rtpoll_item);
412 s->rtpoll_item = NULL;
413 }
414
415 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
416 int af, fd = -1, r, one;
417
418 pa_assert(sa);
419 pa_assert(salen > 0);
420
421 af = sa->sa_family;
422 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
423 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
424 goto fail;
425 }
426
427 pa_make_udp_socket_low_delay(fd);
428
429 one = 1;
430 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
431 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
432 goto fail;
433 }
434
435 one = 1;
436 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
437 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
438 goto fail;
439 }
440
441 if (af == AF_INET) {
442 struct ip_mreq mr4;
443 memset(&mr4, 0, sizeof(mr4));
444 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
445 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
446 #ifdef HAVE_IPV6
447 } else {
448 struct ipv6_mreq mr6;
449 memset(&mr6, 0, sizeof(mr6));
450 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
451 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
452 #endif
453 }
454
455 if (r < 0) {
456 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
457 goto fail;
458 }
459
460 if (bind(fd, sa, salen) < 0) {
461 pa_log("bind() failed: %s", pa_cstrerror(errno));
462 goto fail;
463 }
464
465 return fd;
466
467 fail:
468 if (fd >= 0)
469 close(fd);
470
471 return -1;
472 }
473
474 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
475 struct session *s = NULL;
476 pa_sink *sink;
477 int fd = -1;
478 pa_memchunk silence;
479 pa_sink_input_new_data data;
480 struct timeval now;
481
482 pa_assert(u);
483 pa_assert(sdp_info);
484
485 if (u->n_sessions >= MAX_SESSIONS) {
486 pa_log("Session limit reached.");
487 goto fail;
488 }
489
490 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
491 pa_log("Sink does not exist.");
492 goto fail;
493 }
494
495 pa_rtclock_get(&now);
496
497 s = pa_xnew0(struct session, 1);
498 s->userdata = u;
499 s->first_packet = FALSE;
500 s->sdp_info = *sdp_info;
501 s->rtpoll_item = NULL;
502 s->intended_latency = LATENCY_USEC;
503 s->last_rate_update = pa_timeval_load(&now);
504 s->last_latency = LATENCY_USEC;
505 s->estimated_rate = (double) sink->sample_spec.rate;
506 s->avg_estimated_rate = (double) sink->sample_spec.rate;
507 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
508
509 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
510 goto fail;
511
512 pa_sink_input_new_data_init(&data);
513 data.sink = sink;
514 data.driver = __FILE__;
515 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
516 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
517 "RTP Stream%s%s%s",
518 sdp_info->session_name ? " (" : "",
519 sdp_info->session_name ? sdp_info->session_name : "",
520 sdp_info->session_name ? ")" : "");
521
522 if (sdp_info->session_name)
523 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
524 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
525 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
526 data.module = u->module;
527 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
528 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
529
530 pa_sink_input_new(&s->sink_input, u->module->core, &data);
531 pa_sink_input_new_data_done(&data);
532
533 if (!s->sink_input) {
534 pa_log("Failed to create sink input.");
535 goto fail;
536 }
537
538 s->sink_input->userdata = s;
539
540 s->sink_input->parent.process_msg = sink_input_process_msg;
541 s->sink_input->pop = sink_input_pop_cb;
542 s->sink_input->process_rewind = sink_input_process_rewind_cb;
543 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
544 s->sink_input->kill = sink_input_kill;
545 s->sink_input->attach = sink_input_attach;
546 s->sink_input->detach = sink_input_detach;
547 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
548
549 pa_sink_input_get_silence(s->sink_input, &silence);
550
551 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
552
553 if (s->intended_latency < s->sink_latency*2)
554 s->intended_latency = s->sink_latency*2;
555
556 s->memblockq = pa_memblockq_new(
557 0,
558 MEMBLOCKQ_MAXLENGTH,
559 MEMBLOCKQ_MAXLENGTH,
560 pa_frame_size(&s->sink_input->sample_spec),
561 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
562 0,
563 0,
564 &silence);
565
566 pa_memblock_unref(silence.memblock);
567
568 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
569
570 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
571 u->n_sessions++;
572 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
573
574 pa_sink_input_put(s->sink_input);
575
576 pa_log_info("New session '%s'", s->sdp_info.session_name);
577
578 return s;
579
580 fail:
581 pa_xfree(s);
582
583 if (fd >= 0)
584 pa_close(fd);
585
586 return NULL;
587 }
588
589 static void session_free(struct session *s) {
590 pa_assert(s);
591
592 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
593
594 pa_sink_input_unlink(s->sink_input);
595 pa_sink_input_unref(s->sink_input);
596
597 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
598 pa_assert(s->userdata->n_sessions >= 1);
599 s->userdata->n_sessions--;
600 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
601
602 pa_memblockq_free(s->memblockq);
603 pa_sdp_info_destroy(&s->sdp_info);
604 pa_rtp_context_destroy(&s->rtp_context);
605
606 pa_xfree(s);
607 }
608
609 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
610 struct userdata *u = userdata;
611 pa_bool_t goodbye = FALSE;
612 pa_sdp_info info;
613 struct session *s;
614
615 pa_assert(m);
616 pa_assert(e);
617 pa_assert(u);
618 pa_assert(fd == u->sap_context.fd);
619 pa_assert(flags == PA_IO_EVENT_INPUT);
620
621 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
622 return;
623
624 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
625 return;
626
627 if (goodbye) {
628
629 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
630 session_free(s);
631
632 pa_sdp_info_destroy(&info);
633 } else {
634
635 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
636 if (!session_new(u, &info))
637 pa_sdp_info_destroy(&info);
638
639 } else {
640 struct timeval now;
641 pa_rtclock_get(&now);
642 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
643
644 pa_sdp_info_destroy(&info);
645 }
646 }
647 }
648
649 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
650 struct session *s, *n;
651 struct userdata *u = userdata;
652 struct timeval now;
653
654 pa_assert(m);
655 pa_assert(t);
656 pa_assert(u);
657
658 pa_rtclock_get(&now);
659
660 pa_log_debug("Checking for dead streams ...");
661
662 for (s = u->sessions; s; s = n) {
663 int k;
664 n = s->next;
665
666 k = pa_atomic_load(&s->timestamp);
667
668 if (k + DEATH_TIMEOUT < now.tv_sec)
669 session_free(s);
670 }
671
672 /* Restart timer */
673 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
674 }
675
676 int pa__init(pa_module*m) {
677 struct userdata *u;
678 pa_modargs *ma = NULL;
679 struct sockaddr_in sa4;
680 #ifdef HAVE_IPV6
681 struct sockaddr_in6 sa6;
682 #endif
683 struct sockaddr *sa;
684 socklen_t salen;
685 const char *sap_address;
686 int fd = -1;
687
688 pa_assert(m);
689
690 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
691 pa_log("failed to parse module arguments");
692 goto fail;
693 }
694
695 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
696
697 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
698 sa4.sin_family = AF_INET;
699 sa4.sin_port = htons(SAP_PORT);
700 sa = (struct sockaddr*) &sa4;
701 salen = sizeof(sa4);
702 #ifdef HAVE_IPV6
703 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
704 sa6.sin6_family = AF_INET6;
705 sa6.sin6_port = htons(SAP_PORT);
706 sa = (struct sockaddr*) &sa6;
707 salen = sizeof(sa6);
708 #endif
709 } else {
710 pa_log("Invalid SAP address '%s'", sap_address);
711 goto fail;
712 }
713
714 if ((fd = mcast_socket(sa, salen)) < 0)
715 goto fail;
716
717 m->userdata = u = pa_xnew(struct userdata, 1);
718 u->module = m;
719 u->core = m->core;
720 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
721
722 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
723 pa_sap_context_init_recv(&u->sap_context, fd);
724
725 PA_LLIST_HEAD_INIT(struct session, u->sessions);
726 u->n_sessions = 0;
727 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
728
729 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
730
731 pa_modargs_free(ma);
732
733 return 0;
734
735 fail:
736 if (ma)
737 pa_modargs_free(ma);
738
739 if (fd >= 0)
740 pa_close(fd);
741
742 return -1;
743 }
744
745 void pa__done(pa_module*m) {
746 struct userdata *u;
747 struct session *s;
748
749 pa_assert(m);
750
751 if (!(u = m->userdata))
752 return;
753
754 if (u->sap_event)
755 m->core->mainloop->io_free(u->sap_event);
756
757 if (u->check_death_event)
758 m->core->mainloop->time_free(u->check_death_event);
759
760 pa_sap_context_destroy(&u->sap_context);
761
762 if (u->by_origin) {
763 while ((s = pa_hashmap_first(u->by_origin)))
764 session_free(s);
765
766 pa_hashmap_free(u->by_origin, NULL, NULL);
767 }
768
769 pa_xfree(u->sink_name);
770 pa_xfree(u);
771 }