]> code.delx.au - pulseaudio/blob - src/modules/raop/module-raop-sink.c
Remove pa_bool_t and replace it with bool.
[pulseaudio] / src / modules / raop / module-raop-sink.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <errno.h>
30 #include <string.h>
31 #include <unistd.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <sys/ioctl.h>
36
37 #ifdef HAVE_LINUX_SOCKIOS_H
38 #include <linux/sockios.h>
39 #endif
40
41 #include <pulse/rtclock.h>
42 #include <pulse/timeval.h>
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/core-error.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/log.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/poll.h>
56
57 #include "module-raop-sink-symdef.h"
58 #include "rtp.h"
59 #include "sdp.h"
60 #include "sap.h"
61 #include "raop_client.h"
62
/* Module metadata declared through the PA_MODULE_* macros. */
PA_MODULE_AUTHOR("Colin Guthrie");
PA_MODULE_DESCRIPTION("RAOP Sink");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false);
/* Usage string; the argument names listed here are the same ones accepted
 * through valid_modargs[]. */
PA_MODULE_USAGE(
        "sink_name=<name for the sink> "
        "sink_properties=<properties for the sink> "
        "server=<address> "
        "format=<sample format> "
        "rate=<sample rate> "
        "channels=<number of channels>");

/* Sink name used by pa__init() when no sink_name argument is supplied. */
#define DEFAULT_SINK_NAME "raop"
76
77 struct userdata {
78 pa_core *core;
79 pa_module *module;
80 pa_sink *sink;
81
82 pa_thread_mq thread_mq;
83 pa_rtpoll *rtpoll;
84 pa_rtpoll_item *rtpoll_item;
85 pa_thread *thread;
86
87 pa_memchunk raw_memchunk;
88 pa_memchunk encoded_memchunk;
89
90 void *write_data;
91 size_t write_length, write_index;
92
93 void *read_data;
94 size_t read_length, read_index;
95
96 pa_usec_t latency;
97
98 /*esd_format_t format;*/
99 int32_t rate;
100
101 pa_smoother *smoother;
102 int fd;
103
104 int64_t offset;
105 int64_t encoding_overhead;
106 int32_t next_encoding_overhead;
107 double encoding_ratio;
108
109 pa_raop_client *raop;
110
111 size_t block_size;
112 };
113
/* NULL-terminated list of argument names passed to pa_modargs_new() in
 * pa__init(). */
static const char* const valid_modargs[] = {
    "sink_name",
    "sink_properties",
    "server",
    "format",
    "rate",
    "channels",
    NULL
};
123
/* Module-private sink messages, numbered from PA_SINK_MESSAGE_MAX upwards and
 * handled in sink_process_msg(). */
enum {
    SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX, /* posted by on_connection() */
    SINK_MESSAGE_RIP_SOCKET                         /* posted by on_close() */
};
128
/* Forward declaration: on_connection() calls this callback before its
 * definition appears in the file. */
static void sink_set_volume_cb(pa_sink *);
131
132 static void on_connection(int fd, void*userdata) {
133 int so_sndbuf = 0;
134 socklen_t sl = sizeof(int);
135 struct userdata *u = userdata;
136 pa_assert(u);
137
138 pa_assert(u->fd < 0);
139 u->fd = fd;
140
141 if (getsockopt(u->fd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf, &sl) < 0)
142 pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno));
143 else {
144 pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf);
145 pa_sink_set_max_request(u->sink, PA_MAX((size_t) so_sndbuf, u->block_size));
146 }
147
148 /* Set the initial volume */
149 sink_set_volume_cb(u->sink);
150
151 pa_log_debug("Connection authenticated, handing fd to IO thread...");
152
153 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
154 }
155
156 static void on_close(void*userdata) {
157 struct userdata *u = userdata;
158 pa_assert(u);
159
160 pa_log_debug("Connection closed, informing IO thread...");
161
162 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
163 }
164
165 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
166 struct userdata *u = PA_SINK(o)->userdata;
167
168 switch (code) {
169
170 case PA_SINK_MESSAGE_SET_STATE:
171
172 switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
173
174 case PA_SINK_SUSPENDED:
175 pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
176
177 pa_smoother_pause(u->smoother, pa_rtclock_now());
178
179 /* Issue a FLUSH if we are connected */
180 if (u->fd >= 0) {
181 pa_raop_flush(u->raop);
182 }
183 break;
184
185 case PA_SINK_IDLE:
186 case PA_SINK_RUNNING:
187
188 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
189 pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
190
191 /* The connection can be closed when idle, so check to
192 see if we need to reestablish it */
193 if (u->fd < 0)
194 pa_raop_connect(u->raop);
195 else
196 pa_raop_flush(u->raop);
197 }
198
199 break;
200
201 case PA_SINK_UNLINKED:
202 case PA_SINK_INIT:
203 case PA_SINK_INVALID_STATE:
204 ;
205 }
206
207 break;
208
209 case PA_SINK_MESSAGE_GET_LATENCY: {
210 pa_usec_t w, r;
211
212 r = pa_smoother_get(u->smoother, pa_rtclock_now());
213 w = pa_bytes_to_usec((u->offset - u->encoding_overhead + (u->encoded_memchunk.length / u->encoding_ratio)), &u->sink->sample_spec);
214
215 *((pa_usec_t*) data) = w > r ? w - r : 0;
216 return 0;
217 }
218
219 case SINK_MESSAGE_PASS_SOCKET: {
220 struct pollfd *pollfd;
221
222 pa_assert(!u->rtpoll_item);
223
224 u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
225 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
226 pollfd->fd = u->fd;
227 pollfd->events = POLLOUT;
228 /*pollfd->events = */pollfd->revents = 0;
229
230 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
231 /* Our stream has been suspended so we just flush it.... */
232 pa_raop_flush(u->raop);
233 }
234 return 0;
235 }
236
237 case SINK_MESSAGE_RIP_SOCKET: {
238 if (u->fd >= 0) {
239 pa_close(u->fd);
240 u->fd = -1;
241 } else
242 /* FIXME */
243 pa_log("We should not get to this state. Cannot rip socket if not connected.");
244
245 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
246
247 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
248
249 if (u->rtpoll_item)
250 pa_rtpoll_item_free(u->rtpoll_item);
251 u->rtpoll_item = NULL;
252 } else {
253 /* Question: is this valid here: or should we do some sort of:
254 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
255 ?? */
256 pa_module_unload_request(u->module, true);
257 }
258 return 0;
259 }
260 }
261
262 return pa_sink_process_msg(o, code, data, offset, chunk);
263 }
264
265 static void sink_set_volume_cb(pa_sink *s) {
266 struct userdata *u = s->userdata;
267 pa_cvolume hw;
268 pa_volume_t v;
269 char t[PA_CVOLUME_SNPRINT_MAX];
270
271 pa_assert(u);
272
273 /* If we're muted we don't need to do anything */
274 if (s->muted)
275 return;
276
277 /* Calculate the max volume of all channels.
278 We'll use this as our (single) volume on the APEX device and emulate
279 any variation in channel volumes in software */
280 v = pa_cvolume_max(&s->real_volume);
281
282 /* Create a pa_cvolume version of our single value */
283 pa_cvolume_set(&hw, s->sample_spec.channels, v);
284
285 /* Perform any software manipulation of the volume needed */
286 pa_sw_cvolume_divide(&s->soft_volume, &s->real_volume, &hw);
287
288 pa_log_debug("Requested volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->real_volume));
289 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint(t, sizeof(t), &hw));
290 pa_log_debug("Calculated software volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->soft_volume));
291
292 /* Any necessary software volume manipulation is done so set
293 our hw volume (or v as a single value) on the device */
294 pa_raop_client_set_volume(u->raop, v);
295 }
296
297 static void sink_set_mute_cb(pa_sink *s) {
298 struct userdata *u = s->userdata;
299
300 pa_assert(u);
301
302 if (s->muted) {
303 pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
304 } else {
305 sink_set_volume_cb(s);
306 }
307 }
308
309 static void thread_func(void *userdata) {
310 struct userdata *u = userdata;
311 int write_type = 0;
312 pa_memchunk silence;
313 uint32_t silence_overhead = 0;
314 double silence_ratio = 0;
315
316 pa_assert(u);
317
318 pa_log_debug("Thread starting up");
319
320 pa_thread_mq_install(&u->thread_mq);
321
322 pa_smoother_set_time_offset(u->smoother, pa_rtclock_now());
323
324 /* Create a chunk of memory that is our encoded silence sample. */
325 pa_memchunk_reset(&silence);
326
327 for (;;) {
328 int ret;
329
330 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
331 pa_sink_process_rewind(u->sink, 0);
332
333 if (u->rtpoll_item) {
334 struct pollfd *pollfd;
335 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
336
337 /* Render some data and write it to the fifo */
338 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
339 pa_usec_t usec;
340 int64_t n;
341 void *p;
342
343 if (!silence.memblock) {
344 pa_memchunk silence_tmp;
345
346 pa_memchunk_reset(&silence_tmp);
347 silence_tmp.memblock = pa_memblock_new(u->core->mempool, 4096);
348 silence_tmp.length = 4096;
349 p = pa_memblock_acquire(silence_tmp.memblock);
350 memset(p, 0, 4096);
351 pa_memblock_release(silence_tmp.memblock);
352 pa_raop_client_encode_sample(u->raop, &silence_tmp, &silence);
353 pa_assert(0 == silence_tmp.length);
354 silence_overhead = silence_tmp.length - 4096;
355 silence_ratio = silence_tmp.length / 4096;
356 pa_memblock_unref(silence_tmp.memblock);
357 }
358
359 for (;;) {
360 ssize_t l;
361
362 if (u->encoded_memchunk.length <= 0) {
363 if (u->encoded_memchunk.memblock)
364 pa_memblock_unref(u->encoded_memchunk.memblock);
365 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
366 size_t rl;
367
368 /* We render real data */
369 if (u->raw_memchunk.length <= 0) {
370 if (u->raw_memchunk.memblock)
371 pa_memblock_unref(u->raw_memchunk.memblock);
372 pa_memchunk_reset(&u->raw_memchunk);
373
374 /* Grab unencoded data */
375 pa_sink_render(u->sink, u->block_size, &u->raw_memchunk);
376 }
377 pa_assert(u->raw_memchunk.length > 0);
378
379 /* Encode it */
380 rl = u->raw_memchunk.length;
381 u->encoding_overhead += u->next_encoding_overhead;
382 pa_raop_client_encode_sample(u->raop, &u->raw_memchunk, &u->encoded_memchunk);
383 u->next_encoding_overhead = (u->encoded_memchunk.length - (rl - u->raw_memchunk.length));
384 u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
385 } else {
386 /* We render some silence into our memchunk */
387 memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
388 pa_memblock_ref(silence.memblock);
389
390 /* Calculate/store some values to be used with the smoother */
391 u->next_encoding_overhead = silence_overhead;
392 u->encoding_ratio = silence_ratio;
393 }
394 }
395 pa_assert(u->encoded_memchunk.length > 0);
396
397 p = pa_memblock_acquire(u->encoded_memchunk.memblock);
398 l = pa_write(u->fd, (uint8_t*) p + u->encoded_memchunk.index, u->encoded_memchunk.length, &write_type);
399 pa_memblock_release(u->encoded_memchunk.memblock);
400
401 pa_assert(l != 0);
402
403 if (l < 0) {
404
405 if (errno == EINTR)
406 continue;
407 else if (errno == EAGAIN) {
408
409 /* OK, we filled all socket buffers up
410 * now. */
411 goto filled_up;
412
413 } else {
414 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
415 goto fail;
416 }
417
418 } else {
419 u->offset += l;
420
421 u->encoded_memchunk.index += l;
422 u->encoded_memchunk.length -= l;
423
424 pollfd->revents = 0;
425
426 if (u->encoded_memchunk.length > 0) {
427 /* we've completely written the encoded data, so update our overhead */
428 u->encoding_overhead += u->next_encoding_overhead;
429
430 /* OK, we wrote less that we asked for,
431 * hence we can assume that the socket
432 * buffers are full now */
433 goto filled_up;
434 }
435 }
436 }
437
438 filled_up:
439
440 /* At this spot we know that the socket buffers are
441 * fully filled up. This is the best time to estimate
442 * the playback position of the server */
443
444 n = u->offset - u->encoding_overhead;
445
446 #ifdef SIOCOUTQ
447 {
448 int l;
449 if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
450 n -= (l / u->encoding_ratio);
451 }
452 #endif
453
454 usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
455
456 if (usec > u->latency)
457 usec -= u->latency;
458 else
459 usec = 0;
460
461 pa_smoother_put(u->smoother, pa_rtclock_now(), usec);
462 }
463
464 /* Hmm, nothing to do. Let's sleep */
465 pollfd->events = POLLOUT; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
466 }
467
468 if ((ret = pa_rtpoll_run(u->rtpoll, true)) < 0)
469 goto fail;
470
471 if (ret == 0)
472 goto finish;
473
474 if (u->rtpoll_item) {
475 struct pollfd* pollfd;
476
477 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
478
479 if (pollfd->revents & ~POLLOUT) {
480 if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
481 pa_log("FIFO shutdown.");
482 goto fail;
483 }
484
485 /* We expect this to happen on occasion if we are not sending data.
486 It's perfectly natural and normal and natural */
487 if (u->rtpoll_item)
488 pa_rtpoll_item_free(u->rtpoll_item);
489 u->rtpoll_item = NULL;
490 }
491 }
492 }
493
494 fail:
495 /* If this was no regular exit from the loop we have to continue
496 * processing messages until we received PA_MESSAGE_SHUTDOWN */
497 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
498 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
499
500 finish:
501 if (silence.memblock)
502 pa_memblock_unref(silence.memblock);
503 pa_log_debug("Thread shutting down");
504 }
505
506 int pa__init(pa_module*m) {
507 struct userdata *u = NULL;
508 pa_sample_spec ss;
509 pa_modargs *ma = NULL;
510 const char *server;
511 pa_sink_new_data data;
512
513 pa_assert(m);
514
515 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
516 pa_log("failed to parse module arguments");
517 goto fail;
518 }
519
520 ss = m->core->default_sample_spec;
521 if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
522 pa_log("invalid sample format specification");
523 goto fail;
524 }
525
526 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss.format != PA_SAMPLE_S16NE) ||
527 (ss.channels > 2)) {
528 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
529 goto fail;
530 }
531
532 u = pa_xnew0(struct userdata, 1);
533 u->core = m->core;
534 u->module = m;
535 m->userdata = u;
536 u->fd = -1;
537 u->smoother = pa_smoother_new(
538 PA_USEC_PER_SEC,
539 PA_USEC_PER_SEC*2,
540 true,
541 true,
542 10,
543 0,
544 false);
545 pa_memchunk_reset(&u->raw_memchunk);
546 pa_memchunk_reset(&u->encoded_memchunk);
547 u->offset = 0;
548 u->encoding_overhead = 0;
549 u->next_encoding_overhead = 0;
550 u->encoding_ratio = 1.0;
551
552 u->rtpoll = pa_rtpoll_new();
553 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
554 u->rtpoll_item = NULL;
555
556 /*u->format =
557 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
558 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
559 u->rate = ss.rate;
560 u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
561
562 u->read_data = u->write_data = NULL;
563 u->read_index = u->write_index = u->read_length = u->write_length = 0;
564
565 /*u->state = STATE_AUTH;*/
566 u->latency = 0;
567
568 if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
569 pa_log("No server argument given.");
570 goto fail;
571 }
572
573 pa_sink_new_data_init(&data);
574 data.driver = __FILE__;
575 data.module = m;
576 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
577 pa_sink_new_data_set_sample_spec(&data, &ss);
578 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
579 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "music");
580 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
581
582 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
583 pa_log("Invalid properties");
584 pa_sink_new_data_done(&data);
585 goto fail;
586 }
587
588 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_NETWORK);
589 pa_sink_new_data_done(&data);
590
591 if (!u->sink) {
592 pa_log("Failed to create sink.");
593 goto fail;
594 }
595
596 u->sink->parent.process_msg = sink_process_msg;
597 u->sink->userdata = u;
598 pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
599 pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
600 u->sink->flags = PA_SINK_LATENCY|PA_SINK_NETWORK;
601
602 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
603 pa_sink_set_rtpoll(u->sink, u->rtpoll);
604
605 if (!(u->raop = pa_raop_client_new(u->core, server))) {
606 pa_log("Failed to connect to server.");
607 goto fail;
608 }
609
610 pa_raop_client_set_callback(u->raop, on_connection, u);
611 pa_raop_client_set_closed_callback(u->raop, on_close, u);
612
613 if (!(u->thread = pa_thread_new("raop-sink", thread_func, u))) {
614 pa_log("Failed to create thread.");
615 goto fail;
616 }
617
618 pa_sink_put(u->sink);
619
620 pa_modargs_free(ma);
621
622 return 0;
623
624 fail:
625 if (ma)
626 pa_modargs_free(ma);
627
628 pa__done(m);
629
630 return -1;
631 }
632
633 int pa__get_n_used(pa_module *m) {
634 struct userdata *u;
635
636 pa_assert(m);
637 pa_assert_se(u = m->userdata);
638
639 return pa_sink_linked_by(u->sink);
640 }
641
642 void pa__done(pa_module*m) {
643 struct userdata *u;
644 pa_assert(m);
645
646 if (!(u = m->userdata))
647 return;
648
649 if (u->sink)
650 pa_sink_unlink(u->sink);
651
652 if (u->thread) {
653 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
654 pa_thread_free(u->thread);
655 }
656
657 pa_thread_mq_done(&u->thread_mq);
658
659 if (u->sink)
660 pa_sink_unref(u->sink);
661
662 if (u->rtpoll_item)
663 pa_rtpoll_item_free(u->rtpoll_item);
664
665 if (u->rtpoll)
666 pa_rtpoll_free(u->rtpoll);
667
668 if (u->raw_memchunk.memblock)
669 pa_memblock_unref(u->raw_memchunk.memblock);
670
671 if (u->encoded_memchunk.memblock)
672 pa_memblock_unref(u->encoded_memchunk.memblock);
673
674 if (u->raop)
675 pa_raop_client_free(u->raop);
676
677 pa_xfree(u->read_data);
678 pa_xfree(u->write_data);
679
680 if (u->smoother)
681 pa_smoother_free(u->smoother);
682
683 if (u->fd >= 0)
684 pa_close(u->fd);
685
686 pa_xfree(u);
687 }