2 This file is part of PulseAudio.
4 Copyright 2013 Alexander Couzens
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/proplist-util.h>
45 #include "module-tunnel-sink-new-symdef.h"
47 PA_MODULE_AUTHOR("Alexander Couzens");
48 PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
49 PA_MODULE_VERSION(PACKAGE_VERSION
);
50 PA_MODULE_LOAD_ONCE(false);
53 "sink=<name of the remote sink> "
54 "sink_name=<name for the local sink> "
55 "sink_properties=<properties for the local sink> "
56 "format=<sample format> "
57 "channels=<number of channels> "
59 "channel_map=<channel map> "
60 "cookie=<cookie file path>"
63 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
65 static void stream_state_cb(pa_stream
*stream
, void *userdata
);
66 static void stream_changed_buffer_attr_cb(pa_stream
*stream
, void *userdata
);
67 static void stream_set_buffer_attr_cb(pa_stream
*stream
, int success
, void *userdata
);
68 static void context_state_cb(pa_context
*c
, void *userdata
);
69 static void sink_update_requested_latency_cb(pa_sink
*s
);
75 pa_thread_mq
*thread_mq
;
76 pa_mainloop
*thread_mainloop
;
77 pa_mainloop_api
*thread_mainloop_api
;
82 bool update_stream_bufferattr_after_connect
;
88 char *remote_sink_name
;
91 static const char* const valid_modargs
[] = {
101 /* "reconnect", reconnect if server comes back again - unimplemented */
105 static void reset_bufferattr(pa_buffer_attr
*bufferattr
) {
106 pa_assert(bufferattr
);
107 bufferattr
->fragsize
= (uint32_t) -1;
108 bufferattr
->minreq
= (uint32_t) -1;
109 bufferattr
->maxlength
= (uint32_t) -1;
110 bufferattr
->prebuf
= (uint32_t) -1;
111 bufferattr
->tlength
= (uint32_t) -1;
114 static pa_proplist
* tunnel_new_proplist(struct userdata
*u
) {
115 pa_proplist
*proplist
= pa_proplist_new();
117 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_NAME
, "PulseAudio");
118 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
119 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
120 pa_init_proplist(proplist
);
125 static void thread_func(void *userdata
) {
126 struct userdata
*u
= userdata
;
127 pa_proplist
*proplist
;
130 pa_log_debug("Thread starting up");
131 pa_thread_mq_install(u
->thread_mq
);
133 proplist
= tunnel_new_proplist(u
);
134 u
->context
= pa_context_new_with_proplist(u
->thread_mainloop_api
,
137 pa_proplist_free(proplist
);
140 pa_log("Failed to create libpulse context");
144 if (u
->cookie_file
&& pa_context_load_cookie_from_file(u
->context
, u
->cookie_file
) != 0) {
145 pa_log_error("Can not load cookie file!");
149 pa_context_set_state_callback(u
->context
, context_state_cb
, u
);
150 if (pa_context_connect(u
->context
,
152 PA_CONTEXT_NOAUTOSPAWN
,
154 pa_log("Failed to connect libpulse context");
161 if (pa_mainloop_iterate(u
->thread_mainloop
, 1, &ret
) < 0) {
168 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
169 pa_sink_process_rewind(u
->sink
, 0);
172 pa_stream_get_state(u
->stream
) == PA_STREAM_READY
&&
173 PA_SINK_IS_LINKED(u
->sink
->thread_info
.state
)) {
174 /* TODO: Cork the stream when the sink is suspended. */
176 if (pa_stream_is_corked(u
->stream
)) {
177 pa_operation
*operation
;
178 if ((operation
= pa_stream_cork(u
->stream
, 0, NULL
, NULL
)))
179 pa_operation_unref(operation
);
183 writable
= pa_stream_writable_size(u
->stream
);
185 pa_memchunk memchunk
;
188 pa_sink_render_full(u
->sink
, writable
, &memchunk
);
190 pa_assert(memchunk
.length
> 0);
192 /* we have new data to write */
193 p
= pa_memblock_acquire(memchunk
.memblock
);
194 /* TODO: Use pa_stream_begin_write() to reduce copying. */
195 ret
= pa_stream_write(u
->stream
,
196 (uint8_t*) p
+ memchunk
.index
,
198 NULL
, /**< A cleanup routine for the data or NULL to request an internal copy */
201 pa_memblock_release(memchunk
.memblock
);
202 pa_memblock_unref(memchunk
.memblock
);
205 pa_log_error("Could not write data into the stream ... ret = %i", ret
);
206 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
213 pa_asyncmsgq_post(u
->thread_mq
->outq
, PA_MSGOBJECT(u
->module
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
214 pa_asyncmsgq_wait_for(u
->thread_mq
->inq
, PA_MESSAGE_SHUTDOWN
);
218 pa_stream_disconnect(u
->stream
);
219 pa_stream_unref(u
->stream
);
224 pa_context_disconnect(u
->context
);
225 pa_context_unref(u
->context
);
229 pa_log_debug("Thread shutting down");
232 static void stream_state_cb(pa_stream
*stream
, void *userdata
) {
233 struct userdata
*u
= userdata
;
237 switch (pa_stream_get_state(stream
)) {
238 case PA_STREAM_FAILED
:
239 pa_log_error("Stream failed.");
240 u
->connected
= false;
241 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
243 case PA_STREAM_TERMINATED
:
244 pa_log_debug("Stream terminated.");
246 case PA_STREAM_READY
:
247 /* Only call our requested_latency_cb when requested_latency
248 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
249 * we don't want to override the initial tlength set by the server
250 * without a good reason. */
251 if (u
->update_stream_bufferattr_after_connect
)
252 sink_update_requested_latency_cb(u
->sink
);
254 stream_changed_buffer_attr_cb(stream
, userdata
);
255 case PA_STREAM_CREATING
:
256 case PA_STREAM_UNCONNECTED
:
261 /* called when remote server changes the stream buffer_attr */
262 static void stream_changed_buffer_attr_cb(pa_stream
*stream
, void *userdata
) {
263 struct userdata
*u
= userdata
;
264 const pa_buffer_attr
*bufferattr
;
267 bufferattr
= pa_stream_get_buffer_attr(u
->stream
);
268 pa_sink_set_max_request_within_thread(u
->sink
, bufferattr
->tlength
);
271 /* called after we requested a change of the stream buffer_attr */
272 static void stream_set_buffer_attr_cb(pa_stream
*stream
, int success
, void *userdata
) {
273 stream_changed_buffer_attr_cb(stream
, userdata
);
276 static void context_state_cb(pa_context
*c
, void *userdata
) {
277 struct userdata
*u
= userdata
;
280 switch (pa_context_get_state(c
)) {
281 case PA_CONTEXT_UNCONNECTED
:
282 case PA_CONTEXT_CONNECTING
:
283 case PA_CONTEXT_AUTHORIZING
:
284 case PA_CONTEXT_SETTING_NAME
:
286 case PA_CONTEXT_READY
: {
287 pa_proplist
*proplist
;
288 pa_buffer_attr bufferattr
;
289 pa_usec_t requested_latency
;
290 const char *username
= pa_get_user_name_malloc();
291 const char *hostname
= pa_get_host_name_malloc();
292 /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
293 char *stream_name
= pa_sprintf_malloc(_("Tunnel for %s@%s"), username
, hostname
);
295 pa_log_debug("Connection successful. Creating stream.");
296 pa_assert(!u
->stream
);
298 proplist
= tunnel_new_proplist(u
);
299 u
->stream
= pa_stream_new_with_proplist(u
->context
,
301 &u
->sink
->sample_spec
,
302 &u
->sink
->channel_map
,
304 pa_proplist_free(proplist
);
305 pa_xfree(stream_name
);
308 pa_log_error("Could not create a stream.");
309 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
313 requested_latency
= pa_sink_get_requested_latency_within_thread(u
->sink
);
314 if (requested_latency
== (uint32_t) -1)
315 requested_latency
= u
->sink
->thread_info
.max_latency
;
317 reset_bufferattr(&bufferattr
);
318 bufferattr
.tlength
= pa_usec_to_bytes(requested_latency
, &u
->sink
->sample_spec
);
320 pa_stream_set_state_callback(u
->stream
, stream_state_cb
, userdata
);
321 pa_stream_set_buffer_attr_callback(u
->stream
, stream_changed_buffer_attr_cb
, userdata
);
322 if (pa_stream_connect_playback(u
->stream
,
325 PA_STREAM_INTERPOLATE_TIMING
| PA_STREAM_DONT_MOVE
| PA_STREAM_START_CORKED
| PA_STREAM_AUTO_TIMING_UPDATE
,
328 pa_log_error("Could not connect stream.");
329 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
334 case PA_CONTEXT_FAILED
:
335 pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u
->context
)));
336 u
->connected
= false;
337 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
339 case PA_CONTEXT_TERMINATED
:
340 pa_log_debug("Context terminated.");
341 u
->connected
= false;
342 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
347 static void sink_update_requested_latency_cb(pa_sink
*s
) {
349 pa_operation
*operation
;
351 pa_usec_t block_usec
;
352 pa_buffer_attr bufferattr
;
354 pa_sink_assert_ref(s
);
355 pa_assert_se(u
= s
->userdata
);
357 block_usec
= pa_sink_get_requested_latency_within_thread(s
);
358 if (block_usec
== (pa_usec_t
) -1)
359 block_usec
= s
->thread_info
.max_latency
;
361 nbytes
= pa_usec_to_bytes(block_usec
, &s
->sample_spec
);
362 pa_sink_set_max_request_within_thread(s
, nbytes
);
365 switch (pa_stream_get_state(u
->stream
)) {
366 case PA_STREAM_READY
:
367 if (pa_stream_get_buffer_attr(u
->stream
)->tlength
== nbytes
)
370 reset_bufferattr(&bufferattr
);
371 bufferattr
.tlength
= nbytes
;
372 if ((operation
= pa_stream_set_buffer_attr(u
->stream
, &bufferattr
, stream_set_buffer_attr_cb
, u
)))
373 pa_operation_unref(operation
);
375 case PA_STREAM_CREATING
:
376 /* we have to delay our request until stream is ready */
377 u
->update_stream_bufferattr_after_connect
= true;
385 static int sink_process_msg_cb(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
386 struct userdata
*u
= PA_SINK(o
)->userdata
;
389 case PA_SINK_MESSAGE_GET_LATENCY
: {
391 pa_usec_t remote_latency
;
393 if (!PA_SINK_IS_LINKED(u
->sink
->thread_info
.state
)) {
394 *((pa_usec_t
*) data
) = 0;
399 *((pa_usec_t
*) data
) = 0;
403 if (pa_stream_get_state(u
->stream
) != PA_STREAM_READY
) {
404 *((pa_usec_t
*) data
) = 0;
408 if (pa_stream_get_latency(u
->stream
, &remote_latency
, &negative
) < 0) {
409 *((pa_usec_t
*) data
) = 0;
413 *((pa_usec_t
*) data
) = remote_latency
;
417 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
420 int pa__init(pa_module
*m
) {
421 struct userdata
*u
= NULL
;
422 pa_modargs
*ma
= NULL
;
423 pa_sink_new_data sink_data
;
426 const char *remote_server
= NULL
;
427 const char *sink_name
= NULL
;
428 char *default_sink_name
= NULL
;
432 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
433 pa_log("Failed to parse module arguments.");
437 ss
= m
->core
->default_sample_spec
;
438 map
= m
->core
->default_channel_map
;
439 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
440 pa_log("Invalid sample format specification or channel map");
444 remote_server
= pa_modargs_get_value(ma
, "server", NULL
);
445 if (!remote_server
) {
446 pa_log("No server given!");
450 u
= pa_xnew0(struct userdata
, 1);
453 u
->remote_server
= pa_xstrdup(remote_server
);
454 u
->thread_mainloop
= pa_mainloop_new();
455 if (u
->thread_mainloop
== NULL
) {
456 pa_log("Failed to create mainloop");
459 u
->thread_mainloop_api
= pa_mainloop_get_api(u
->thread_mainloop
);
460 u
->cookie_file
= pa_xstrdup(pa_modargs_get_value(ma
, "cookie", NULL
));
461 u
->remote_sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
463 u
->thread_mq
= pa_xnew0(pa_thread_mq
, 1);
464 pa_thread_mq_init_thread_mainloop(u
->thread_mq
, m
->core
->mainloop
, u
->thread_mainloop_api
);
467 pa_sink_new_data_init(&sink_data
);
468 sink_data
.driver
= __FILE__
;
469 sink_data
.module
= m
;
471 default_sink_name
= pa_sprintf_malloc("tunnel-sink-new.%s", remote_server
);
472 sink_name
= pa_modargs_get_value(ma
, "sink_name", default_sink_name
);
474 pa_sink_new_data_set_name(&sink_data
, sink_name
);
475 pa_sink_new_data_set_sample_spec(&sink_data
, &ss
);
476 pa_sink_new_data_set_channel_map(&sink_data
, &map
);
478 pa_proplist_sets(sink_data
.proplist
, PA_PROP_DEVICE_CLASS
, "sound");
479 pa_proplist_setf(sink_data
.proplist
,
480 PA_PROP_DEVICE_DESCRIPTION
,
481 _("Tunnel to %s/%s"),
483 pa_strempty(u
->remote_sink_name
));
485 if (pa_modargs_get_proplist(ma
, "sink_properties", sink_data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
486 pa_log("Invalid properties");
487 pa_sink_new_data_done(&sink_data
);
490 if (!(u
->sink
= pa_sink_new(m
->core
, &sink_data
, PA_SINK_LATENCY
| PA_SINK_DYNAMIC_LATENCY
| PA_SINK_NETWORK
))) {
491 pa_log("Failed to create sink.");
492 pa_sink_new_data_done(&sink_data
);
496 pa_sink_new_data_done(&sink_data
);
497 u
->sink
->userdata
= u
;
498 u
->sink
->parent
.process_msg
= sink_process_msg_cb
;
499 u
->sink
->update_requested_latency
= sink_update_requested_latency_cb
;
501 /* set thread message queue */
502 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
->inq
);
504 if (!(u
->thread
= pa_thread_new("tunnel-sink", thread_func
, u
))) {
505 pa_log("Failed to create thread.");
509 pa_sink_put(u
->sink
);
511 pa_xfree(default_sink_name
);
519 if (default_sink_name
)
520 pa_xfree(default_sink_name
);
527 void pa__done(pa_module
*m
) {
532 if (!(u
= m
->userdata
))
536 pa_sink_unlink(u
->sink
);
539 pa_asyncmsgq_send(u
->thread_mq
->inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
540 pa_thread_free(u
->thread
);
544 pa_thread_mq_done(u
->thread_mq
);
545 pa_xfree(u
->thread_mq
);
548 if (u
->thread_mainloop
)
549 pa_mainloop_free(u
->thread_mainloop
);
552 pa_xfree(u
->cookie_file
);
554 if (u
->remote_sink_name
)
555 pa_xfree(u
->remote_sink_name
);
557 if (u
->remote_server
)
558 pa_xfree(u
->remote_server
);
561 pa_sink_unref(u
->sink
);