3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
59 #include "module-rtp-recv-symdef.h"
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION
);
68 PA_MODULE_LOAD_ONCE(FALSE
);
70 "sink=<name of the sink> "
71 "sap_address=<multicast address to listen on> "
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
82 static const char* const valid_modargs
[] = {
89 struct userdata
*userdata
;
90 PA_LLIST_FIELDS(struct session
);
92 pa_sink_input
*sink_input
;
93 pa_memblockq
*memblockq
;
95 pa_bool_t first_packet
;
99 struct pa_sdp_info sdp_info
;
101 pa_rtp_context rtp_context
;
103 pa_rtpoll_item
*rtpoll_item
;
105 pa_atomic_t timestamp
;
107 pa_smoother
*smoother
;
108 pa_usec_t intended_latency
;
109 pa_usec_t sink_latency
;
111 pa_usec_t last_rate_update
;
118 pa_sap_context sap_context
;
119 pa_io_event
* sap_event
;
121 pa_time_event
*check_death_event
;
125 PA_LLIST_HEAD(struct session
, sessions
);
126 pa_hashmap
*by_origin
;
130 static void session_free(struct session
*s
);
132 /* Called from I/O thread context */
133 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
134 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
137 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
138 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
140 /* Fall through, the default handler will add in the extra
141 * latency added by the resampler */
145 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
148 /* Called from I/O thread context */
149 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
151 pa_sink_input_assert_ref(i
);
152 pa_assert_se(s
= i
->userdata
);
154 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
157 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
162 /* Called from I/O thread context */
163 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
166 pa_sink_input_assert_ref(i
);
167 pa_assert_se(s
= i
->userdata
);
169 pa_memblockq_rewind(s
->memblockq
, nbytes
);
172 /* Called from I/O thread context */
173 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
176 pa_sink_input_assert_ref(i
);
177 pa_assert_se(s
= i
->userdata
);
179 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
182 /* Called from main context */
183 static void sink_input_kill(pa_sink_input
* i
) {
185 pa_sink_input_assert_ref(i
);
186 pa_assert_se(s
= i
->userdata
);
191 /* Called from IO context */
192 static void sink_input_suspend_within_thread(pa_sink_input
* i
, pa_bool_t b
) {
194 pa_sink_input_assert_ref(i
);
195 pa_assert_se(s
= i
->userdata
);
198 pa_smoother_pause(s
->smoother
, pa_rtclock_now());
199 pa_memblockq_flush_read(s
->memblockq
);
201 s
->first_packet
= FALSE
;
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
208 struct timeval now
= { 0, 0 };
212 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
214 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
216 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
217 pa_log("poll() signalled bad revents.");
221 if ((p
->revents
& POLLIN
) == 0)
226 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
229 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
230 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
231 pa_memblock_unref(chunk
.memblock
);
235 if (!s
->first_packet
) {
236 s
->first_packet
= TRUE
;
238 s
->ssrc
= s
->rtp_context
.ssrc
;
239 s
->offset
= s
->rtp_context
.timestamp
;
241 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
242 pa_log_warn("Detected RTP packet loop!");
244 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
245 pa_memblock_unref(chunk
.memblock
);
250 /* Check whether there was a timestamp overflow */
251 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
252 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
254 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
259 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, TRUE
);
261 if (now
.tv_sec
== 0) {
263 pa_log_warn("Using artificial time instead of timestamp");
265 pa_rtclock_get(&now
);
267 pa_rtclock_from_wallclock(&now
);
269 pa_smoother_put(s
->smoother
, pa_timeval_load(&now
), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
));
271 /* Tell the smoother that we are rolling now, in case it is still paused */
272 pa_smoother_resume(s
->smoother
, pa_timeval_load(&now
), TRUE
);
274 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
275 pa_log_warn("Queue overrun");
276 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, TRUE
);
279 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
281 pa_memblock_unref(chunk
.memblock
);
283 /* The next timestamp we expect */
284 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
286 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
288 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
289 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
, fix
;
290 unsigned fix_samples
;
292 pa_log_debug("Updating sample rate");
294 wi
= pa_smoother_get(s
->smoother
, pa_timeval_load(&now
));
295 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
297 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
299 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
300 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
302 if (ri
> render_delay
+sink_delay
)
303 ri
-= render_delay
+sink_delay
;
312 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
314 /* Calculate deviation */
315 if (latency
< s
->intended_latency
)
316 fix
= s
->intended_latency
- latency
;
318 fix
= latency
- s
->intended_latency
;
320 /* How many samples is this per second? */
321 fix_samples
= (unsigned) (fix
* (pa_usec_t
) s
->sink_input
->thread_info
.sample_spec
.rate
/ (pa_usec_t
) RATE_UPDATE_INTERVAL
);
323 /* Check if deviation is in bounds */
324 if (fix_samples
> s
->sink_input
->sample_spec
.rate
*.50)
325 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples
);
328 if (latency
< s
->intended_latency
)
329 s
->sink_input
->sample_spec
.rate
-= fix_samples
;
331 s
->sink_input
->sample_spec
.rate
+= fix_samples
;
333 if (s
->sink_input
->sample_spec
.rate
> PA_RATE_MAX
)
334 s
->sink_input
->sample_spec
.rate
= PA_RATE_MAX
;
337 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
339 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
341 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
343 s
->last_rate_update
= pa_timeval_load(&now
);
346 if (pa_memblockq_is_readable(s
->memblockq
) &&
347 s
->sink_input
->thread_info
.underrun_for
> 0) {
348 pa_log_debug("Requesting rewind due to end of underrun");
349 pa_sink_input_request_rewind(s
->sink_input
, 0, FALSE
, TRUE
, FALSE
);
355 /* Called from I/O thread context */
356 static void sink_input_attach(pa_sink_input
*i
) {
360 pa_sink_input_assert_ref(i
);
361 pa_assert_se(s
= i
->userdata
);
363 pa_assert(!s
->rtpoll_item
);
364 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->thread_info
.rtpoll
, PA_RTPOLL_LATE
, 1);
366 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
367 p
->fd
= s
->rtp_context
.fd
;
371 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
372 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
375 /* Called from I/O thread context */
376 static void sink_input_detach(pa_sink_input
*i
) {
378 pa_sink_input_assert_ref(i
);
379 pa_assert_se(s
= i
->userdata
);
381 pa_assert(s
->rtpoll_item
);
382 pa_rtpoll_item_free(s
->rtpoll_item
);
383 s
->rtpoll_item
= NULL
;
386 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
387 int af
, fd
= -1, r
, one
;
390 pa_assert(salen
> 0);
393 if ((fd
= socket(af
, SOCK_DGRAM
, 0)) < 0) {
394 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
398 pa_make_udp_socket_low_delay(fd
);
401 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
402 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
407 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
408 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
414 memset(&mr4
, 0, sizeof(mr4
));
415 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
416 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
419 struct ipv6_mreq mr6
;
420 memset(&mr6
, 0, sizeof(mr6
));
421 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
422 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
427 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
431 if (bind(fd
, sa
, salen
) < 0) {
432 pa_log("bind() failed: %s", pa_cstrerror(errno
));
445 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
446 struct session
*s
= NULL
;
450 pa_sink_input_new_data data
;
456 if (u
->n_sessions
>= MAX_SESSIONS
) {
457 pa_log("Session limit reached.");
461 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
462 pa_log("Sink does not exist.");
466 pa_rtclock_get(&now
);
468 s
= pa_xnew0(struct session
, 1);
470 s
->first_packet
= FALSE
;
471 s
->sdp_info
= *sdp_info
;
472 s
->rtpoll_item
= NULL
;
473 s
->intended_latency
= LATENCY_USEC
;
474 s
->smoother
= pa_smoother_new(
480 pa_timeval_load(&now
),
482 s
->last_rate_update
= pa_timeval_load(&now
);
483 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
485 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
488 pa_sink_input_new_data_init(&data
);
490 data
.driver
= __FILE__
;
491 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
492 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
494 sdp_info
->session_name
? " (" : "",
495 sdp_info
->session_name
? sdp_info
->session_name
: "",
496 sdp_info
->session_name
? ")" : "");
498 if (sdp_info
->session_name
)
499 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
500 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
501 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
502 data
.module
= u
->module
;
503 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
504 data
.flags
= PA_SINK_INPUT_VARIABLE_RATE
;
506 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
);
507 pa_sink_input_new_data_done(&data
);
509 if (!s
->sink_input
) {
510 pa_log("Failed to create sink input.");
514 s
->sink_input
->userdata
= s
;
516 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
517 s
->sink_input
->pop
= sink_input_pop_cb
;
518 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
519 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
520 s
->sink_input
->kill
= sink_input_kill
;
521 s
->sink_input
->attach
= sink_input_attach
;
522 s
->sink_input
->detach
= sink_input_detach
;
523 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
525 pa_sink_input_get_silence(s
->sink_input
, &silence
);
527 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
529 if (s
->intended_latency
< s
->sink_latency
*2)
530 s
->intended_latency
= s
->sink_latency
*2;
532 s
->memblockq
= pa_memblockq_new(
536 pa_frame_size(&s
->sink_input
->sample_spec
),
537 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
542 pa_memblock_unref(silence
.memblock
);
544 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
546 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
548 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
550 pa_sink_input_put(s
->sink_input
);
552 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
565 static void session_free(struct session
*s
) {
568 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
570 pa_sink_input_unlink(s
->sink_input
);
571 pa_sink_input_unref(s
->sink_input
);
573 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
574 pa_assert(s
->userdata
->n_sessions
>= 1);
575 s
->userdata
->n_sessions
--;
576 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
578 pa_memblockq_free(s
->memblockq
);
579 pa_sdp_info_destroy(&s
->sdp_info
);
580 pa_rtp_context_destroy(&s
->rtp_context
);
582 pa_smoother_free(s
->smoother
);
587 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
588 struct userdata
*u
= userdata
;
589 pa_bool_t goodbye
= FALSE
;
596 pa_assert(fd
== u
->sap_context
.fd
);
597 pa_assert(flags
== PA_IO_EVENT_INPUT
);
599 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
602 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
607 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
610 pa_sdp_info_destroy(&info
);
613 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
614 if (!session_new(u
, &info
))
615 pa_sdp_info_destroy(&info
);
619 pa_rtclock_get(&now
);
620 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
622 pa_sdp_info_destroy(&info
);
627 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*tv
, void *userdata
) {
628 struct session
*s
, *n
;
629 struct userdata
*u
= userdata
;
636 pa_rtclock_get(&now
);
638 pa_log_debug("Checking for dead streams ...");
640 for (s
= u
->sessions
; s
; s
= n
) {
644 k
= pa_atomic_load(&s
->timestamp
);
646 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
651 pa_core_rttime_restart(u
->module
->core
, t
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
654 int pa__init(pa_module
*m
) {
656 pa_modargs
*ma
= NULL
;
657 struct sockaddr_in sa4
;
659 struct sockaddr_in6 sa6
;
663 const char *sap_address
;
668 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
669 pa_log("failed to parse module arguments");
673 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
675 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
676 sa4
.sin_family
= AF_INET
;
677 sa4
.sin_port
= htons(SAP_PORT
);
678 sa
= (struct sockaddr
*) &sa4
;
681 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
682 sa6
.sin6_family
= AF_INET6
;
683 sa6
.sin6_port
= htons(SAP_PORT
);
684 sa
= (struct sockaddr
*) &sa6
;
688 pa_log("Invalid SAP address '%s'", sap_address
);
692 if ((fd
= mcast_socket(sa
, salen
)) < 0)
695 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
698 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
700 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
701 pa_sap_context_init_recv(&u
->sap_context
, fd
);
703 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
705 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
707 u
->check_death_event
= pa_core_rttime_new(m
->core
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
, check_death_event_cb
, u
);
723 void pa__done(pa_module
*m
) {
729 if (!(u
= m
->userdata
))
733 m
->core
->mainloop
->io_free(u
->sap_event
);
735 if (u
->check_death_event
)
736 m
->core
->mainloop
->time_free(u
->check_death_event
);
738 pa_sap_context_destroy(&u
->sap_context
);
741 while ((s
= pa_hashmap_first(u
->by_origin
)))
744 pa_hashmap_free(u
->by_origin
, NULL
, NULL
);
747 pa_xfree(u
->sink_name
);