#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
-#include <arpa/inet.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
-#include <poll.h>
+#include <math.h>
#include <pulse/rtclock.h>
#include <pulse/timeval.h>
#include <pulsecore/namereg.h>
#include <pulsecore/sample-util.h>
#include <pulsecore/macro.h>
-#include <pulsecore/atomic.h>
-#include <pulsecore/atomic.h>
-#include <pulsecore/time-smoother.h>
#include <pulsecore/socket-util.h>
+#include <pulsecore/atomic.h>
#include <pulsecore/once.h>
+#include <pulsecore/poll.h>
+#include <pulsecore/arpa-inet.h>
#include "module-rtp-recv-symdef.h"
PA_MODULE_AUTHOR("Lennart Poettering");
PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
PA_MODULE_VERSION(PACKAGE_VERSION);
-PA_MODULE_LOAD_ONCE(FALSE);
+PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE(
"sink=<name of the sink> "
"sap_address=<multicast address to listen on> "
pa_sink_input *sink_input;
pa_memblockq *memblockq;
- pa_bool_t first_packet;
+ bool first_packet;
uint32_t ssrc;
uint32_t offset;
pa_atomic_t timestamp;
- pa_smoother *smoother;
pa_usec_t intended_latency;
pa_usec_t sink_latency;
pa_usec_t last_rate_update;
+ pa_usec_t last_latency;
+ double estimated_rate;
+ double avg_estimated_rate;
};
struct userdata {
pa_sink_input_assert_ref(i);
pa_assert_se(s = i->userdata);
+ pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
session_free(s);
}
/* Called from IO context */
-static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
+static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
struct session *s;
pa_sink_input_assert_ref(i);
pa_assert_se(s = i->userdata);
- if (b) {
- pa_smoother_pause(s->smoother, pa_rtclock_now());
+ if (b)
pa_memblockq_flush_read(s->memblockq);
- } else
- s->first_packet = FALSE;
+ else
+ s->first_packet = false;
}
/* Called from I/O thread context */
}
if (!s->first_packet) {
- s->first_packet = TRUE;
+ s->first_packet = true;
s->ssrc = s->rtp_context.ssrc;
s->offset = s->rtp_context.timestamp;
else
delta = j;
- pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
+ pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
if (now.tv_sec == 0) {
PA_ONCE_BEGIN {
} else
pa_rtclock_from_wallclock(&now);
- pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
-
- /* Tell the smoother that we are rolling now, in case it is still paused */
- pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
-
if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
pa_log_warn("Queue overrun");
- pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
+ pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
}
/* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
pa_atomic_store(&s->timestamp, (int) now.tv_sec);
if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
- pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
- unsigned fix_samples;
+ pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
+ uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
+ uint32_t current_rate = s->sink_input->sample_spec.rate;
+ uint32_t new_rate;
+ double estimated_rate, alpha = 0.02;
pa_log_debug("Updating sample rate");
- wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
+ wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
else
latency = wi - ri;
- pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
-
- /* Calculate deviation */
- if (latency < s->intended_latency)
- fix = s->intended_latency - latency;
- else
- fix = latency - s->intended_latency;
-
- /* How many samples is this per second? */
- fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
-
- /* Check if deviation is in bounds */
- if (fix_samples > s->sink_input->sample_spec.rate*.50)
- pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
- else {
- /* Fix up rate */
- if (latency < s->intended_latency)
- s->sink_input->sample_spec.rate -= fix_samples;
- else
- s->sink_input->sample_spec.rate += fix_samples;
-
- if (s->sink_input->sample_spec.rate > PA_RATE_MAX)
- s->sink_input->sample_spec.rate = PA_RATE_MAX;
+ pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
+
+ /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
+ * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
+ * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
+ *     \hat{R} = \frac{T}{T - (L^{n} - L^{n-1})} \, R^{n} .    (1)
+ *
+ * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
+ * is correct). But there is also the requirement to keep the buffer at a predefined target
+ * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
+ * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
+ * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
+ *     \sum_{j=1}^{a} T \, \frac{\hat{R} - R^{n+j}}{\hat{R}} = \hat{L} - L^{n}
+ *     with  R^{n+j} = \frac{a-j+1}{a} \, R^{n+1} + \frac{j-1}{a} \, \hat{R} .
+ * Solving for Rⁿ⁺ⁱ gives
+ *     R^{n+1} = \frac{T - \frac{2}{a+1} (\hat{L} - L^{n})}{T} \, \hat{R} .    (2)
+ * In the code below a = 7 is used.
+ *
+ * Equation (1) is not directly used in (2), but instead an exponentially weighted average
+ * of the estimated rate R̂ is used. This average R̅ is defined as
+ * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
+ * Because it is difficult to find a fixed value for the coefficient α such that the
+ * averaging is without significant lag but oscillations are filtered out, a heuristic is
+ * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
+ * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
+ */
+ estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
+ if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
+ double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
+ alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
+ }
+ s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
+ s->estimated_rate = estimated_rate;
+ pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
+ new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
+ s->last_latency = latency;
+
+ if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
+ pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
+ new_rate = base_rate;
+ } else {
+ if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
+ new_rate = base_rate;
+ /* Do the adjustment in small steps; 2‰ can be considered inaudible */
+ if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
+ pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
+ new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
+ }
}
+ s->sink_input->sample_spec.rate = new_rate;
pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
if (pa_memblockq_is_readable(s->memblockq) &&
s->sink_input->thread_info.underrun_for > 0) {
pa_log_debug("Requesting rewind due to end of underrun");
- pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
+ pa_sink_input_request_rewind(s->sink_input,
+ (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
+ false, true, false);
}
return 1;
pa_assert_se(s = i->userdata);
pa_assert(!s->rtpoll_item);
- s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
+ s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
p->fd = s->rtp_context.fd;
pa_assert(salen > 0);
af = sa->sa_family;
- if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
+ if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
pa_log("Failed to create socket: %s", pa_cstrerror(errno));
goto fail;
}
pa_make_udp_socket_low_delay(fd);
+#ifdef SO_TIMESTAMP
one = 1;
if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
goto fail;
}
+#else
+ pa_log("SO_TIMESTAMP unsupported on this platform");
+ goto fail;
+#endif
one = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
#ifdef HAVE_IPV6
- } else {
+ } else if (af == AF_INET6) {
struct ipv6_mreq mr6;
memset(&mr6, 0, sizeof(mr6));
mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
#endif
- }
+ } else
+ pa_assert_not_reached();
if (r < 0) {
pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
s = pa_xnew0(struct session, 1);
s->userdata = u;
- s->first_packet = FALSE;
+ s->first_packet = false;
s->sdp_info = *sdp_info;
s->rtpoll_item = NULL;
s->intended_latency = LATENCY_USEC;
- s->smoother = pa_smoother_new(
- PA_USEC_PER_SEC*5,
- PA_USEC_PER_SEC*2,
- TRUE,
- TRUE,
- 10,
- pa_timeval_load(&now),
- TRUE);
s->last_rate_update = pa_timeval_load(&now);
+ s->last_latency = LATENCY_USEC;
+ s->estimated_rate = (double) sink->sample_spec.rate;
+ s->avg_estimated_rate = (double) sink->sample_spec.rate;
pa_atomic_store(&s->timestamp, (int) now.tv_sec);
if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
goto fail;
pa_sink_input_new_data_init(&data);
- data.sink = sink;
+ pa_sink_input_new_data_set_sink(&data, sink, false);
data.driver = __FILE__;
pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
data.module = u->module;
pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
+ data.flags = PA_SINK_INPUT_VARIABLE_RATE;
- pa_sink_input_new(&s->sink_input, u->module->core, &data, PA_SINK_INPUT_VARIABLE_RATE);
+ pa_sink_input_new(&s->sink_input, u->module->core, &data);
pa_sink_input_new_data_done(&data);
if (!s->sink_input) {
s->intended_latency = s->sink_latency*2;
s->memblockq = pa_memblockq_new(
+ "module-rtp-recv memblockq",
0,
MEMBLOCKQ_MAXLENGTH,
MEMBLOCKQ_MAXLENGTH,
- pa_frame_size(&s->sink_input->sample_spec),
+ &s->sink_input->sample_spec,
pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
0,
0,
PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
pa_assert(s->userdata->n_sessions >= 1);
s->userdata->n_sessions--;
- pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
pa_memblockq_free(s->memblockq);
pa_sdp_info_destroy(&s->sdp_info);
pa_rtp_context_destroy(&s->rtp_context);
- pa_smoother_free(s->smoother);
-
pa_xfree(s);
}
static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
struct userdata *u = userdata;
- pa_bool_t goodbye = FALSE;
+ bool goodbye = false;
pa_sdp_info info;
struct session *s;
if (goodbye) {
- if ((s = pa_hashmap_get(u->by_origin, info.origin)))
+ if ((s = pa_hashmap_remove(u->by_origin, info.origin)))
session_free(s);
pa_sdp_info_destroy(&info);
k = pa_atomic_load(&s->timestamp);
- if (k + DEATH_TIMEOUT < now.tv_sec)
+ if (k + DEATH_TIMEOUT < now.tv_sec) {
+ pa_hashmap_remove(u->by_origin, s->sdp_info.origin);
session_free(s);
+ }
}
/* Restart timer */
PA_LLIST_HEAD_INIT(struct session, u->sessions);
u->n_sessions = 0;
- u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
+ u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
void pa__done(pa_module*m) {
struct userdata *u;
- struct session *s;
pa_assert(m);
pa_sap_context_destroy(&u->sap_context);
- if (u->by_origin) {
- while ((s = pa_hashmap_first(u->by_origin)))
- session_free(s);
-
- pa_hashmap_free(u->by_origin, NULL, NULL);
- }
+ if (u->by_origin)
+ pa_hashmap_free(u->by_origin);
pa_xfree(u->sink_name);
pa_xfree(u);