]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
Modify smoothing code to make cubic interpolation optional and allow 'quick fixups...
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
55
56 #include "module-rtp-recv-symdef.h"
57
58 #include "rtp.h"
59 #include "sdp.h"
60 #include "sap.h"
61
62 PA_MODULE_AUTHOR("Lennart Poettering");
63 PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP");
64 PA_MODULE_VERSION(PACKAGE_VERSION);
65 PA_MODULE_LOAD_ONCE(FALSE);
66 PA_MODULE_USAGE(
67 "sink=<name of the sink> "
68 "sap_address=<multicast address to listen on> "
69 );
70
71 #define SAP_PORT 9875
72 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
74 #define MAX_SESSIONS 16
75 #define DEATH_TIMEOUT 20
76 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
77 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
78
79 static const char* const valid_modargs[] = {
80 "sink",
81 "sap_address",
82 NULL
83 };
84
85 struct session {
86 struct userdata *userdata;
87 PA_LLIST_FIELDS(struct session);
88
89 pa_sink_input *sink_input;
90 pa_memblockq *memblockq;
91
92 pa_bool_t first_packet;
93 uint32_t ssrc;
94 uint32_t offset;
95
96 struct pa_sdp_info sdp_info;
97
98 pa_rtp_context rtp_context;
99
100 pa_rtpoll_item *rtpoll_item;
101
102 pa_atomic_t timestamp;
103
104 pa_smoother *smoother;
105 pa_usec_t intended_latency;
106 pa_usec_t sink_latency;
107
108 pa_usec_t last_rate_update;
109 };
110
111 struct userdata {
112 pa_module *module;
113
114 pa_sap_context sap_context;
115 pa_io_event* sap_event;
116
117 pa_time_event *check_death_event;
118
119 char *sink_name;
120
121 PA_LLIST_HEAD(struct session, sessions);
122 pa_hashmap *by_origin;
123 int n_sessions;
124 };
125
126 static void session_free(struct session *s);
127
128 /* Called from I/O thread context */
129 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
130 struct session *s = PA_SINK_INPUT(o)->userdata;
131
132 switch (code) {
133 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
134 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
135
136 /* Fall through, the default handler will add in the extra
137 * latency added by the resampler */
138 break;
139 }
140
141 return pa_sink_input_process_msg(o, code, data, offset, chunk);
142 }
143
144 /* Called from I/O thread context */
145 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
146 struct session *s;
147 pa_sink_input_assert_ref(i);
148 pa_assert_se(s = i->userdata);
149
150 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
151 return -1;
152
153 pa_memblockq_drop(s->memblockq, chunk->length);
154
155 return 0;
156 }
157
158 /* Called from I/O thread context */
159 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
160 struct session *s;
161
162 pa_sink_input_assert_ref(i);
163 pa_assert_se(s = i->userdata);
164
165 pa_memblockq_rewind(s->memblockq, nbytes);
166 }
167
168 /* Called from thread context */
169 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
170 struct session *s;
171
172 pa_sink_input_assert_ref(i);
173 pa_assert_se(s = i->userdata);
174
175 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
176 }
177
178 /* Called from main context */
179 static void sink_input_kill(pa_sink_input* i) {
180 struct session *s;
181 pa_sink_input_assert_ref(i);
182 pa_assert_se(s = i->userdata);
183
184 session_free(s);
185 }
186
187 /* Called from I/O thread context */
188 static int rtpoll_work_cb(pa_rtpoll_item *i) {
189 pa_memchunk chunk;
190 int64_t k, j, delta;
191 struct timeval now;
192 struct session *s;
193 struct pollfd *p;
194
195 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
196
197 p = pa_rtpoll_item_get_pollfd(i, NULL);
198
199 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
200 pa_log("poll() signalled bad revents.");
201 return -1;
202 }
203
204 if ((p->revents & POLLIN) == 0)
205 return 0;
206
207 p->revents = 0;
208
209 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool) < 0)
210 return 0;
211
212 if (s->sdp_info.payload != s->rtp_context.payload) {
213 pa_memblock_unref(chunk.memblock);
214 return 0;
215 }
216
217 if (!s->first_packet) {
218 s->first_packet = TRUE;
219
220 s->ssrc = s->rtp_context.ssrc;
221 s->offset = s->rtp_context.timestamp;
222
223 if (s->ssrc == s->userdata->module->core->cookie)
224 pa_log_warn("Detected RTP packet loop!");
225 } else {
226 if (s->ssrc != s->rtp_context.ssrc) {
227 pa_memblock_unref(chunk.memblock);
228 return 0;
229 }
230 }
231
232 /* Check wheter there was a timestamp overflow */
233 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
234 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
235
236 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
237 delta = k;
238 else
239 delta = j;
240
241 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
242
243 pa_rtclock_get(&now);
244
245 pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
246
247 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
248 pa_log_warn("Queue overrun");
249 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
250 }
251
252 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
253
254 pa_memblock_unref(chunk.memblock);
255
256 /* The next timestamp we expect */
257 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
258
259 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
260
261 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
262 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
263 unsigned fix_samples;
264
265 pa_log("Updating sample rate");
266
267 wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
268 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
269
270 if (PA_MSGOBJECT(s->sink_input->sink)->process_msg(PA_MSGOBJECT(s->sink_input->sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_delay, 0, NULL) < 0)
271 sink_delay = 0;
272
273 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
274
275 if (ri > render_delay+sink_delay)
276 ri -= render_delay+sink_delay;
277 else
278 ri = 0;
279
280 if (wi < ri)
281 latency = 0;
282 else
283 latency = wi - ri;
284
285 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
286
287 /* Calculate deviation */
288 if (latency < s->intended_latency)
289 fix = s->intended_latency - latency;
290 else
291 fix = latency - s->intended_latency;
292
293 /* How many samples is this per second? */
294 fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
295
296 /* Check if deviation is in bounds */
297 if (fix_samples > s->sink_input->sample_spec.rate*.20)
298 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
299
300 /* Fix up rate */
301 if (latency < s->intended_latency)
302 s->sink_input->sample_spec.rate -= fix_samples;
303 else
304 s->sink_input->sample_spec.rate += fix_samples;
305
306 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
307
308 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
309
310 s->last_rate_update = pa_timeval_load(&now);
311 }
312
313 if (pa_memblockq_is_readable(s->memblockq) &&
314 s->sink_input->thread_info.underrun_for > 0) {
315 pa_log_debug("Requesting rewind due to end of underrun");
316 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
317 }
318
319 return 1;
320 }
321
322 /* Called from I/O thread context */
323 static void sink_input_attach(pa_sink_input *i) {
324 struct session *s;
325 struct pollfd *p;
326
327 pa_sink_input_assert_ref(i);
328 pa_assert_se(s = i->userdata);
329
330 pa_assert(!s->rtpoll_item);
331 s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
332
333 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
334 p->fd = s->rtp_context.fd;
335 p->events = POLLIN;
336 p->revents = 0;
337
338 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
339 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
340 }
341
342 /* Called from I/O thread context */
343 static void sink_input_detach(pa_sink_input *i) {
344 struct session *s;
345 pa_sink_input_assert_ref(i);
346 pa_assert_se(s = i->userdata);
347
348 pa_assert(s->rtpoll_item);
349 pa_rtpoll_item_free(s->rtpoll_item);
350 s->rtpoll_item = NULL;
351 }
352
353 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
354 int af, fd = -1, r, one;
355
356 pa_assert(sa);
357 pa_assert(salen > 0);
358
359 af = sa->sa_family;
360 if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
361 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
362 goto fail;
363 }
364
365 one = 1;
366 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
367 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
368 goto fail;
369 }
370
371 if (af == AF_INET) {
372 struct ip_mreq mr4;
373 memset(&mr4, 0, sizeof(mr4));
374 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
375 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
376 #ifdef HAVE_IPV6
377 } else {
378 struct ipv6_mreq mr6;
379 memset(&mr6, 0, sizeof(mr6));
380 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
381 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
382 #endif
383 }
384
385 if (r < 0) {
386 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
387 goto fail;
388 }
389
390 if (bind(fd, sa, salen) < 0) {
391 pa_log("bind() failed: %s", pa_cstrerror(errno));
392 goto fail;
393 }
394
395 return fd;
396
397 fail:
398 if (fd >= 0)
399 close(fd);
400
401 return -1;
402 }
403
404 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
405 struct session *s = NULL;
406 pa_sink *sink;
407 int fd = -1;
408 pa_memchunk silence;
409 pa_sink_input_new_data data;
410 struct timeval now;
411
412 pa_assert(u);
413 pa_assert(sdp_info);
414
415 if (u->n_sessions >= MAX_SESSIONS) {
416 pa_log("Session limit reached.");
417 goto fail;
418 }
419
420 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
421 pa_log("Sink does not exist.");
422 goto fail;
423 }
424
425 pa_rtclock_get(&now);
426
427 s = pa_xnew0(struct session, 1);
428 s->userdata = u;
429 s->first_packet = FALSE;
430 s->sdp_info = *sdp_info;
431 s->rtpoll_item = NULL;
432 s->intended_latency = LATENCY_USEC;
433 s->smoother = pa_smoother_new(
434 PA_USEC_PER_SEC*5,
435 PA_USEC_PER_SEC*2,
436 TRUE,
437 TRUE,
438 10,
439 pa_timeval_load(&now),
440 FALSE);
441 s->last_rate_update = pa_timeval_load(&now);
442 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
443
444 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
445 goto fail;
446
447 pa_sink_input_new_data_init(&data);
448 data.sink = sink;
449 data.driver = __FILE__;
450 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
451 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
452 "RTP Stream%s%s%s",
453 sdp_info->session_name ? " (" : "",
454 sdp_info->session_name ? sdp_info->session_name : "",
455 sdp_info->session_name ? ")" : "");
456
457 if (sdp_info->session_name)
458 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
459 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
460 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
461 data.module = u->module;
462 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
463
464 pa_sink_input_new(&s->sink_input, u->module->core, &data, PA_SINK_INPUT_VARIABLE_RATE);
465 pa_sink_input_new_data_done(&data);
466
467 if (!s->sink_input) {
468 pa_log("Failed to create sink input.");
469 goto fail;
470 }
471
472 s->sink_input->userdata = s;
473
474 s->sink_input->parent.process_msg = sink_input_process_msg;
475 s->sink_input->pop = sink_input_pop_cb;
476 s->sink_input->process_rewind = sink_input_process_rewind_cb;
477 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
478 s->sink_input->kill = sink_input_kill;
479 s->sink_input->attach = sink_input_attach;
480 s->sink_input->detach = sink_input_detach;
481
482 pa_sink_input_get_silence(s->sink_input, &silence);
483
484 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
485
486 if (s->intended_latency < s->sink_latency*2)
487 s->intended_latency = s->sink_latency*2;
488
489 s->memblockq = pa_memblockq_new(
490 0,
491 MEMBLOCKQ_MAXLENGTH,
492 MEMBLOCKQ_MAXLENGTH,
493 pa_frame_size(&s->sink_input->sample_spec),
494 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
495 0,
496 0,
497 &silence);
498
499 pa_memblock_unref(silence.memblock);
500
501 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
502
503 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
504 u->n_sessions++;
505 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
506
507 pa_sink_input_put(s->sink_input);
508
509 pa_log_info("New session '%s'", s->sdp_info.session_name);
510
511 return s;
512
513 fail:
514 pa_xfree(s);
515
516 if (fd >= 0)
517 pa_close(fd);
518
519 return NULL;
520 }
521
522 static void session_free(struct session *s) {
523 pa_assert(s);
524
525 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
526
527 pa_sink_input_unlink(s->sink_input);
528 pa_sink_input_unref(s->sink_input);
529
530 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
531 pa_assert(s->userdata->n_sessions >= 1);
532 s->userdata->n_sessions--;
533 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
534
535 pa_memblockq_free(s->memblockq);
536 pa_sdp_info_destroy(&s->sdp_info);
537 pa_rtp_context_destroy(&s->rtp_context);
538
539 pa_smoother_free(s->smoother);
540
541 pa_xfree(s);
542 }
543
544 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
545 struct userdata *u = userdata;
546 pa_bool_t goodbye = FALSE;
547 pa_sdp_info info;
548 struct session *s;
549
550 pa_assert(m);
551 pa_assert(e);
552 pa_assert(u);
553 pa_assert(fd == u->sap_context.fd);
554 pa_assert(flags == PA_IO_EVENT_INPUT);
555
556 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
557 return;
558
559 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
560 return;
561
562 if (goodbye) {
563
564 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
565 session_free(s);
566
567 pa_sdp_info_destroy(&info);
568 } else {
569
570 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
571 if (!session_new(u, &info))
572 pa_sdp_info_destroy(&info);
573
574 } else {
575 struct timeval now;
576 pa_rtclock_get(&now);
577 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
578
579 pa_sdp_info_destroy(&info);
580 }
581 }
582 }
583
584 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) {
585 struct session *s, *n;
586 struct userdata *u = userdata;
587 struct timeval now;
588 struct timeval tv;
589
590 pa_assert(m);
591 pa_assert(t);
592 pa_assert(ptv);
593 pa_assert(u);
594
595 pa_rtclock_get(&now);
596
597 pa_log_debug("Checking for dead streams ...");
598
599 for (s = u->sessions; s; s = n) {
600 int k;
601 n = s->next;
602
603 k = pa_atomic_load(&s->timestamp);
604
605 if (k + DEATH_TIMEOUT < now.tv_sec)
606 session_free(s);
607 }
608
609 /* Restart timer */
610 pa_gettimeofday(&tv);
611 pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC);
612 m->time_restart(t, &tv);
613 }
614
615 int pa__init(pa_module*m) {
616 struct userdata *u;
617 pa_modargs *ma = NULL;
618 struct sockaddr_in sa4;
619 #ifdef HAVE_IPV6
620 struct sockaddr_in6 sa6;
621 #endif
622 struct sockaddr *sa;
623 socklen_t salen;
624 const char *sap_address;
625 int fd = -1;
626 struct timeval tv;
627
628 pa_assert(m);
629
630 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
631 pa_log("failed to parse module arguments");
632 goto fail;
633 }
634
635 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
636
637 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
638 sa4.sin_family = AF_INET;
639 sa4.sin_port = htons(SAP_PORT);
640 sa = (struct sockaddr*) &sa4;
641 salen = sizeof(sa4);
642 #ifdef HAVE_IPV6
643 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
644 sa6.sin6_family = AF_INET6;
645 sa6.sin6_port = htons(SAP_PORT);
646 sa = (struct sockaddr*) &sa6;
647 salen = sizeof(sa6);
648 #endif
649 } else {
650 pa_log("Invalid SAP address '%s'", sap_address);
651 goto fail;
652 }
653
654 if ((fd = mcast_socket(sa, salen)) < 0)
655 goto fail;
656
657 u = pa_xnew(struct userdata, 1);
658 m->userdata = u;
659 u->module = m;
660 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
661
662 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
663 pa_sap_context_init_recv(&u->sap_context, fd);
664
665 PA_LLIST_HEAD_INIT(struct session, u->sessions);
666 u->n_sessions = 0;
667 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
668
669 pa_gettimeofday(&tv);
670 pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC);
671 u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u);
672
673 pa_modargs_free(ma);
674
675 return 0;
676
677 fail:
678 if (ma)
679 pa_modargs_free(ma);
680
681 if (fd >= 0)
682 pa_close(fd);
683
684 return -1;
685 }
686
687 void pa__done(pa_module*m) {
688 struct userdata *u;
689 struct session *s;
690
691 pa_assert(m);
692
693 if (!(u = m->userdata))
694 return;
695
696 if (u->sap_event)
697 m->core->mainloop->io_free(u->sap_event);
698
699 if (u->check_death_event)
700 m->core->mainloop->time_free(u->check_death_event);
701
702 pa_sap_context_destroy(&u->sap_context);
703
704 if (u->by_origin) {
705 while ((s = pa_hashmap_first(u->by_origin)))
706 session_free(s);
707
708 pa_hashmap_free(u->by_origin, NULL, NULL);
709 }
710
711 pa_xfree(u->sink_name);
712 pa_xfree(u);
713 }