2 This file is part of PulseAudio.
4 Copyright 2013 Alexander Couzens
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/proplist-util.h>
45 #include "module-tunnel-sink-new-symdef.h"
/* Module metadata: author, one-line description, version (taken from
 * PACKAGE_VERSION) and the load-once flag, which is false here. */
47 PA_MODULE_AUTHOR("Alexander Couzens");
48 PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
49 PA_MODULE_VERSION(PACKAGE_VERSION
);
50 PA_MODULE_LOAD_ONCE(false);
/* Argument summary: adjacent "key=<value> " literals that concatenate into
 * a single string.
 * NOTE(review): the statement that opens this list is not visible in this
 * excerpt and the embedded numbering skips original lines 52 and 58 --
 * confirm against the complete file. */
53 "sink=<name of the remote sink> "
54 "sink_name=<name for the local sink> "
55 "sink_properties=<properties for the local sink> "
56 "format=<sample format> "
57 "channels=<number of channels> "
59 "channel_map=<channel map> "
60 "cookie=<cookie file path>"
/* Value passed to thread_mainloop_api->quit() by the write path and by the
 * stream/context state callbacks to stop the tunnel thread's mainloop. */
63 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
/* Forward declarations of the callbacks defined further down in this file.
 * They are needed because the callbacks reference each other and are
 * referenced by thread_func() before their definitions. */
65 static void stream_state_cb(pa_stream
*stream
, void *userdata
);
66 static void stream_changed_buffer_attr_cb(pa_stream
*stream
, void *userdata
);
67 static void stream_set_buffer_attr_cb(pa_stream
*stream
, int success
, void *userdata
);
68 static void context_state_cb(pa_context
*c
, void *userdata
);
69 static void sink_update_requested_latency_cb(pa_sink
*s
);
/* Members of the per-module state that the rest of the file reaches through
 * `u->`.
 * NOTE(review): the enclosing struct header is not visible in this excerpt,
 * nor are other members used elsewhere in the file (module, sink, thread,
 * context, stream, connected, cookie_file, remote_server). */

/* Message queue pair; allocated and initialised in pa__init(), installed by
 * thread_func(), released in pa__done(). */
75 pa_thread_mq
*thread_mq
;
/* Private mainloop of the tunnel thread; created in pa__init(), iterated in
 * thread_func(), freed in pa__done(). */
76 pa_mainloop
*thread_mainloop
;
/* API vtable obtained from thread_mainloop via pa_mainloop_get_api(); used
 * to create the context and to quit the loop. */
77 pa_mainloop_api
*thread_mainloop_api
;
/* Set by sink_update_requested_latency_cb() when a latency change arrives
 * while the stream is still PA_STREAM_CREATING; checked by stream_state_cb()
 * once the stream becomes PA_STREAM_READY. */
82 bool update_stream_bufferattr_after_connect
;
/* Copy of the "sink" module argument (may be NULL); freed in pa__done(). */
88 char *remote_sink_name
;
/* Table of accepted module argument keys; passed to pa_modargs_new() in
 * pa__init().
 * NOTE(review): the individual entries and the terminator are not visible
 * in this excerpt. */
91 static const char* const valid_modargs
[] = {
101 /* "reconnect", reconnect if server comes back again - unimplemented */
/* Set every field written here (fragsize, minreq, maxlength, prebuf,
 * tlength) of *bufferattr to (uint32_t) -1.
 *
 * bufferattr must not be NULL (asserted). Both callers in this file
 * overwrite tlength afterwards and leave the other fields at -1.
 * NOTE(review): (uint32_t) -1 is presumably libpulse's "let the server pick"
 * marker -- confirm against the pa_buffer_attr documentation. */
105 static void reset_bufferattr(pa_buffer_attr
*bufferattr
) {
106 pa_assert(bufferattr
);
107 bufferattr
->fragsize
= (uint32_t) -1;
108 bufferattr
->minreq
= (uint32_t) -1;
109 bufferattr
->maxlength
= (uint32_t) -1;
110 bufferattr
->prebuf
= (uint32_t) -1;
111 bufferattr
->tlength
= (uint32_t) -1;
/* Create a fresh property list describing this client: application name
 * "PulseAudio", application id "org.PulseAudio.PulseAudio" and the package
 * version, then hand it to pa_init_proplist().
 *
 * `u` is not read in the visible lines. Both callers in this file release
 * the result with pa_proplist_free().
 * NOTE(review): the return statement is not visible in this excerpt. */
114 static pa_proplist
* tunnel_new_proplist(struct userdata
*u
) {
115 pa_proplist
*proplist
= pa_proplist_new();
117 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_NAME
, "PulseAudio");
118 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
119 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
120 pa_init_proplist(proplist
);
/* Entry point of the tunnel thread (started from pa__init() via
 * pa_thread_new() with the module's struct userdata as argument).
 *
 * Visible steps, in order: install the thread message queue, create a
 * libpulse context on the private mainloop, optionally load the cookie
 * file, register context_state_cb() and connect; then iterate the mainloop,
 * process requested rewinds, and while the stream is ready and the sink is
 * linked render sink audio and write it to the remote stream. Near the end
 * the core is asked to unload the module, the thread waits for
 * PA_MESSAGE_SHUTDOWN, and stream and context are disconnected and
 * unreferenced.
 *
 * NOTE(review): the loop header, several declarations (ret, writable, p),
 * the remaining call arguments and the error/exit branches are not visible
 * in this excerpt; confirm control flow against the complete file. */
125 static void thread_func(void *userdata
) {
126 struct userdata
*u
= userdata
;
127 pa_proplist
*proplist
;
130 pa_log_debug("Thread starting up");
131 pa_thread_mq_install(u
->thread_mq
);
/* The proplist is only needed while creating the context and is freed
 * right after. */
133 proplist
= tunnel_new_proplist(u
);
134 u
->context
= pa_context_new_with_proplist(u
->thread_mainloop_api
,
137 pa_proplist_free(proplist
);
140 pa_log("Failed to create libpulse context");
/* A cookie file is optional; it is loaded only when one was configured. */
144 if (u
->cookie_file
&& pa_context_load_cookie_from_file(u
->context
, u
->cookie_file
) != 0) {
145 pa_log_error("Can not load cookie file!");
/* Stream creation happens later, in context_state_cb(), once the context
 * reports PA_CONTEXT_READY. */
149 pa_context_set_state_callback(u
->context
, context_state_cb
, u
);
150 if (pa_context_connect(u
->context
,
152 PA_CONTEXT_NOAUTOSPAWN
,
154 pa_log("Failed to connect libpulse context");
/* One iteration of the thread's private mainloop; a negative result enters
 * the branch below (its body is not visible in this excerpt). */
161 if (pa_mainloop_iterate(u
->thread_mainloop
, 1, &ret
) < 0) {
/* A pending rewind request is answered with a rewind amount of 0. */
168 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
169 pa_sink_process_rewind(u
->sink
, 0);
172 pa_stream_get_state(u
->stream
) == PA_STREAM_READY
&&
173 PA_SINK_IS_LINKED(u
->sink
->thread_info
.state
)) {
174 /* TODO: Cork the stream when the sink is suspended. */
/* The stream is connected with PA_STREAM_START_CORKED (see
 * context_state_cb()), so it is uncorked here when found corked. */
176 if (pa_stream_is_corked(u
->stream
)) {
177 pa_operation
*operation
;
178 if ((operation
= pa_stream_cork(u
->stream
, 0, NULL
, NULL
)))
179 pa_operation_unref(operation
);
/* Number of bytes the remote stream can accept right now. */
183 writable
= pa_stream_writable_size(u
->stream
);
185 pa_memchunk memchunk
;
/* Render `writable` bytes of sink output into memchunk; the assert below
 * requires a non-empty chunk. */
188 pa_sink_render_full(u
->sink
, writable
, &memchunk
);
190 pa_assert(memchunk
.length
> 0);
192 /* we have new data to write */
193 p
= pa_memblock_acquire(memchunk
.memblock
);
194 /* TODO: Use pa_stream_begin_write() to reduce copying. */
195 ret
= pa_stream_write(u
->stream
,
196 (uint8_t*) p
+ memchunk
.index
,
198 NULL
, /**< A cleanup routine for the data or NULL to request an internal copy */
/* The memblock is released and unreferenced once pa_stream_write() has
 * returned. */
201 pa_memblock_release(memchunk
.memblock
);
202 pa_memblock_unref(memchunk
.memblock
);
205 pa_log_error("Could not write data into the stream ... ret = %i", ret
);
206 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
/* Ask the core (via the outgoing queue) to unload this module, then block
 * until pa__done() sends PA_MESSAGE_SHUTDOWN on the incoming queue. */
213 pa_asyncmsgq_post(u
->thread_mq
->outq
, PA_MSGOBJECT(u
->module
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
214 pa_asyncmsgq_wait_for(u
->thread_mq
->inq
, PA_MESSAGE_SHUTDOWN
);
/* Tear down the remote stream first, then the context. */
218 pa_stream_disconnect(u
->stream
);
219 pa_stream_unref(u
->stream
);
224 pa_context_disconnect(u
->context
);
225 pa_context_unref(u
->context
);
229 pa_log_debug("Thread shutting down");
/* State-change callback of the remote stream; registered in
 * context_state_cb() with the module's struct userdata as `userdata`.
 *
 * PA_STREAM_FAILED logs an error, clears u->connected and quits the thread
 * mainloop with TUNNEL_THREAD_FAILED_MAINLOOP. PA_STREAM_TERMINATED only
 * logs. PA_STREAM_READY synchronises the buffer attributes (see the comment
 * at that case).
 *
 * NOTE(review): break statements and any branch keywords between the
 * visible lines (e.g. around original line 253) are not present in this
 * excerpt -- confirm the exact flow against the complete file. */
232 static void stream_state_cb(pa_stream
*stream
, void *userdata
) {
233 struct userdata
*u
= userdata
;
237 switch (pa_stream_get_state(stream
)) {
238 case PA_STREAM_FAILED
:
239 pa_log_error("Stream failed.");
240 u
->connected
= false;
241 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
243 case PA_STREAM_TERMINATED
:
244 pa_log_debug("Stream terminated.");
246 case PA_STREAM_READY
:
247 /* Only call our requested_latency_cb when requested_latency
248 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
249 * we don't want to override the initial tlength set by the server
250 * without a good reason. */
251 if (u
->update_stream_bufferattr_after_connect
)
252 sink_update_requested_latency_cb(u
->sink
);
254 stream_changed_buffer_attr_cb(stream
, userdata
);
/* Nothing is done in the visible lines for the two states below. */
255 case PA_STREAM_CREATING
:
256 case PA_STREAM_UNCONNECTED
:
261 /* called when remote server changes the stream buffer_attr */
/* Reads the stream's current buffer attributes and applies their tlength as
 * the sink's maximum request size.
 *
 * Registered through pa_stream_set_buffer_attr_callback() in
 * context_state_cb() and also called directly by stream_state_cb() and
 * stream_set_buffer_attr_cb(). `userdata` is the module's struct userdata.
 * Note that the attributes are read from u->stream, not from the `stream`
 * parameter. */
262 static void stream_changed_buffer_attr_cb(pa_stream
*stream
, void *userdata
) {
263 struct userdata
*u
= userdata
;
264 const pa_buffer_attr
*bufferattr
;
267 bufferattr
= pa_stream_get_buffer_attr(u
->stream
);
268 pa_sink_set_max_request_within_thread(u
->sink
, bufferattr
->tlength
);
271 /* called after we requested a change of the stream buffer_attr */
/* Completion callback given to pa_stream_set_buffer_attr() in
 * sink_update_requested_latency_cb(). It forwards to
 * stream_changed_buffer_attr_cb(), so the sink's max request is refreshed
 * from whatever the stream now reports; `success` is not inspected in the
 * visible lines. */
272 static void stream_set_buffer_attr_cb(pa_stream
*stream
, int success
, void *userdata
) {
273 stream_changed_buffer_attr_cb(stream
, userdata
);
/* State-change callback of the libpulse context; registered in
 * thread_func() with the module's struct userdata as `userdata`.
 *
 * PA_CONTEXT_READY creates the playback stream "Tunnel for user@host" with
 * the sink's sample spec and channel map, derives the target buffer length
 * from the sink's requested latency, installs the stream callbacks and
 * connects the stream for playback (initially corked).
 * PA_CONTEXT_FAILED and PA_CONTEXT_TERMINATED log, clear u->connected and
 * quit the thread mainloop with TUNNEL_THREAD_FAILED_MAINLOOP.
 *
 * NOTE(review): break statements, the NULL check around stream creation,
 * the remaining call arguments and the release of username/hostname are not
 * visible in this excerpt -- confirm against the complete file. */
276 static void context_state_cb(pa_context
*c
, void *userdata
) {
277 struct userdata
*u
= userdata
;
280 switch (pa_context_get_state(c
)) {
/* Intermediate connection states: nothing is done in the visible lines. */
281 case PA_CONTEXT_UNCONNECTED
:
282 case PA_CONTEXT_CONNECTING
:
283 case PA_CONTEXT_AUTHORIZING
:
284 case PA_CONTEXT_SETTING_NAME
:
286 case PA_CONTEXT_READY
: {
287 pa_proplist
*proplist
;
288 pa_buffer_attr bufferattr
;
289 pa_usec_t requested_latency
;
290 char *username
= pa_get_user_name_malloc();
291 char *hostname
= pa_get_host_name_malloc();
292 /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
293 char *stream_name
= pa_sprintf_malloc(_("Tunnel for %s@%s"), username
, hostname
);
297 pa_log_debug("Connection successful. Creating stream.");
/* Only one stream may ever be created per context. */
298 pa_assert(!u
->stream
);
300 proplist
= tunnel_new_proplist(u
);
301 u
->stream
= pa_stream_new_with_proplist(u
->context
,
303 &u
->sink
->sample_spec
,
304 &u
->sink
->channel_map
,
/* The proplist and the name string are released right after stream
 * creation. */
306 pa_proplist_free(proplist
);
307 pa_xfree(stream_name
);
310 pa_log_error("Could not create a stream.");
311 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
/* Fall back to the sink's maximum latency when no latency was requested.
 * The "nothing requested" marker must be compared at pa_usec_t width, as
 * sink_update_requested_latency_cb() does; comparing against (uint32_t) -1
 * would never match a wider pa_usec_t and the marker would be converted to
 * bytes as if it were a real latency. */
315 requested_latency
= pa_sink_get_requested_latency_within_thread(u
->sink
);
316 if (requested_latency
== (pa_usec_t) -1)
317 requested_latency
= u
->sink
->thread_info
.max_latency
;
/* Only tlength is requested explicitly; every other buffer attribute stays
 * at the value written by reset_bufferattr(). */
319 reset_bufferattr(&bufferattr
);
320 bufferattr
.tlength
= pa_usec_to_bytes(requested_latency
, &u
->sink
->sample_spec
);
322 pa_stream_set_state_callback(u
->stream
, stream_state_cb
, userdata
);
323 pa_stream_set_buffer_attr_callback(u
->stream
, stream_changed_buffer_attr_cb
, userdata
);
/* The stream starts corked; thread_func() uncorks it once it is ready. */
324 if (pa_stream_connect_playback(u
->stream
,
327 PA_STREAM_INTERPOLATE_TIMING
| PA_STREAM_DONT_MOVE
| PA_STREAM_START_CORKED
| PA_STREAM_AUTO_TIMING_UPDATE
,
330 pa_log_error("Could not connect stream.");
331 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
336 case PA_CONTEXT_FAILED
:
337 pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u
->context
)));
338 u
->connected
= false;
339 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
341 case PA_CONTEXT_TERMINATED
:
342 pa_log_debug("Context terminated.");
343 u
->connected
= false;
344 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
/* Sink callback installed as u->sink->update_requested_latency in
 * pa__init(); also called from stream_state_cb() once the stream is ready.
 *
 * Converts the sink's requested latency to bytes (using
 * thread_info.max_latency when the request is (pa_usec_t) -1) and applies
 * it as the sink's maximum request size. With the stream in
 * PA_STREAM_READY it then asks the server for a matching tlength through
 * pa_stream_set_buffer_attr(); with the stream still in PA_STREAM_CREATING
 * it records the request in update_stream_bufferattr_after_connect.
 *
 * NOTE(review): the declarations of `u` and `nbytes`, the statement
 * executed when the stream's tlength already equals nbytes, and the break
 * statements are not visible in this excerpt. */
349 static void sink_update_requested_latency_cb(pa_sink
*s
) {
351 pa_operation
*operation
;
353 pa_usec_t block_usec
;
354 pa_buffer_attr bufferattr
;
356 pa_sink_assert_ref(s
);
357 pa_assert_se(u
= s
->userdata
);
359 block_usec
= pa_sink_get_requested_latency_within_thread(s
);
360 if (block_usec
== (pa_usec_t
) -1)
361 block_usec
= s
->thread_info
.max_latency
;
363 nbytes
= pa_usec_to_bytes(block_usec
, &s
->sample_spec
);
364 pa_sink_set_max_request_within_thread(s
, nbytes
);
367 switch (pa_stream_get_state(u
->stream
)) {
368 case PA_STREAM_READY
:
369 if (pa_stream_get_buffer_attr(u
->stream
)->tlength
== nbytes
)
/* Request only a new tlength; the other attributes keep the values written
 * by reset_bufferattr(). */
372 reset_bufferattr(&bufferattr
);
373 bufferattr
.tlength
= nbytes
;
374 if ((operation
= pa_stream_set_buffer_attr(u
->stream
, &bufferattr
, stream_set_buffer_attr_cb
, u
)))
375 pa_operation_unref(operation
);
377 case PA_STREAM_CREATING
:
378 /* we have to delay our request until stream is ready */
379 u
->update_stream_bufferattr_after_connect
= true;
/* Message handler installed as u->sink->parent.process_msg in pa__init().
 *
 * For PA_SINK_MESSAGE_GET_LATENCY it stores a pa_usec_t through `data`:
 * 0 when the sink is not linked, when the stream is not PA_STREAM_READY or
 * when pa_stream_get_latency() fails, otherwise the latency reported by the
 * remote stream. The final visible statement delegates to
 * pa_sink_process_msg() and returns its result.
 *
 * NOTE(review): the switch header, the declaration of `negative`, the
 * condition guarding the zero store at original line 401 and the early
 * returns are not visible in this excerpt; `negative` is not read in the
 * visible lines. */
387 static int sink_process_msg_cb(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
388 struct userdata
*u
= PA_SINK(o
)->userdata
;
391 case PA_SINK_MESSAGE_GET_LATENCY
: {
393 pa_usec_t remote_latency
;
395 if (!PA_SINK_IS_LINKED(u
->sink
->thread_info
.state
)) {
396 *((pa_usec_t
*) data
) = 0;
401 *((pa_usec_t
*) data
) = 0;
405 if (pa_stream_get_state(u
->stream
) != PA_STREAM_READY
) {
406 *((pa_usec_t
*) data
) = 0;
410 if (pa_stream_get_latency(u
->stream
, &remote_latency
, &negative
) < 0) {
411 *((pa_usec_t
*) data
) = 0;
415 *((pa_usec_t
*) data
) = remote_latency
;
419 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
/* Module entry point.
 *
 * Visible steps, in order: parse the module arguments against
 * valid_modargs; start from the core's default sample spec and channel map
 * and let the arguments override them; require a "server" argument;
 * allocate the zeroed userdata; create the private mainloop and the thread
 * message queue; build the local sink (default name
 * "tunnel-sink-new.<server>" unless "sink_name" is given) with latency,
 * dynamic-latency and network flags; install the sink callbacks and the
 * message queue; start the tunnel thread running thread_func(); finally put
 * the sink.
 *
 * NOTE(review): the declarations of `ss` and `map`, the error exits, the
 * return statements and some call arguments are not visible in this
 * excerpt. */
422 int pa__init(pa_module
*m
) {
423 struct userdata
*u
= NULL
;
424 pa_modargs
*ma
= NULL
;
425 pa_sink_new_data sink_data
;
428 const char *remote_server
= NULL
;
429 const char *sink_name
= NULL
;
430 char *default_sink_name
= NULL
;
434 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
435 pa_log("Failed to parse module arguments.");
/* Core defaults first; module arguments may override them below. */
439 ss
= m
->core
->default_sample_spec
;
440 map
= m
->core
->default_channel_map
;
441 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
442 pa_log("Invalid sample format specification or channel map");
/* "server" is the only mandatory argument. */
446 remote_server
= pa_modargs_get_value(ma
, "server", NULL
);
447 if (!remote_server
) {
448 pa_log("No server given!");
452 u
= pa_xnew0(struct userdata
, 1);
455 u
->remote_server
= pa_xstrdup(remote_server
);
456 u
->thread_mainloop
= pa_mainloop_new();
457 if (u
->thread_mainloop
== NULL
) {
458 pa_log("Failed to create mainloop");
461 u
->thread_mainloop_api
= pa_mainloop_get_api(u
->thread_mainloop
);
/* Both values are optional; a missing argument yields NULL here. */
462 u
->cookie_file
= pa_xstrdup(pa_modargs_get_value(ma
, "cookie", NULL
));
463 u
->remote_sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
/* The queue pair is initialised from the core's mainloop and the tunnel
 * thread's mainloop API. */
465 u
->thread_mq
= pa_xnew0(pa_thread_mq
, 1);
466 pa_thread_mq_init_thread_mainloop(u
->thread_mq
, m
->core
->mainloop
, u
->thread_mainloop_api
);
469 pa_sink_new_data_init(&sink_data
);
470 sink_data
.driver
= __FILE__
;
471 sink_data
.module
= m
;
/* default_sink_name is only the fallback for a missing "sink_name". */
473 default_sink_name
= pa_sprintf_malloc("tunnel-sink-new.%s", remote_server
);
474 sink_name
= pa_modargs_get_value(ma
, "sink_name", default_sink_name
);
476 pa_sink_new_data_set_name(&sink_data
, sink_name
);
477 pa_sink_new_data_set_sample_spec(&sink_data
, &ss
);
478 pa_sink_new_data_set_channel_map(&sink_data
, &map
);
480 pa_proplist_sets(sink_data
.proplist
, PA_PROP_DEVICE_CLASS
, "sound");
481 pa_proplist_setf(sink_data
.proplist
,
482 PA_PROP_DEVICE_DESCRIPTION
,
483 _("Tunnel to %s/%s"),
485 pa_strempty(u
->remote_sink_name
));
/* "sink_properties" is applied with PA_UPDATE_REPLACE on top of the
 * properties set above. */
487 if (pa_modargs_get_proplist(ma
, "sink_properties", sink_data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
488 pa_log("Invalid properties");
489 pa_sink_new_data_done(&sink_data
);
492 if (!(u
->sink
= pa_sink_new(m
->core
, &sink_data
, PA_SINK_LATENCY
| PA_SINK_DYNAMIC_LATENCY
| PA_SINK_NETWORK
))) {
493 pa_log("Failed to create sink.");
494 pa_sink_new_data_done(&sink_data
);
498 pa_sink_new_data_done(&sink_data
);
499 u
->sink
->userdata
= u
;
500 u
->sink
->parent
.process_msg
= sink_process_msg_cb
;
501 u
->sink
->update_requested_latency
= sink_update_requested_latency_cb
;
503 /* set thread message queue */
504 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
->inq
);
506 if (!(u
->thread
= pa_thread_new("tunnel-sink", thread_func
, u
))) {
507 pa_log("Failed to create thread.");
511 pa_sink_put(u
->sink
);
513 pa_xfree(default_sink_name
);
/* NOTE(review): the lines below presumably belong to an error exit whose
 * label is not visible in this excerpt -- confirm. */
521 if (default_sink_name
)
522 pa_xfree(default_sink_name
);
/* Module teardown.
 *
 * Visible steps, in order: fetch the userdata from the module; unlink the
 * sink; send PA_MESSAGE_SHUTDOWN to the tunnel thread and free the thread;
 * release the thread message queue and the private mainloop; free the
 * cookie path, remote sink name and remote server strings; drop the sink
 * reference.
 *
 * NOTE(review): the declaration of `u`, the early return taken when there
 * is no userdata, the guards around the sink/thread/queue steps and the end
 * of the function are not visible in this excerpt. */
529 void pa__done(pa_module
*m
) {
534 if (!(u
= m
->userdata
))
538 pa_sink_unlink(u
->sink
);
/* thread_func() waits for this message before tearing down its stream and
 * context. */
541 pa_asyncmsgq_send(u
->thread_mq
->inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
542 pa_thread_free(u
->thread
);
546 pa_thread_mq_done(u
->thread_mq
);
547 pa_xfree(u
->thread_mq
);
550 if (u
->thread_mainloop
)
551 pa_mainloop_free(u
->thread_mainloop
);
554 pa_xfree(u
->cookie_file
);
556 if (u
->remote_sink_name
)
557 pa_xfree(u
->remote_sink_name
);
559 if (u
->remote_server
)
560 pa_xfree(u
->remote_server
);
563 pa_sink_unref(u
->sink
);