2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <sys/ioctl.h>
37 #ifdef HAVE_LINUX_SOCKIOS_H
38 #include <linux/sockios.h>
41 #include <pulse/rtclock.h>
42 #include <pulse/timeval.h>
43 #include <pulse/xmalloc.h>
45 #include <pulsecore/core-error.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/log.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/poll.h>
57 #include "module-raop-sink-symdef.h"
61 #include "raop_client.h"
63 PA_MODULE_AUTHOR("Colin Guthrie");
64 PA_MODULE_DESCRIPTION("RAOP Sink");
65 PA_MODULE_VERSION(PACKAGE_VERSION
);
66 PA_MODULE_LOAD_ONCE(false);
68 "sink_name=<name for the sink> "
69 "sink_properties=<properties for the sink> "
71 "format=<sample format> "
73 "channels=<number of channels>");
75 #define DEFAULT_SINK_NAME "raop"
82 pa_thread_mq thread_mq
;
84 pa_rtpoll_item
*rtpoll_item
;
87 pa_memchunk raw_memchunk
;
88 pa_memchunk encoded_memchunk
;
91 size_t write_length
, write_index
;
94 size_t read_length
, read_index
;
98 /*esd_format_t format;*/
101 pa_smoother
*smoother
;
105 int64_t encoding_overhead
;
106 int32_t next_encoding_overhead
;
107 double encoding_ratio
;
109 pa_raop_client
*raop
;
114 static const char* const valid_modargs
[] = {
125 SINK_MESSAGE_PASS_SOCKET
= PA_SINK_MESSAGE_MAX
,
126 SINK_MESSAGE_RIP_SOCKET
129 /* Forward declaration */
130 static void sink_set_volume_cb(pa_sink
*);
132 static void on_connection(int fd
, void*userdata
) {
134 socklen_t sl
= sizeof(int);
135 struct userdata
*u
= userdata
;
138 pa_assert(u
->fd
< 0);
141 if (getsockopt(u
->fd
, SOL_SOCKET
, SO_SNDBUF
, &so_sndbuf
, &sl
) < 0)
142 pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno
));
144 pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf
);
145 pa_sink_set_max_request(u
->sink
, PA_MAX((size_t) so_sndbuf
, u
->block_size
));
148 /* Set the initial volume */
149 sink_set_volume_cb(u
->sink
);
151 pa_log_debug("Connection authenticated, handing fd to IO thread...");
153 pa_asyncmsgq_post(u
->thread_mq
.inq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_PASS_SOCKET
, NULL
, 0, NULL
, NULL
);
156 static void on_close(void*userdata
) {
157 struct userdata
*u
= userdata
;
160 pa_log_debug("Connection closed, informing IO thread...");
162 pa_asyncmsgq_post(u
->thread_mq
.inq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_RIP_SOCKET
, NULL
, 0, NULL
, NULL
);
165 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
166 struct userdata
*u
= PA_SINK(o
)->userdata
;
170 case PA_SINK_MESSAGE_SET_STATE
:
172 switch ((pa_sink_state_t
) PA_PTR_TO_UINT(data
)) {
174 case PA_SINK_SUSPENDED
:
175 pa_assert(PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
));
177 pa_smoother_pause(u
->smoother
, pa_rtclock_now());
179 /* Issue a FLUSH if we are connected */
181 pa_raop_flush(u
->raop
);
186 case PA_SINK_RUNNING
:
188 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
189 pa_smoother_resume(u
->smoother
, pa_rtclock_now(), true);
191 /* The connection can be closed when idle, so check to
192 see if we need to reestablish it */
194 pa_raop_connect(u
->raop
);
196 pa_raop_flush(u
->raop
);
201 case PA_SINK_UNLINKED
:
203 case PA_SINK_INVALID_STATE
:
209 case PA_SINK_MESSAGE_GET_LATENCY
: {
212 r
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
213 w
= pa_bytes_to_usec((u
->offset
- u
->encoding_overhead
+ (u
->encoded_memchunk
.length
/ u
->encoding_ratio
)), &u
->sink
->sample_spec
);
215 *((pa_usec_t
*) data
) = w
> r
? w
- r
: 0;
219 case SINK_MESSAGE_PASS_SOCKET
: {
220 struct pollfd
*pollfd
;
222 pa_assert(!u
->rtpoll_item
);
224 u
->rtpoll_item
= pa_rtpoll_item_new(u
->rtpoll
, PA_RTPOLL_NEVER
, 1);
225 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
227 pollfd
->events
= POLLOUT
;
228 /*pollfd->events = */pollfd
->revents
= 0;
230 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
231 /* Our stream has been suspended, so we just flush it. */
232 pa_raop_flush(u
->raop
);
237 case SINK_MESSAGE_RIP_SOCKET
: {
243 pa_log("We should not get to this state. Cannot rip socket if not connected.");
245 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
247 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
250 pa_rtpoll_item_free(u
->rtpoll_item
);
251 u
->rtpoll_item
= NULL
;
253 /* Question: is this valid here, or should we do something like:
254 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
256 pa_module_unload_request(u
->module
, true);
262 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
265 static void sink_set_volume_cb(pa_sink
*s
) {
266 struct userdata
*u
= s
->userdata
;
269 char t
[PA_CVOLUME_SNPRINT_VERBOSE_MAX
];
273 /* If we're muted we don't need to do anything */
277 /* Calculate the max volume of all channels.
278 We'll use this as our (single) volume on the APEX device and emulate
279 any variation in channel volumes in software */
280 v
= pa_cvolume_max(&s
->real_volume
);
282 /* Create a pa_cvolume version of our single value */
283 pa_cvolume_set(&hw
, s
->sample_spec
.channels
, v
);
285 /* Perform any software manipulation of the volume needed */
286 pa_sw_cvolume_divide(&s
->soft_volume
, &s
->real_volume
, &hw
);
288 pa_log_debug("Requested volume: %s", pa_cvolume_snprint_verbose(t
, sizeof(t
), &s
->real_volume
, &s
->channel_map
, false));
289 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint_verbose(t
, sizeof(t
), &hw
, &s
->channel_map
, false));
290 pa_log_debug("Calculated software volume: %s",
291 pa_cvolume_snprint_verbose(t
, sizeof(t
), &s
->soft_volume
, &s
->channel_map
, true));
293 /* Any necessary software volume manipulation is done so set
294 our hw volume (or v as a single value) on the device */
295 pa_raop_client_set_volume(u
->raop
, v
);
298 static void sink_set_mute_cb(pa_sink
*s
) {
299 struct userdata
*u
= s
->userdata
;
304 pa_raop_client_set_volume(u
->raop
, PA_VOLUME_MUTED
);
306 sink_set_volume_cb(s
);
310 static void thread_func(void *userdata
) {
311 struct userdata
*u
= userdata
;
314 uint32_t silence_overhead
= 0;
315 double silence_ratio
= 0;
319 pa_log_debug("Thread starting up");
321 pa_thread_mq_install(&u
->thread_mq
);
323 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_now());
325 /* Create a chunk of memory that is our encoded silence sample. */
326 pa_memchunk_reset(&silence
);
331 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
332 pa_sink_process_rewind(u
->sink
, 0);
334 if (u
->rtpoll_item
) {
335 struct pollfd
*pollfd
;
336 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
338 /* Render some data and write it to the socket */
339 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd
->revents
) {
344 if (!silence
.memblock
) {
345 pa_memchunk silence_tmp
;
347 pa_memchunk_reset(&silence_tmp
);
348 silence_tmp
.memblock
= pa_memblock_new(u
->core
->mempool
, 4096);
349 silence_tmp
.length
= 4096;
350 p
= pa_memblock_acquire(silence_tmp
.memblock
);
352 pa_memblock_release(silence_tmp
.memblock
);
353 pa_raop_client_encode_sample(u
->raop
, &silence_tmp
, &silence
);
354 pa_assert(0 == silence_tmp
.length
);
355 silence_overhead
= silence_tmp
.length
- 4096;
356 silence_ratio
= silence_tmp
.length
/ 4096;
357 pa_memblock_unref(silence_tmp
.memblock
);
363 if (u
->encoded_memchunk
.length
<= 0) {
364 if (u
->encoded_memchunk
.memblock
)
365 pa_memblock_unref(u
->encoded_memchunk
.memblock
);
366 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
)) {
369 /* We render real data */
370 if (u
->raw_memchunk
.length
<= 0) {
371 if (u
->raw_memchunk
.memblock
)
372 pa_memblock_unref(u
->raw_memchunk
.memblock
);
373 pa_memchunk_reset(&u
->raw_memchunk
);
375 /* Grab unencoded data */
376 pa_sink_render(u
->sink
, u
->block_size
, &u
->raw_memchunk
);
378 pa_assert(u
->raw_memchunk
.length
> 0);
381 rl
= u
->raw_memchunk
.length
;
382 u
->encoding_overhead
+= u
->next_encoding_overhead
;
383 pa_raop_client_encode_sample(u
->raop
, &u
->raw_memchunk
, &u
->encoded_memchunk
);
384 u
->next_encoding_overhead
= (u
->encoded_memchunk
.length
- (rl
- u
->raw_memchunk
.length
));
385 u
->encoding_ratio
= u
->encoded_memchunk
.length
/ (rl
- u
->raw_memchunk
.length
);
387 /* We render some silence into our memchunk */
388 memcpy(&u
->encoded_memchunk
, &silence
, sizeof(pa_memchunk
));
389 pa_memblock_ref(silence
.memblock
);
391 /* Calculate/store some values to be used with the smoother */
392 u
->next_encoding_overhead
= silence_overhead
;
393 u
->encoding_ratio
= silence_ratio
;
396 pa_assert(u
->encoded_memchunk
.length
> 0);
398 p
= pa_memblock_acquire(u
->encoded_memchunk
.memblock
);
399 l
= pa_write(u
->fd
, (uint8_t*) p
+ u
->encoded_memchunk
.index
, u
->encoded_memchunk
.length
, &write_type
);
400 pa_memblock_release(u
->encoded_memchunk
.memblock
);
408 else if (errno
== EAGAIN
) {
410 /* OK, we filled all socket buffers up
415 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno
));
422 u
->encoded_memchunk
.index
+= l
;
423 u
->encoded_memchunk
.length
-= l
;
427 if (u
->encoded_memchunk
.length
> 0) {
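428 /* NOTE(review): this branch is taken when encoded data is still pending (length > 0), i.e. the chunk was NOT completely written, yet the overhead is updated here -- confirm intent */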
429 u
->encoding_overhead
+= u
->next_encoding_overhead
;
431 /* OK, we wrote less than we asked for,
432 * hence we can assume that the socket
433 * buffers are full now */
441 /* At this spot we know that the socket buffers are
442 * fully filled up. This is the best time to estimate
443 * the playback position of the server */
445 n
= u
->offset
- u
->encoding_overhead
;
450 if (ioctl(u
->fd
, SIOCOUTQ
, &l
) >= 0 && l
> 0)
451 n
-= (l
/ u
->encoding_ratio
);
455 usec
= pa_bytes_to_usec(n
, &u
->sink
->sample_spec
);
457 if (usec
> u
->latency
)
462 pa_smoother_put(u
->smoother
, pa_rtclock_now(), usec
);
465 /* Hmm, nothing to do. Let's sleep */
466 pollfd
->events
= POLLOUT
; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
469 if ((ret
= pa_rtpoll_run(u
->rtpoll
, true)) < 0)
475 if (u
->rtpoll_item
) {
476 struct pollfd
* pollfd
;
478 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
480 if (pollfd
->revents
& ~POLLOUT
) {
481 if (u
->sink
->thread_info
.state
!= PA_SINK_SUSPENDED
) {
482 pa_log("FIFO shutdown.");
486 /* We expect this to happen on occasion if we are not sending data.
487 It's perfectly natural and normal */
489 pa_rtpoll_item_free(u
->rtpoll_item
);
490 u
->rtpoll_item
= NULL
;
496 /* If this was not a regular exit from the loop, we have to continue
497 * processing messages until we receive PA_MESSAGE_SHUTDOWN */
498 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
499 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
502 if (silence
.memblock
)
503 pa_memblock_unref(silence
.memblock
);
504 pa_log_debug("Thread shutting down");
507 int pa__init(pa_module
*m
) {
508 struct userdata
*u
= NULL
;
510 pa_modargs
*ma
= NULL
;
512 pa_sink_new_data data
;
516 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
517 pa_log("failed to parse module arguments");
521 ss
= m
->core
->default_sample_spec
;
522 if (pa_modargs_get_sample_spec(ma
, &ss
) < 0) {
523 pa_log("invalid sample format specification");
527 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss
.format
!= PA_SAMPLE_S16NE
) ||
529 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
533 u
= pa_xnew0(struct userdata
, 1);
538 u
->smoother
= pa_smoother_new(
546 pa_memchunk_reset(&u
->raw_memchunk
);
547 pa_memchunk_reset(&u
->encoded_memchunk
);
549 u
->encoding_overhead
= 0;
550 u
->next_encoding_overhead
= 0;
551 u
->encoding_ratio
= 1.0;
553 u
->rtpoll
= pa_rtpoll_new();
554 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
555 u
->rtpoll_item
= NULL
;
558 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
559 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
561 u
->block_size
= pa_usec_to_bytes(PA_USEC_PER_SEC
/20, &ss
);
563 u
->read_data
= u
->write_data
= NULL
;
564 u
->read_index
= u
->write_index
= u
->read_length
= u
->write_length
= 0;
566 /*u->state = STATE_AUTH;*/
569 if (!(server
= pa_modargs_get_value(ma
, "server", NULL
))) {
570 pa_log("No server argument given.");
574 pa_sink_new_data_init(&data
);
575 data
.driver
= __FILE__
;
577 pa_sink_new_data_set_name(&data
, pa_modargs_get_value(ma
, "sink_name", DEFAULT_SINK_NAME
));
578 pa_sink_new_data_set_sample_spec(&data
, &ss
);
579 pa_proplist_sets(data
.proplist
, PA_PROP_DEVICE_STRING
, server
);
580 pa_proplist_sets(data
.proplist
, PA_PROP_DEVICE_INTENDED_ROLES
, "music");
581 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "RAOP sink '%s'", server
);
583 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
584 pa_log("Invalid properties");
585 pa_sink_new_data_done(&data
);
589 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_LATENCY
|PA_SINK_NETWORK
);
590 pa_sink_new_data_done(&data
);
593 pa_log("Failed to create sink.");
597 u
->sink
->parent
.process_msg
= sink_process_msg
;
598 u
->sink
->userdata
= u
;
599 pa_sink_set_set_volume_callback(u
->sink
, sink_set_volume_cb
);
600 pa_sink_set_set_mute_callback(u
->sink
, sink_set_mute_cb
);
601 u
->sink
->flags
= PA_SINK_LATENCY
|PA_SINK_NETWORK
;
603 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
604 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
606 if (!(u
->raop
= pa_raop_client_new(u
->core
, server
))) {
607 pa_log("Failed to connect to server.");
611 pa_raop_client_set_callback(u
->raop
, on_connection
, u
);
612 pa_raop_client_set_closed_callback(u
->raop
, on_close
, u
);
614 if (!(u
->thread
= pa_thread_new("raop-sink", thread_func
, u
))) {
615 pa_log("Failed to create thread.");
619 pa_sink_put(u
->sink
);
634 int pa__get_n_used(pa_module
*m
) {
638 pa_assert_se(u
= m
->userdata
);
640 return pa_sink_linked_by(u
->sink
);
643 void pa__done(pa_module
*m
) {
647 if (!(u
= m
->userdata
))
651 pa_sink_unlink(u
->sink
);
654 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
655 pa_thread_free(u
->thread
);
658 pa_thread_mq_done(&u
->thread_mq
);
661 pa_sink_unref(u
->sink
);
664 pa_rtpoll_item_free(u
->rtpoll_item
);
667 pa_rtpoll_free(u
->rtpoll
);
669 if (u
->raw_memchunk
.memblock
)
670 pa_memblock_unref(u
->raw_memchunk
.memblock
);
672 if (u
->encoded_memchunk
.memblock
)
673 pa_memblock_unref(u
->encoded_memchunk
.memblock
);
676 pa_raop_client_free(u
->raop
);
678 pa_xfree(u
->read_data
);
679 pa_xfree(u
->write_data
);
682 pa_smoother_free(u
->smoother
);