]> code.delx.au - pulseaudio/blob - src/modules/raop/module-raop-sink.c
alsa-mixer: Add surround 2.1 profile
[pulseaudio] / src / modules / raop / module-raop-sink.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <errno.h>
30 #include <string.h>
31 #include <unistd.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <sys/ioctl.h>
36
37 #ifdef HAVE_LINUX_SOCKIOS_H
38 #include <linux/sockios.h>
39 #endif
40
41 #include <pulse/rtclock.h>
42 #include <pulse/timeval.h>
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/core-error.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/log.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/poll.h>
56
57 #include "module-raop-sink-symdef.h"
58 #include "rtp.h"
59 #include "sdp.h"
60 #include "sap.h"
61 #include "raop_client.h"
62
63 PA_MODULE_AUTHOR("Colin Guthrie");
64 PA_MODULE_DESCRIPTION("RAOP Sink");
65 PA_MODULE_VERSION(PACKAGE_VERSION);
66 PA_MODULE_LOAD_ONCE(false);
67 PA_MODULE_USAGE(
68 "sink_name=<name for the sink> "
69 "sink_properties=<properties for the sink> "
70 "server=<address> "
71 "format=<sample format> "
72 "rate=<sample rate> "
73 "channels=<number of channels>");
74
75 #define DEFAULT_SINK_NAME "raop"
76
77 struct userdata {
78 pa_core *core;
79 pa_module *module;
80 pa_sink *sink;
81
82 pa_thread_mq thread_mq;
83 pa_rtpoll *rtpoll;
84 pa_rtpoll_item *rtpoll_item;
85 pa_thread *thread;
86
87 pa_memchunk raw_memchunk;
88 pa_memchunk encoded_memchunk;
89
90 void *write_data;
91 size_t write_length, write_index;
92
93 void *read_data;
94 size_t read_length, read_index;
95
96 pa_usec_t latency;
97
98 /*esd_format_t format;*/
99 int32_t rate;
100
101 pa_smoother *smoother;
102 int fd;
103
104 int64_t offset;
105 int64_t encoding_overhead;
106 int32_t next_encoding_overhead;
107 double encoding_ratio;
108
109 pa_raop_client *raop;
110
111 size_t block_size;
112 };
113
/* NULL-terminated list of module argument names accepted by pa__init(). */
static const char *const valid_modargs[] = {
    "sink_name",
    "sink_properties",
    "server",
    "format",
    "rate",
    "channels",
    NULL
};
123
124 enum {
125 SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX,
126 SINK_MESSAGE_RIP_SOCKET
127 };
128
129 /* Forward declaration */
130 static void sink_set_volume_cb(pa_sink *);
131
132 static void on_connection(int fd, void*userdata) {
133 int so_sndbuf = 0;
134 socklen_t sl = sizeof(int);
135 struct userdata *u = userdata;
136 pa_assert(u);
137
138 pa_assert(u->fd < 0);
139 u->fd = fd;
140
141 if (getsockopt(u->fd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf, &sl) < 0)
142 pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno));
143 else {
144 pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf);
145 pa_sink_set_max_request(u->sink, PA_MAX((size_t) so_sndbuf, u->block_size));
146 }
147
148 /* Set the initial volume */
149 sink_set_volume_cb(u->sink);
150
151 pa_log_debug("Connection authenticated, handing fd to IO thread...");
152
153 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
154 }
155
156 static void on_close(void*userdata) {
157 struct userdata *u = userdata;
158 pa_assert(u);
159
160 pa_log_debug("Connection closed, informing IO thread...");
161
162 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
163 }
164
165 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
166 struct userdata *u = PA_SINK(o)->userdata;
167
168 switch (code) {
169
170 case PA_SINK_MESSAGE_SET_STATE:
171
172 switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
173
174 case PA_SINK_SUSPENDED:
175 pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
176
177 pa_smoother_pause(u->smoother, pa_rtclock_now());
178
179 /* Issue a FLUSH if we are connected */
180 if (u->fd >= 0) {
181 pa_raop_flush(u->raop);
182 }
183 break;
184
185 case PA_SINK_IDLE:
186 case PA_SINK_RUNNING:
187
188 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
189 pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
190
191 /* The connection can be closed when idle, so check to
192 see if we need to reestablish it */
193 if (u->fd < 0)
194 pa_raop_connect(u->raop);
195 else
196 pa_raop_flush(u->raop);
197 }
198
199 break;
200
201 case PA_SINK_UNLINKED:
202 case PA_SINK_INIT:
203 case PA_SINK_INVALID_STATE:
204 ;
205 }
206
207 break;
208
209 case PA_SINK_MESSAGE_GET_LATENCY: {
210 pa_usec_t w, r;
211
212 r = pa_smoother_get(u->smoother, pa_rtclock_now());
213 w = pa_bytes_to_usec((u->offset - u->encoding_overhead + (u->encoded_memchunk.length / u->encoding_ratio)), &u->sink->sample_spec);
214
215 *((pa_usec_t*) data) = w > r ? w - r : 0;
216 return 0;
217 }
218
219 case SINK_MESSAGE_PASS_SOCKET: {
220 struct pollfd *pollfd;
221
222 pa_assert(!u->rtpoll_item);
223
224 u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
225 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
226 pollfd->fd = u->fd;
227 pollfd->events = POLLOUT;
228 /*pollfd->events = */pollfd->revents = 0;
229
230 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
231 /* Our stream has been suspended so we just flush it.... */
232 pa_raop_flush(u->raop);
233 }
234 return 0;
235 }
236
237 case SINK_MESSAGE_RIP_SOCKET: {
238 if (u->fd >= 0) {
239 pa_close(u->fd);
240 u->fd = -1;
241 } else
242 /* FIXME */
243 pa_log("We should not get to this state. Cannot rip socket if not connected.");
244
245 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
246
247 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
248
249 if (u->rtpoll_item)
250 pa_rtpoll_item_free(u->rtpoll_item);
251 u->rtpoll_item = NULL;
252 } else {
253 /* Question: is this valid here: or should we do some sort of:
254 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
255 ?? */
256 pa_module_unload_request(u->module, true);
257 }
258 return 0;
259 }
260 }
261
262 return pa_sink_process_msg(o, code, data, offset, chunk);
263 }
264
265 static void sink_set_volume_cb(pa_sink *s) {
266 struct userdata *u = s->userdata;
267 pa_cvolume hw;
268 pa_volume_t v;
269 char t[PA_CVOLUME_SNPRINT_VERBOSE_MAX];
270
271 pa_assert(u);
272
273 /* If we're muted we don't need to do anything */
274 if (s->muted)
275 return;
276
277 /* Calculate the max volume of all channels.
278 We'll use this as our (single) volume on the APEX device and emulate
279 any variation in channel volumes in software */
280 v = pa_cvolume_max(&s->real_volume);
281
282 /* Create a pa_cvolume version of our single value */
283 pa_cvolume_set(&hw, s->sample_spec.channels, v);
284
285 /* Perform any software manipulation of the volume needed */
286 pa_sw_cvolume_divide(&s->soft_volume, &s->real_volume, &hw);
287
288 pa_log_debug("Requested volume: %s", pa_cvolume_snprint_verbose(t, sizeof(t), &s->real_volume, &s->channel_map, false));
289 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint_verbose(t, sizeof(t), &hw, &s->channel_map, false));
290 pa_log_debug("Calculated software volume: %s",
291 pa_cvolume_snprint_verbose(t, sizeof(t), &s->soft_volume, &s->channel_map, true));
292
293 /* Any necessary software volume manipulation is done so set
294 our hw volume (or v as a single value) on the device */
295 pa_raop_client_set_volume(u->raop, v);
296 }
297
298 static void sink_set_mute_cb(pa_sink *s) {
299 struct userdata *u = s->userdata;
300
301 pa_assert(u);
302
303 if (s->muted) {
304 pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
305 } else {
306 sink_set_volume_cb(s);
307 }
308 }
309
310 static void thread_func(void *userdata) {
311 struct userdata *u = userdata;
312 int write_type = 0;
313 pa_memchunk silence;
314 uint32_t silence_overhead = 0;
315 double silence_ratio = 0;
316
317 pa_assert(u);
318
319 pa_log_debug("Thread starting up");
320
321 pa_thread_mq_install(&u->thread_mq);
322
323 pa_smoother_set_time_offset(u->smoother, pa_rtclock_now());
324
325 /* Create a chunk of memory that is our encoded silence sample. */
326 pa_memchunk_reset(&silence);
327
328 for (;;) {
329 int ret;
330
331 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
332 pa_sink_process_rewind(u->sink, 0);
333
334 if (u->rtpoll_item) {
335 struct pollfd *pollfd;
336 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
337
338 /* Render some data and write it to the fifo */
339 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
340 pa_usec_t usec;
341 int64_t n;
342 void *p;
343
344 if (!silence.memblock) {
345 pa_memchunk silence_tmp;
346
347 pa_memchunk_reset(&silence_tmp);
348 silence_tmp.memblock = pa_memblock_new(u->core->mempool, 4096);
349 silence_tmp.length = 4096;
350 p = pa_memblock_acquire(silence_tmp.memblock);
351 memset(p, 0, 4096);
352 pa_memblock_release(silence_tmp.memblock);
353 pa_raop_client_encode_sample(u->raop, &silence_tmp, &silence);
354 pa_assert(0 == silence_tmp.length);
355 silence_overhead = silence_tmp.length - 4096;
356 silence_ratio = silence_tmp.length / 4096;
357 pa_memblock_unref(silence_tmp.memblock);
358 }
359
360 for (;;) {
361 ssize_t l;
362
363 if (u->encoded_memchunk.length <= 0) {
364 if (u->encoded_memchunk.memblock)
365 pa_memblock_unref(u->encoded_memchunk.memblock);
366 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
367 size_t rl;
368
369 /* We render real data */
370 if (u->raw_memchunk.length <= 0) {
371 if (u->raw_memchunk.memblock)
372 pa_memblock_unref(u->raw_memchunk.memblock);
373 pa_memchunk_reset(&u->raw_memchunk);
374
375 /* Grab unencoded data */
376 pa_sink_render(u->sink, u->block_size, &u->raw_memchunk);
377 }
378 pa_assert(u->raw_memchunk.length > 0);
379
380 /* Encode it */
381 rl = u->raw_memchunk.length;
382 u->encoding_overhead += u->next_encoding_overhead;
383 pa_raop_client_encode_sample(u->raop, &u->raw_memchunk, &u->encoded_memchunk);
384 u->next_encoding_overhead = (u->encoded_memchunk.length - (rl - u->raw_memchunk.length));
385 u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
386 } else {
387 /* We render some silence into our memchunk */
388 memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
389 pa_memblock_ref(silence.memblock);
390
391 /* Calculate/store some values to be used with the smoother */
392 u->next_encoding_overhead = silence_overhead;
393 u->encoding_ratio = silence_ratio;
394 }
395 }
396 pa_assert(u->encoded_memchunk.length > 0);
397
398 p = pa_memblock_acquire(u->encoded_memchunk.memblock);
399 l = pa_write(u->fd, (uint8_t*) p + u->encoded_memchunk.index, u->encoded_memchunk.length, &write_type);
400 pa_memblock_release(u->encoded_memchunk.memblock);
401
402 pa_assert(l != 0);
403
404 if (l < 0) {
405
406 if (errno == EINTR)
407 continue;
408 else if (errno == EAGAIN) {
409
410 /* OK, we filled all socket buffers up
411 * now. */
412 goto filled_up;
413
414 } else {
415 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
416 goto fail;
417 }
418
419 } else {
420 u->offset += l;
421
422 u->encoded_memchunk.index += l;
423 u->encoded_memchunk.length -= l;
424
425 pollfd->revents = 0;
426
427 if (u->encoded_memchunk.length > 0) {
428 /* we've completely written the encoded data, so update our overhead */
429 u->encoding_overhead += u->next_encoding_overhead;
430
431 /* OK, we wrote less that we asked for,
432 * hence we can assume that the socket
433 * buffers are full now */
434 goto filled_up;
435 }
436 }
437 }
438
439 filled_up:
440
441 /* At this spot we know that the socket buffers are
442 * fully filled up. This is the best time to estimate
443 * the playback position of the server */
444
445 n = u->offset - u->encoding_overhead;
446
447 #ifdef SIOCOUTQ
448 {
449 int l;
450 if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
451 n -= (l / u->encoding_ratio);
452 }
453 #endif
454
455 usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
456
457 if (usec > u->latency)
458 usec -= u->latency;
459 else
460 usec = 0;
461
462 pa_smoother_put(u->smoother, pa_rtclock_now(), usec);
463 }
464
465 /* Hmm, nothing to do. Let's sleep */
466 pollfd->events = POLLOUT; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
467 }
468
469 if ((ret = pa_rtpoll_run(u->rtpoll, true)) < 0)
470 goto fail;
471
472 if (ret == 0)
473 goto finish;
474
475 if (u->rtpoll_item) {
476 struct pollfd* pollfd;
477
478 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
479
480 if (pollfd->revents & ~POLLOUT) {
481 if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
482 pa_log("FIFO shutdown.");
483 goto fail;
484 }
485
486 /* We expect this to happen on occasion if we are not sending data.
487 It's perfectly natural and normal and natural */
488 if (u->rtpoll_item)
489 pa_rtpoll_item_free(u->rtpoll_item);
490 u->rtpoll_item = NULL;
491 }
492 }
493 }
494
495 fail:
496 /* If this was no regular exit from the loop we have to continue
497 * processing messages until we received PA_MESSAGE_SHUTDOWN */
498 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
499 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
500
501 finish:
502 if (silence.memblock)
503 pa_memblock_unref(silence.memblock);
504 pa_log_debug("Thread shutting down");
505 }
506
507 int pa__init(pa_module*m) {
508 struct userdata *u = NULL;
509 pa_sample_spec ss;
510 pa_modargs *ma = NULL;
511 const char *server;
512 pa_sink_new_data data;
513
514 pa_assert(m);
515
516 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
517 pa_log("failed to parse module arguments");
518 goto fail;
519 }
520
521 ss = m->core->default_sample_spec;
522 if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
523 pa_log("invalid sample format specification");
524 goto fail;
525 }
526
527 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss.format != PA_SAMPLE_S16NE) ||
528 (ss.channels > 2)) {
529 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
530 goto fail;
531 }
532
533 u = pa_xnew0(struct userdata, 1);
534 u->core = m->core;
535 u->module = m;
536 m->userdata = u;
537 u->fd = -1;
538 u->smoother = pa_smoother_new(
539 PA_USEC_PER_SEC,
540 PA_USEC_PER_SEC*2,
541 true,
542 true,
543 10,
544 0,
545 false);
546 pa_memchunk_reset(&u->raw_memchunk);
547 pa_memchunk_reset(&u->encoded_memchunk);
548 u->offset = 0;
549 u->encoding_overhead = 0;
550 u->next_encoding_overhead = 0;
551 u->encoding_ratio = 1.0;
552
553 u->rtpoll = pa_rtpoll_new();
554 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
555 u->rtpoll_item = NULL;
556
557 /*u->format =
558 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
559 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
560 u->rate = ss.rate;
561 u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
562
563 u->read_data = u->write_data = NULL;
564 u->read_index = u->write_index = u->read_length = u->write_length = 0;
565
566 /*u->state = STATE_AUTH;*/
567 u->latency = 0;
568
569 if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
570 pa_log("No server argument given.");
571 goto fail;
572 }
573
574 pa_sink_new_data_init(&data);
575 data.driver = __FILE__;
576 data.module = m;
577 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
578 pa_sink_new_data_set_sample_spec(&data, &ss);
579 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
580 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "music");
581 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
582
583 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
584 pa_log("Invalid properties");
585 pa_sink_new_data_done(&data);
586 goto fail;
587 }
588
589 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_NETWORK);
590 pa_sink_new_data_done(&data);
591
592 if (!u->sink) {
593 pa_log("Failed to create sink.");
594 goto fail;
595 }
596
597 u->sink->parent.process_msg = sink_process_msg;
598 u->sink->userdata = u;
599 pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
600 pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
601 u->sink->flags = PA_SINK_LATENCY|PA_SINK_NETWORK;
602
603 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
604 pa_sink_set_rtpoll(u->sink, u->rtpoll);
605
606 if (!(u->raop = pa_raop_client_new(u->core, server))) {
607 pa_log("Failed to connect to server.");
608 goto fail;
609 }
610
611 pa_raop_client_set_callback(u->raop, on_connection, u);
612 pa_raop_client_set_closed_callback(u->raop, on_close, u);
613
614 if (!(u->thread = pa_thread_new("raop-sink", thread_func, u))) {
615 pa_log("Failed to create thread.");
616 goto fail;
617 }
618
619 pa_sink_put(u->sink);
620
621 pa_modargs_free(ma);
622
623 return 0;
624
625 fail:
626 if (ma)
627 pa_modargs_free(ma);
628
629 pa__done(m);
630
631 return -1;
632 }
633
634 int pa__get_n_used(pa_module *m) {
635 struct userdata *u;
636
637 pa_assert(m);
638 pa_assert_se(u = m->userdata);
639
640 return pa_sink_linked_by(u->sink);
641 }
642
643 void pa__done(pa_module*m) {
644 struct userdata *u;
645 pa_assert(m);
646
647 if (!(u = m->userdata))
648 return;
649
650 if (u->sink)
651 pa_sink_unlink(u->sink);
652
653 if (u->thread) {
654 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
655 pa_thread_free(u->thread);
656 }
657
658 pa_thread_mq_done(&u->thread_mq);
659
660 if (u->sink)
661 pa_sink_unref(u->sink);
662
663 if (u->rtpoll_item)
664 pa_rtpoll_item_free(u->rtpoll_item);
665
666 if (u->rtpoll)
667 pa_rtpoll_free(u->rtpoll);
668
669 if (u->raw_memchunk.memblock)
670 pa_memblock_unref(u->raw_memchunk.memblock);
671
672 if (u->encoded_memchunk.memblock)
673 pa_memblock_unref(u->encoded_memchunk.memblock);
674
675 if (u->raop)
676 pa_raop_client_free(u->raop);
677
678 pa_xfree(u->read_data);
679 pa_xfree(u->write_data);
680
681 if (u->smoother)
682 pa_smoother_free(u->smoother);
683
684 if (u->fd >= 0)
685 pa_close(u->fd);
686
687 pa_xfree(u);
688 }