2 This file is part of PulseAudio.
4 Copyright 2013 Alexander Couzens
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <pulse/context.h>
27 #include <pulse/timeval.h>
28 #include <pulse/xmalloc.h>
29 #include <pulse/stream.h>
30 #include <pulse/mainloop.h>
31 #include <pulse/introspect.h>
32 #include <pulse/error.h>
34 #include <pulsecore/core.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/i18n.h>
37 #include <pulsecore/source.h>
38 #include <pulsecore/modargs.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/thread.h>
41 #include <pulsecore/thread-mq.h>
42 #include <pulsecore/poll.h>
43 #include <pulsecore/proplist-util.h>
45 #include "module-tunnel-source-new-symdef.h"
47 PA_MODULE_AUTHOR("Alexander Couzens");
48 PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
49 PA_MODULE_VERSION(PACKAGE_VERSION
);
50 PA_MODULE_LOAD_ONCE(false);
53 "source=<name of the remote source> "
54 "source_name=<name for the local source> "
55 "source_properties=<properties for the local source> "
56 "format=<sample format> "
57 "channels=<number of channels> "
59 "channel_map=<channel map>"
62 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
64 static void stream_state_cb(pa_stream
*stream
, void *userdata
);
65 static void stream_read_cb(pa_stream
*s
, size_t length
, void *userdata
);
66 static void context_state_cb(pa_context
*c
, void *userdata
);
67 static void source_update_requested_latency_cb(pa_source
*s
);
73 pa_thread_mq
*thread_mq
;
74 pa_mainloop
*thread_mainloop
;
75 pa_mainloop_api
*thread_mainloop_api
;
80 bool update_stream_bufferattr_after_connect
;
85 char *remote_source_name
;
88 static const char* const valid_modargs
[] = {
97 /* "cookie", unimplemented */
98 /* "reconnect", reconnect if server comes back again - unimplemented */
102 static void reset_bufferattr(pa_buffer_attr
*bufferattr
) {
103 pa_assert(bufferattr
);
104 bufferattr
->fragsize
= (uint32_t) -1;
105 bufferattr
->minreq
= (uint32_t) -1;
106 bufferattr
->maxlength
= (uint32_t) -1;
107 bufferattr
->prebuf
= (uint32_t) -1;
108 bufferattr
->tlength
= (uint32_t) -1;
111 static pa_proplist
* tunnel_new_proplist(struct userdata
*u
) {
112 pa_proplist
*proplist
= pa_proplist_new();
114 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_NAME
, "PulseAudio");
115 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
116 pa_proplist_sets(proplist
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
117 pa_init_proplist(proplist
);
122 static void stream_read_cb(pa_stream
*s
, size_t length
, void *userdata
) {
123 struct userdata
*u
= userdata
;
127 /* called from io context to read samples from the stream into our source */
128 static void read_new_samples(struct userdata
*u
) {
131 pa_memchunk memchunk
;
136 pa_memchunk_reset(&memchunk
);
138 if (PA_UNLIKELY(!u
->connected
|| pa_stream_get_state(u
->stream
) != PA_STREAM_READY
))
141 readable
= pa_stream_readable_size(u
->stream
);
142 while (readable
> 0) {
144 if (PA_UNLIKELY(pa_stream_peek(u
->stream
, &p
, &read
) != 0)) {
145 pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u
->context
)));
146 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
151 /* we have valid data */
152 memchunk
.memblock
= pa_memblock_new_fixed(u
->module
->core
->mempool
, (void *) p
, read
, true);
153 memchunk
.length
= read
;
156 pa_source_post(u
->source
, &memchunk
);
157 pa_memblock_unref_fixed(memchunk
.memblock
);
159 size_t bytes_to_generate
= read
;
161 /* we have a hole. generate silence */
162 memchunk
= u
->source
->silence
;
163 pa_memblock_ref(memchunk
.memblock
);
165 while (bytes_to_generate
> 0) {
166 if (bytes_to_generate
< memchunk
.length
)
167 memchunk
.length
= bytes_to_generate
;
169 pa_source_post(u
->source
, &memchunk
);
170 bytes_to_generate
-= memchunk
.length
;
173 pa_memblock_unref(memchunk
.memblock
);
176 pa_stream_drop(u
->stream
);
181 static void thread_func(void *userdata
) {
182 struct userdata
*u
= userdata
;
183 pa_proplist
*proplist
;
187 pa_log_debug("Thread starting up");
188 pa_thread_mq_install(u
->thread_mq
);
190 proplist
= tunnel_new_proplist(u
);
191 u
->context
= pa_context_new_with_proplist(u
->thread_mainloop_api
,
194 pa_proplist_free(proplist
);
197 pa_log("Failed to create libpulse context");
201 pa_context_set_state_callback(u
->context
, context_state_cb
, u
);
202 if (pa_context_connect(u
->context
,
204 PA_CONTEXT_NOAUTOSPAWN
,
206 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u
->context
)));
213 if (pa_mainloop_iterate(u
->thread_mainloop
, 1, &ret
) < 0) {
224 pa_asyncmsgq_post(u
->thread_mq
->outq
, PA_MSGOBJECT(u
->module
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
225 pa_asyncmsgq_wait_for(u
->thread_mq
->inq
, PA_MESSAGE_SHUTDOWN
);
229 pa_stream_disconnect(u
->stream
);
230 pa_stream_unref(u
->stream
);
235 pa_context_disconnect(u
->context
);
236 pa_context_unref(u
->context
);
240 pa_log_debug("Thread shutting down");
243 static void stream_state_cb(pa_stream
*stream
, void *userdata
) {
244 struct userdata
*u
= userdata
;
248 switch (pa_stream_get_state(stream
)) {
249 case PA_STREAM_FAILED
:
250 pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u
->context
)));
251 u
->connected
= false;
252 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
254 case PA_STREAM_TERMINATED
:
255 pa_log_debug("Stream terminated.");
257 case PA_STREAM_READY
:
258 /* Only call our requested_latency_cb when requested_latency
259 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
260 * we don't want to override the initial fragsize set by the server
261 * without a good reason. */
262 if (u
->update_stream_bufferattr_after_connect
)
263 source_update_requested_latency_cb(u
->source
);
264 case PA_STREAM_UNCONNECTED
:
265 case PA_STREAM_CREATING
:
270 static void context_state_cb(pa_context
*c
, void *userdata
) {
271 struct userdata
*u
= userdata
;
274 switch (pa_context_get_state(c
)) {
275 case PA_CONTEXT_UNCONNECTED
:
276 case PA_CONTEXT_CONNECTING
:
277 case PA_CONTEXT_AUTHORIZING
:
278 case PA_CONTEXT_SETTING_NAME
:
280 case PA_CONTEXT_READY
: {
281 pa_proplist
*proplist
;
282 pa_buffer_attr bufferattr
;
283 pa_usec_t requested_latency
;
284 char *username
= pa_get_user_name_malloc();
285 char *hostname
= pa_get_host_name_malloc();
286 /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
287 char *stream_name
= pa_sprintf_malloc(_("Tunnel for %s@%s"), username
, hostname
);
291 pa_log_debug("Connection successful. Creating stream.");
292 pa_assert(!u
->stream
);
294 proplist
= tunnel_new_proplist(u
);
295 u
->stream
= pa_stream_new_with_proplist(u
->context
,
297 &u
->source
->sample_spec
,
298 &u
->source
->channel_map
,
300 pa_proplist_free(proplist
);
301 pa_xfree(stream_name
);
304 pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u
->context
)));
305 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
309 requested_latency
= pa_source_get_requested_latency_within_thread(u
->source
);
310 if (requested_latency
== (uint32_t) -1)
311 requested_latency
= u
->source
->thread_info
.max_latency
;
313 reset_bufferattr(&bufferattr
);
314 bufferattr
.fragsize
= pa_usec_to_bytes(requested_latency
, &u
->source
->sample_spec
);
316 pa_stream_set_state_callback(u
->stream
, stream_state_cb
, userdata
);
317 pa_stream_set_read_callback(u
->stream
, stream_read_cb
, userdata
);
318 if (pa_stream_connect_record(u
->stream
,
319 u
->remote_source_name
,
321 PA_STREAM_INTERPOLATE_TIMING
|PA_STREAM_DONT_MOVE
|PA_STREAM_AUTO_TIMING_UPDATE
) < 0) {
322 pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u
->context
)));
323 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
328 case PA_CONTEXT_FAILED
:
329 pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u
->context
)));
330 u
->connected
= false;
331 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
333 case PA_CONTEXT_TERMINATED
:
334 pa_log_debug("Context terminated.");
335 u
->connected
= false;
336 u
->thread_mainloop_api
->quit(u
->thread_mainloop_api
, TUNNEL_THREAD_FAILED_MAINLOOP
);
341 static void source_update_requested_latency_cb(pa_source
*s
) {
343 pa_operation
*operation
;
345 pa_usec_t block_usec
;
346 pa_buffer_attr bufferattr
;
348 pa_source_assert_ref(s
);
349 pa_assert_se(u
= s
->userdata
);
351 block_usec
= pa_source_get_requested_latency_within_thread(s
);
352 if (block_usec
== (pa_usec_t
) -1)
353 block_usec
= s
->thread_info
.max_latency
;
355 nbytes
= pa_usec_to_bytes(block_usec
, &s
->sample_spec
);
358 switch (pa_stream_get_state(u
->stream
)) {
359 case PA_STREAM_READY
:
360 if (pa_stream_get_buffer_attr(u
->stream
)->fragsize
== nbytes
)
363 reset_bufferattr(&bufferattr
);
364 bufferattr
.fragsize
= nbytes
;
365 if ((operation
= pa_stream_set_buffer_attr(u
->stream
, &bufferattr
, NULL
, NULL
)))
366 pa_operation_unref(operation
);
368 case PA_STREAM_CREATING
:
369 /* we have to delay our request until stream is ready */
370 u
->update_stream_bufferattr_after_connect
= true;
378 static int source_process_msg_cb(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
379 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
382 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
384 pa_usec_t remote_latency
;
386 if (!PA_SOURCE_IS_LINKED(u
->source
->thread_info
.state
)) {
387 *((pa_usec_t
*) data
) = 0;
392 *((pa_usec_t
*) data
) = 0;
396 if (pa_stream_get_state(u
->stream
) != PA_STREAM_READY
) {
397 *((pa_usec_t
*) data
) = 0;
401 if (pa_stream_get_latency(u
->stream
, &remote_latency
, &negative
) < 0) {
402 *((pa_usec_t
*) data
) = 0;
407 *((pa_usec_t
*) data
) = 0;
409 *((pa_usec_t
*) data
) = remote_latency
;
414 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
417 int pa__init(pa_module
*m
) {
418 struct userdata
*u
= NULL
;
419 pa_modargs
*ma
= NULL
;
420 pa_source_new_data source_data
;
423 const char *remote_server
= NULL
;
424 const char *source_name
= NULL
;
425 char *default_source_name
= NULL
;
429 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
430 pa_log("Failed to parse module arguments.");
434 ss
= m
->core
->default_sample_spec
;
435 map
= m
->core
->default_channel_map
;
436 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
437 pa_log("Invalid sample format specification or channel map");
441 remote_server
= pa_modargs_get_value(ma
, "server", NULL
);
442 if (!remote_server
) {
443 pa_log("No server given!");
447 u
= pa_xnew0(struct userdata
, 1);
450 u
->remote_server
= pa_xstrdup(remote_server
);
451 u
->thread_mainloop
= pa_mainloop_new();
452 if (u
->thread_mainloop
== NULL
) {
453 pa_log("Failed to create mainloop");
456 u
->thread_mainloop_api
= pa_mainloop_get_api(u
->thread_mainloop
);
457 u
->remote_source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));
459 u
->thread_mq
= pa_xnew0(pa_thread_mq
, 1);
460 pa_thread_mq_init_thread_mainloop(u
->thread_mq
, m
->core
->mainloop
, u
->thread_mainloop_api
);
463 pa_source_new_data_init(&source_data
);
464 source_data
.driver
= __FILE__
;
465 source_data
.module
= m
;
467 default_source_name
= pa_sprintf_malloc("tunnel-source-new.%s", remote_server
);
468 source_name
= pa_modargs_get_value(ma
, "source_name", default_source_name
);
470 pa_source_new_data_set_name(&source_data
, source_name
);
471 pa_source_new_data_set_sample_spec(&source_data
, &ss
);
472 pa_source_new_data_set_channel_map(&source_data
, &map
);
474 pa_proplist_sets(source_data
.proplist
, PA_PROP_DEVICE_CLASS
, "sound");
475 pa_proplist_setf(source_data
.proplist
,
476 PA_PROP_DEVICE_DESCRIPTION
,
477 _("Tunnel to %s/%s"),
479 pa_strempty(u
->remote_source_name
));
481 if (pa_modargs_get_proplist(ma
, "source_properties", source_data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
482 pa_log("Invalid properties");
483 pa_source_new_data_done(&source_data
);
486 if (!(u
->source
= pa_source_new(m
->core
, &source_data
, PA_SOURCE_LATENCY
| PA_SOURCE_DYNAMIC_LATENCY
| PA_SOURCE_NETWORK
))) {
487 pa_log("Failed to create source.");
488 pa_source_new_data_done(&source_data
);
492 pa_source_new_data_done(&source_data
);
493 u
->source
->userdata
= u
;
494 u
->source
->parent
.process_msg
= source_process_msg_cb
;
495 u
->source
->update_requested_latency
= source_update_requested_latency_cb
;
497 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
->inq
);
499 if (!(u
->thread
= pa_thread_new("tunnel-source", thread_func
, u
))) {
500 pa_log("Failed to create thread.");
504 pa_source_put(u
->source
);
506 pa_xfree(default_source_name
);
514 if (default_source_name
)
515 pa_xfree(default_source_name
);
522 void pa__done(pa_module
*m
) {
527 if (!(u
= m
->userdata
))
531 pa_source_unlink(u
->source
);
534 pa_asyncmsgq_send(u
->thread_mq
->inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
535 pa_thread_free(u
->thread
);
539 pa_thread_mq_done(u
->thread_mq
);
540 pa_xfree(u
->thread_mq
);
543 if (u
->thread_mainloop
)
544 pa_mainloop_free(u
->thread_mainloop
);
546 if (u
->remote_source_name
)
547 pa_xfree(u
->remote_source_name
);
549 if (u
->remote_server
)
550 pa_xfree(u
->remote_server
);
553 pa_source_unref(u
->source
);