3 This file is part of PulseAudio.
5 PulseAudio is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published
7 by the Free Software Foundation; either version 2 of the License,
8 or (at your option) any later version.
10 PulseAudio is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License
16 along with PulseAudio; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
34 #include <pulse/timeval.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/core-error.h>
38 #include <pulsecore/module.h>
39 #include <pulsecore/llist.h>
40 #include <pulsecore/sink.h>
41 #include <pulsecore/sink-input.h>
42 #include <pulsecore/memblockq.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-util.h>
45 #include <pulsecore/modargs.h>
46 #include <pulsecore/namereg.h>
47 #include <pulsecore/sample-util.h>
49 #include "module-rtp-recv-symdef.h"
55 PA_MODULE_AUTHOR("Lennart Poettering")
56 PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP")
57 PA_MODULE_VERSION(PACKAGE_VERSION
)
59 "sink=<name of the sink> "
60 "sap_address=<multicast address to listen on> "
64 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
65 #define MEMBLOCKQ_MAXLENGTH (1024*170)
66 #define MAX_SESSIONS 16
67 #define DEATH_TIMEOUT 20000000
69 static const char* const valid_modargs
[] = {
76 struct userdata
*userdata
;
78 pa_sink_input
*sink_input
;
79 pa_memblockq
*memblockq
;
81 pa_time_event
*death_event
;
87 struct pa_sdp_info sdp_info
;
89 pa_rtp_context rtp_context
;
90 pa_io_event
* rtp_event
;
97 pa_sap_context sap_context
;
98 pa_io_event
* sap_event
;
100 pa_hashmap
*by_origin
;
107 static void session_free(struct session
*s
, int from_hash
);
109 static int sink_input_peek(pa_sink_input
*i
, pa_memchunk
*chunk
) {
114 return pa_memblockq_peek(s
->memblockq
, chunk
);
117 static void sink_input_drop(pa_sink_input
*i
, const pa_memchunk
*chunk
, size_t length
) {
122 pa_memblockq_drop(s
->memblockq
, chunk
, length
);
125 static void sink_input_kill(pa_sink_input
* i
) {
133 static pa_usec_t
sink_input_get_latency(pa_sink_input
*i
) {
138 return pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &i
->sample_spec
);
141 static void rtp_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
142 struct session
*s
= userdata
;
150 assert(fd
== s
->rtp_context
.fd
);
151 assert(flags
== PA_IO_EVENT_INPUT
);
153 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->core
->memblock_stat
) < 0)
156 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
) {
157 pa_memblock_unref(chunk
.memblock
);
161 if (!s
->first_packet
) {
164 s
->ssrc
= s
->rtp_context
.ssrc
;
165 s
->offset
= s
->rtp_context
.timestamp
;
167 if (s
->ssrc
== s
->userdata
->core
->cookie
)
168 pa_log_warn(__FILE__
": WARNING! Detected RTP packet loop!");
170 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
171 pa_memblock_unref(chunk
.memblock
);
176 /* Check wheter there was a timestamp overflow */
177 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
178 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
180 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
185 pa_memblockq_seek(s
->memblockq
, delta
* s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
);
187 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
188 /* queue overflow, let's flush it and try again */
189 pa_memblockq_flush(s
->memblockq
);
190 pa_memblockq_push(s
->memblockq
, &chunk
);
193 /* The next timestamp we expect */
194 s
->offset
= s
->rtp_context
.timestamp
+ (chunk
.length
/ s
->rtp_context
.frame_size
);
196 pa_memblock_unref(chunk
.memblock
);
198 /* Reset death timer */
199 pa_gettimeofday(&tv
);
200 pa_timeval_add(&tv
, DEATH_TIMEOUT
);
201 m
->time_restart(s
->death_event
, &tv
);
204 static void death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*tv
, void *userdata
) {
205 struct session
*s
= userdata
;
215 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
216 int af
, fd
= -1, r
, one
;
219 if ((fd
= socket(af
, SOCK_DGRAM
, 0)) < 0) {
220 pa_log(__FILE__
": Failed to create socket: %s", pa_cstrerror(errno
));
225 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
226 pa_log(__FILE__
": SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
232 memset(&mr4
, 0, sizeof(mr4
));
233 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
234 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
236 struct ipv6_mreq mr6
;
237 memset(&mr6
, 0, sizeof(mr6
));
238 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
239 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
243 pa_log_info(__FILE__
": Joining mcast group failed: %s", pa_cstrerror(errno
));
247 if (bind(fd
, sa
, salen
) < 0) {
248 pa_log(__FILE__
": bind() failed: %s", pa_cstrerror(errno
));
261 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
262 struct session
*s
= NULL
;
267 pa_memblock
*silence
;
268 pa_sink_input_new_data data
;
270 if (u
->n_sessions
>= MAX_SESSIONS
) {
271 pa_log(__FILE__
": session limit reached.");
275 if (!(sink
= pa_namereg_get(u
->core
, u
->sink_name
, PA_NAMEREG_SINK
, 1))) {
276 pa_log(__FILE__
": sink does not exist.");
280 s
= pa_xnew0(struct session
, 1);
283 s
->sdp_info
= *sdp_info
;
285 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
288 c
= pa_sprintf_malloc("RTP Stream%s%s%s",
289 sdp_info
->session_name
? " (" : "",
290 sdp_info
->session_name
? sdp_info
->session_name
: "",
291 sdp_info
->session_name
? ")" : "");
293 pa_sink_input_new_data_init(&data
);
295 data
.driver
= __FILE__
;
297 data
.module
= u
->module
;
298 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
300 s
->sink_input
= pa_sink_input_new(u
->core
, &data
, 0);
303 if (!s
->sink_input
) {
304 pa_log(__FILE__
": failed to create sink input.");
308 s
->sink_input
->userdata
= s
;
310 s
->sink_input
->peek
= sink_input_peek
;
311 s
->sink_input
->drop
= sink_input_drop
;
312 s
->sink_input
->kill
= sink_input_kill
;
313 s
->sink_input
->get_latency
= sink_input_get_latency
;
315 silence
= pa_silence_memblock_new(&s
->sink_input
->sample_spec
,
316 (pa_bytes_per_second(&s
->sink_input
->sample_spec
)/128/pa_frame_size(&s
->sink_input
->sample_spec
))*
317 pa_frame_size(&s
->sink_input
->sample_spec
),
318 s
->userdata
->core
->memblock_stat
);
320 s
->memblockq
= pa_memblockq_new(
324 pa_frame_size(&s
->sink_input
->sample_spec
),
325 pa_bytes_per_second(&s
->sink_input
->sample_spec
)/10+1,
328 u
->core
->memblock_stat
);
330 pa_memblock_unref(silence
);
332 s
->rtp_event
= u
->core
->mainloop
->io_new(u
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, rtp_event_cb
, s
);
334 pa_gettimeofday(&tv
);
335 pa_timeval_add(&tv
, DEATH_TIMEOUT
);
336 s
->death_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &tv
, death_event_cb
, s
);
338 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
340 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
342 pa_log_info(__FILE__
": Found new session '%s'", s
->sdp_info
.session_name
);
359 static void session_free(struct session
*s
, int from_hash
) {
362 pa_log_info(__FILE__
": Freeing session '%s'", s
->sdp_info
.session_name
);
364 s
->userdata
->core
->mainloop
->time_free(s
->death_event
);
365 s
->userdata
->core
->mainloop
->io_free(s
->rtp_event
);
368 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
370 pa_sink_input_disconnect(s
->sink_input
);
371 pa_sink_input_unref(s
->sink_input
);
373 pa_memblockq_free(s
->memblockq
);
374 pa_sdp_info_destroy(&s
->sdp_info
);
375 pa_rtp_context_destroy(&s
->rtp_context
);
377 assert(s
->userdata
->n_sessions
>= 1);
378 s
->userdata
->n_sessions
--;
383 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
384 struct userdata
*u
= userdata
;
392 assert(fd
== u
->sap_context
.fd
);
393 assert(flags
== PA_IO_EVENT_INPUT
);
395 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
398 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
403 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
406 pa_sdp_info_destroy(&info
);
409 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
410 if (!(s
= session_new(u
, &info
)))
411 pa_sdp_info_destroy(&info
);
416 pa_gettimeofday(&tv
);
417 pa_timeval_add(&tv
, DEATH_TIMEOUT
);
418 m
->time_restart(s
->death_event
, &tv
);
420 pa_sdp_info_destroy(&info
);
425 int pa__init(pa_core
*c
, pa_module
*m
) {
427 pa_modargs
*ma
= NULL
;
428 struct sockaddr_in sa4
;
429 struct sockaddr_in6 sa6
;
432 const char *sap_address
;
438 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
439 pa_log(__FILE__
": failed to parse module arguments");
443 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
445 if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
446 sa6
.sin6_family
= AF_INET6
;
447 sa6
.sin6_port
= htons(SAP_PORT
);
448 sa
= (struct sockaddr
*) &sa6
;
450 } else if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
451 sa4
.sin_family
= AF_INET
;
452 sa4
.sin_port
= htons(SAP_PORT
);
453 sa
= (struct sockaddr
*) &sa4
;
456 pa_log(__FILE__
": invalid SAP address '%s'", sap_address
);
460 if ((fd
= mcast_socket(sa
, salen
)) < 0)
463 u
= pa_xnew(struct userdata
, 1);
467 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
470 u
->sap_event
= c
->mainloop
->io_new(c
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
472 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
474 pa_sap_context_init_recv(&u
->sap_context
, fd
);
490 static void free_func(void *p
, PA_GCC_UNUSED
void *userdata
) {
494 void pa__done(pa_core
*c
, pa_module
*m
) {
499 if (!(u
= m
->userdata
))
502 c
->mainloop
->io_free(u
->sap_event
);
503 pa_sap_context_destroy(&u
->sap_context
);
505 pa_hashmap_free(u
->by_origin
, free_func
, NULL
);
507 pa_xfree(u
->sink_name
);