2 This file is part of PulseAudio.
4 Copyright 2013 Alexander Couzens
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/source.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/proplist-util.h>
45 #include "module-tunnel-source-new-symdef.h"
47 PA_MODULE_AUTHOR("Alexander Couzens");
48 PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
49 PA_MODULE_VERSION(PACKAGE_VERSION
);
50 PA_MODULE_LOAD_ONCE(false);
53 "source=<name of the remote source> "
54 "source_name=<name for the local source> "
55 "source_properties=<properties for the local source> "
56 "format=<sample format> "
57 "channels=<number of channels> "
59 "channel_map=<channel map> "
60 "cookie=<cookie file path>"
63 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
65 static void stream_state_cb(pa_stream
*stream
, void *userdata
);
66 static void stream_read_cb(pa_stream
*s
, size_t length
, void *userdata
);
67 static void context_state_cb(pa_context
*c
, void *userdata
);
68 static void source_update_requested_latency_cb(pa_source
*s
);
74 pa_thread_mq
*thread_mq
;
75 pa_mainloop
*thread_mainloop
;
76 pa_mainloop_api
*thread_mainloop_api
;
81 bool update_stream_bufferattr_after_connect
;
87 char *remote_source_name
;
90 static const char* const valid_modargs
[] = {
100 /* "reconnect", reconnect if server comes back again - unimplemented */
104 static void reset_bufferattr(pa_buffer_attr
*bufferattr
) {
105 pa_assert(bufferattr
);
106 bufferattr
->fragsize
= (uint32_t) -1;
107 bufferattr
->minreq
= (uint32_t) -1;
108 bufferattr
->maxlength
= (uint32_t) -1;
109 bufferattr
->prebuf
= (uint32_t) -1;
110 bufferattr
->tlength
= (uint32_t) -1;
113 static pa_proplist
* tunnel_new_proplist(struct userdata
*u
) {
114 pa_proplist
*proplist
= pa_proplist_new();
116 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_NAME
, "PulseAudio");
117 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
118 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
119 pa_init_proplist(proplist
);
124 static void stream_read_cb(pa_stream
*s
, size_t length
, void *userdata
) {
125 struct userdata
*u
= userdata
;
129 /* called from io context to read samples from the stream into our source */
130 static void read_new_samples(struct userdata
*u
) {
133 pa_memchunk memchunk
;
138 pa_memchunk_reset(&memchunk
);
140 if (PA_UNLIKELY(!u
->connected
|| pa_stream_get_state(u
->stream
) != PA_STREAM_READY
))
143 readable
= pa_stream_readable_size(u
->stream
);
144 while (readable
> 0) {
146 if (PA_UNLIKELY(pa_stream_peek(u
->stream
, &p
, &read
) != 0)) {
147 pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u
->context
)));
148 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
153 /* we have valid data */
154 memchunk
.memblock
= pa_memblock_new_fixed(u
->module
->core
->mempool
, (void *) p
, read
, true);
155 memchunk
.length
= read
;
158 pa_source_post(u
->source
, &memchunk
);
159 pa_memblock_unref_fixed(memchunk
.memblock
);
161 size_t bytes_to_generate
= read
;
163 /* we have a hole. generate silence */
164 memchunk
= u
->source
->silence
;
165 pa_memblock_ref(memchunk
.memblock
);
167 while (bytes_to_generate
> 0) {
168 if (bytes_to_generate
< memchunk
.length
)
169 memchunk
.length
= bytes_to_generate
;
171 pa_source_post(u
->source
, &memchunk
);
172 bytes_to_generate
-= memchunk
.length
;
175 pa_memblock_unref(memchunk
.memblock
);
178 pa_stream_drop(u
->stream
);
183 static void thread_func(void *userdata
) {
184 struct userdata
*u
= userdata
;
185 pa_proplist
*proplist
;
189 pa_log_debug("Thread starting up");
190 pa_thread_mq_install(u
->thread_mq
);
192 proplist
= tunnel_new_proplist(u
);
193 u
->context
= pa_context_new_with_proplist(u
->thread_mainloop_api
,
196 pa_proplist_free(proplist
);
199 pa_log("Failed to create libpulse context");
203 if (u
->cookie_file
&& pa_context_load_cookie_from_file(u
->context
, u
->cookie_file
) != 0) {
204 pa_log_error("Can not load cookie file!");
208 pa_context_set_state_callback(u
->context
, context_state_cb
, u
);
209 if (pa_context_connect(u
->context
,
211 PA_CONTEXT_NOAUTOSPAWN
,
213 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u
->context
)));
220 if (pa_mainloop_iterate(u
->thread_mainloop
, 1, &ret
) < 0) {
231 pa_asyncmsgq_post(u
->thread_mq
->outq
, PA_MSGOBJECT(u
->module
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
232 pa_asyncmsgq_wait_for(u
->thread_mq
->inq
, PA_MESSAGE_SHUTDOWN
);
236 pa_stream_disconnect(u
->stream
);
237 pa_stream_unref(u
->stream
);
242 pa_context_disconnect(u
->context
);
243 pa_context_unref(u
->context
);
247 pa_log_debug("Thread shutting down");
250 static void stream_state_cb(pa_stream
*stream
, void *userdata
) {
251 struct userdata
*u
= userdata
;
255 switch (pa_stream_get_state(stream
)) {
256 case PA_STREAM_FAILED
:
257 pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u
->context
)));
258 u
->connected
= false;
259 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
261 case PA_STREAM_TERMINATED
:
262 pa_log_debug("Stream terminated.");
264 case PA_STREAM_READY
:
265 /* Only call our requested_latency_cb when requested_latency
266 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
267 * we don't want to override the initial fragsize set by the server
268 * without a good reason. */
269 if (u
->update_stream_bufferattr_after_connect
)
270 source_update_requested_latency_cb(u
->source
);
271 case PA_STREAM_UNCONNECTED
:
272 case PA_STREAM_CREATING
:
277 static void context_state_cb(pa_context
*c
, void *userdata
) {
278 struct userdata
*u
= userdata
;
281 switch (pa_context_get_state(c
)) {
282 case PA_CONTEXT_UNCONNECTED
:
283 case PA_CONTEXT_CONNECTING
:
284 case PA_CONTEXT_AUTHORIZING
:
285 case PA_CONTEXT_SETTING_NAME
:
287 case PA_CONTEXT_READY
: {
288 pa_proplist
*proplist
;
289 pa_buffer_attr bufferattr
;
290 pa_usec_t requested_latency
;
291 char *username
= pa_get_user_name_malloc();
292 char *hostname
= pa_get_host_name_malloc();
293 /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
294 char *stream_name
= pa_sprintf_malloc(_("Tunnel for %s@%s"), username
, hostname
);
298 pa_log_debug("Connection successful. Creating stream.");
299 pa_assert(!u
->stream
);
301 proplist
= tunnel_new_proplist(u
);
302 u
->stream
= pa_stream_new_with_proplist(u
->context
,
304 &u
->source
->sample_spec
,
305 &u
->source
->channel_map
,
307 pa_proplist_free(proplist
);
308 pa_xfree(stream_name
);
311 pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u
->context
)));
312 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
316 requested_latency
= pa_source_get_requested_latency_within_thread(u
->source
);
317 if (requested_latency
== (uint32_t) -1)
318 requested_latency
= u
->source
->thread_info
.max_latency
;
320 reset_bufferattr(&bufferattr
);
321 bufferattr
.fragsize
= pa_usec_to_bytes(requested_latency
, &u
->source
->sample_spec
);
323 pa_stream_set_state_callback(u
->stream
, stream_state_cb
, userdata
);
324 pa_stream_set_read_callback(u
->stream
, stream_read_cb
, userdata
);
325 if (pa_stream_connect_record(u
->stream
,
326 u
->remote_source_name
,
328 PA_STREAM_INTERPOLATE_TIMING
|PA_STREAM_DONT_MOVE
|PA_STREAM_AUTO_TIMING_UPDATE
) < 0) {
329 pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u
->context
)));
330 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
335 case PA_CONTEXT_FAILED
:
336 pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u
->context
)));
337 u
->connected
= false;
338 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
340 case PA_CONTEXT_TERMINATED
:
341 pa_log_debug("Context terminated.");
342 u
->connected
= false;
343 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
348 static void source_update_requested_latency_cb(pa_source
*s
) {
350 pa_operation
*operation
;
352 pa_usec_t block_usec
;
353 pa_buffer_attr bufferattr
;
355 pa_source_assert_ref(s
);
356 pa_assert_se(u
= s
->userdata
);
358 block_usec
= pa_source_get_requested_latency_within_thread(s
);
359 if (block_usec
== (pa_usec_t
) -1)
360 block_usec
= s
->thread_info
.max_latency
;
362 nbytes
= pa_usec_to_bytes(block_usec
, &s
->sample_spec
);
365 switch (pa_stream_get_state(u
->stream
)) {
366 case PA_STREAM_READY
:
367 if (pa_stream_get_buffer_attr(u
->stream
)->fragsize
== nbytes
)
370 reset_bufferattr(&bufferattr
);
371 bufferattr
.fragsize
= nbytes
;
372 if ((operation
= pa_stream_set_buffer_attr(u
->stream
, &bufferattr
, NULL
, NULL
)))
373 pa_operation_unref(operation
);
375 case PA_STREAM_CREATING
:
376 /* we have to delay our request until stream is ready */
377 u
->update_stream_bufferattr_after_connect
= true;
385 static int source_process_msg_cb(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
386 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
389 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
391 pa_usec_t remote_latency
;
393 if (!PA_SOURCE_IS_LINKED(u
->source
->thread_info
.state
)) {
394 *((pa_usec_t
*) data
) = 0;
399 *((pa_usec_t
*) data
) = 0;
403 if (pa_stream_get_state(u
->stream
) != PA_STREAM_READY
) {
404 *((pa_usec_t
*) data
) = 0;
408 if (pa_stream_get_latency(u
->stream
, &remote_latency
, &negative
) < 0) {
409 *((pa_usec_t
*) data
) = 0;
414 *((pa_usec_t
*) data
) = 0;
416 *((pa_usec_t
*) data
) = remote_latency
;
421 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
424 int pa__init(pa_module
*m
) {
425 struct userdata
*u
= NULL
;
426 pa_modargs
*ma
= NULL
;
427 pa_source_new_data source_data
;
430 const char *remote_server
= NULL
;
431 const char *source_name
= NULL
;
432 char *default_source_name
= NULL
;
436 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
437 pa_log("Failed to parse module arguments.");
441 ss
= m
->core
->default_sample_spec
;
442 map
= m
->core
->default_channel_map
;
443 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
444 pa_log("Invalid sample format specification or channel map");
448 remote_server
= pa_modargs_get_value(ma
, "server", NULL
);
449 if (!remote_server
) {
450 pa_log("No server given!");
454 u
= pa_xnew0(struct userdata
, 1);
457 u
->remote_server
= pa_xstrdup(remote_server
);
458 u
->thread_mainloop
= pa_mainloop_new();
459 if (u
->thread_mainloop
== NULL
) {
460 pa_log("Failed to create mainloop");
463 u
->thread_mainloop_api
= pa_mainloop_get_api(u
->thread_mainloop
);
464 u
->cookie_file
= pa_xstrdup(pa_modargs_get_value(ma
, "cookie", NULL
));
465 u
->remote_source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));
467 u
->thread_mq
= pa_xnew0(pa_thread_mq
, 1);
468 pa_thread_mq_init_thread_mainloop(u
->thread_mq
, m
->core
->mainloop
, u
->thread_mainloop_api
);
471 pa_source_new_data_init(&source_data
);
472 source_data
.driver
= __FILE__
;
473 source_data
.module
= m
;
475 default_source_name
= pa_sprintf_malloc("tunnel-source-new.%s", remote_server
);
476 source_name
= pa_modargs_get_value(ma
, "source_name", default_source_name
);
478 pa_source_new_data_set_name(&source_data
, source_name
);
479 pa_source_new_data_set_sample_spec(&source_data
, &ss
);
480 pa_source_new_data_set_channel_map(&source_data
, &map
);
482 pa_proplist_sets(source_data
.proplist
, PA_PROP_DEVICE_CLASS
, "sound");
483 pa_proplist_setf(source_data
.proplist
,
484 PA_PROP_DEVICE_DESCRIPTION
,
485 _("Tunnel to %s/%s"),
487 pa_strempty(u
->remote_source_name
));
489 if (pa_modargs_get_proplist(ma
, "source_properties", source_data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
490 pa_log("Invalid properties");
491 pa_source_new_data_done(&source_data
);
494 if (!(u
->source
= pa_source_new(m
->core
, &source_data
, PA_SOURCE_LATENCY
| PA_SOURCE_DYNAMIC_LATENCY
| PA_SOURCE_NETWORK
))) {
495 pa_log("Failed to create source.");
496 pa_source_new_data_done(&source_data
);
500 pa_source_new_data_done(&source_data
);
501 u
->source
->userdata
= u
;
502 u
->source
->parent
.process_msg
= source_process_msg_cb
;
503 u
->source
->update_requested_latency
= source_update_requested_latency_cb
;
505 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
->inq
);
507 if (!(u
->thread
= pa_thread_new("tunnel-source", thread_func
, u
))) {
508 pa_log("Failed to create thread.");
512 pa_source_put(u
->source
);
514 pa_xfree(default_source_name
);
522 if (default_source_name
)
523 pa_xfree(default_source_name
);
530 void pa__done(pa_module
*m
) {
535 if (!(u
= m
->userdata
))
539 pa_source_unlink(u
->source
);
542 pa_asyncmsgq_send(u
->thread_mq
->inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
543 pa_thread_free(u
->thread
);
547 pa_thread_mq_done(u
->thread_mq
);
548 pa_xfree(u
->thread_mq
);
551 if (u
->thread_mainloop
)
552 pa_mainloop_free(u
->thread_mainloop
);
555 pa_xfree(u
->cookie_file
);
557 if (u
->remote_source_name
)
558 pa_xfree(u
->remote_source_name
);
560 if (u
->remote_server
)
561 pa_xfree(u
->remote_server
);
564 pa_source_unref(u
->source
);