diff --git a/daemon/jitter_buffer.c b/daemon/jitter_buffer.c index 6e36fa4ae..251f801be 100644 --- a/daemon/jitter_buffer.c +++ b/daemon/jitter_buffer.c @@ -18,6 +18,11 @@ #define DELAY_FACTOR 0x64 #define COMFORT_NOISE 0x0D +#define JB_ADAPTIVE_MIN_SAMPLES 0x0A // Minimum samples before calculating buffer size (10) +#define JB_ADAPTIVE_RECALC_INTERVAL 0x0A // Recalculate buffer size every N packets (10) +#define JB_MAX_BURST_SIZE 0x03E8 // Maximum burst size to prevent overflow (1000 packets = 20 sec) +#define JB_MAX_JITTER_US 0x1E8480 // Maximum jitter value (2000000 µs = 2 seconds) + static struct timerthread jitter_buffer_thread; @@ -56,6 +61,11 @@ static void reset_jitter_buffer(struct jitter_buffer *jb) { jb->clock_drift_val = 0; jb->prev_seq_ts = rtpe_now; jb->prev_seq = 0; + jb->jitter_mean = 0.0; + jb->jitter_variance = 0.0; + jb->jitter_m2 = 0.0; + jb->jitter_samples = 0; + jb->dynamic_capacity = 0; jb->num_resets++; if(g_tree_nnodes(jb->ttq.entries) > 0) @@ -92,6 +102,69 @@ static int get_clock_rate(struct media_packet *mp, int payload_type) { return clock_rate; } + +// jb is locked +static int get_target_capacity(struct jitter_buffer *jb) { + if (rtpe_config.jb_adaptive && jb->dynamic_capacity > 0) + return jb->dynamic_capacity; + return rtpe_config.jb_length; +} + +// jb is locked +static void update_jitter_statistics(struct jitter_buffer *jb, int64_t jitter_sample_us) { + if (!rtpe_config.jb_adaptive) + return; + + if (jitter_sample_us < 0 || jitter_sample_us > JB_MAX_JITTER_US) { + ilog(LOG_WARN, "Extreme jitter value detected: %lld µs, ignoring", + (long long)jitter_sample_us); + return; + } + + jb->jitter_samples++; + + double delta = (double)jitter_sample_us - jb->jitter_mean; + jb->jitter_mean += delta / (double)jb->jitter_samples; + double delta2 = (double)jitter_sample_us - jb->jitter_mean; + jb->jitter_m2 += delta * delta2; + + if (jb->jitter_samples > 1) + jb->jitter_variance = jb->jitter_m2 / (double)(jb->jitter_samples - 1); +} + +// jb is locked +static void calculate_adaptive_buffer_size(struct jitter_buffer *jb) { + if (!rtpe_config.jb_adaptive || jb->jitter_samples < JB_ADAPTIVE_MIN_SAMPLES) + return; + + // Protect against negative variance due to floating-point errors + double std_dev_us = sqrt(jb->jitter_variance < 0.0 ? 0.0 : jb->jitter_variance); + + double optimal_buffer_us = jb->jitter_mean + (4.0 * std_dev_us); + + int optimal_buffer_ms = (int)(optimal_buffer_us / 1000.0); + + int min_capacity = rtpe_config.jb_adaptive_min; + int max_capacity = rtpe_config.jb_adaptive_max; + + if (max_capacity <= 0) + max_capacity = 300; + if (min_capacity < 0) + min_capacity = 0; + if (min_capacity > max_capacity) + min_capacity = max_capacity; + + if (optimal_buffer_ms < min_capacity) + optimal_buffer_ms = min_capacity; + if (optimal_buffer_ms > max_capacity) + optimal_buffer_ms = max_capacity; + + jb->dynamic_capacity = optimal_buffer_ms; + + ilog(LOG_DEBUG, "Adaptive JB: mean=%.2fms, stddev=%.2fms, capacity=%dms (samples=%u)", + jb->jitter_mean / 1000.0, std_dev_us / 1000.0, jb->dynamic_capacity, jb->jitter_samples); +} + static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { if (!(mp->rtp = rtp_payload(&mp->payload, s, NULL))) return NULL; @@ -113,10 +186,100 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { return p; } +// jb is locked (temporarily unlocked during operation, then relocked) +static int remove_oldest_packets(struct jitter_buffer *jb, int num_to_remove) { + if (num_to_remove <= 0) + return 0; + + int removed = 0; + mutex_unlock(&jb->lock); + + for (int i = 0; i < num_to_remove; i++) { + struct timerthread_queue_entry *ttqe = rtpe_g_tree_first(jb->ttq.entries); + if (!ttqe) + break; + + g_tree_remove(jb->ttq.entries, ttqe); + if (jb->ttq.entry_free_func) + jb->ttq.entry_free_func(ttqe); + removed++; + } + + mutex_lock(&jb->lock); + return removed; +} + +// jb is locked +static int try_burst_aware_discard(struct jitter_buffer *jb, int current_buffer_size) { + if (!jb->rtptime_delta || !jb->clock_rate || !jb->prev_seq_ts) { + ilog(LOG_DEBUG, "Burst-aware discard: insufficient data for calculation"); + return 0; + } + + int64_t packetization_interval_us = ((int64_t)jb->rtptime_delta * 1000000) / jb->clock_rate; + if (packetization_interval_us <= 0) { + ilog(LOG_DEBUG, "Burst-aware discard: invalid packetization interval"); + return 0; + } + + int64_t delta_t = rtpe_now - jb->prev_seq_ts; + if (delta_t < 0) { + ilog(LOG_DEBUG, "Burst-aware discard: negative time delta"); + return 0; + } + + int64_t burst_calc = delta_t / packetization_interval_us; + if (burst_calc > JB_MAX_BURST_SIZE) { + ilog(LOG_DEBUG, "Burst size exceeds maximum (%d packets), capping", JB_MAX_BURST_SIZE); + burst_calc = JB_MAX_BURST_SIZE; + } + int estimated_burst_size = (int)burst_calc; + + int target_capacity = get_target_capacity(jb); + + int packets_to_remove = estimated_burst_size - target_capacity; + + if (packets_to_remove <= 0) { + ilog(LOG_DEBUG, "Burst-aware discard: no removal needed (burst: %d, capacity: %d)", + estimated_burst_size, target_capacity); + return 1; + } + + if (packets_to_remove >= current_buffer_size) { + ilog(LOG_DEBUG, "Burst-aware discard: would remove all packets (burst: %d, capacity: %d, buffer: %d)", + estimated_burst_size, target_capacity, current_buffer_size); + return 0; + } + + int packets_saved = current_buffer_size - packets_to_remove; + ilog(LOG_DEBUG, "Burst-aware discard: burst of %d packets detected, removing %d (saving %d packets)", + estimated_burst_size, packets_to_remove, packets_saved); + + int removed = remove_oldest_packets(jb, packets_to_remove); + + if (removed > 0) { + ilog(LOG_DEBUG, "Burst-aware discard: successfully removed %d packets", removed); + return 1; + } + + return 0; +} + // jb is locked static void check_buffered_packets(struct jitter_buffer *jb) { - if (g_tree_nnodes(jb->ttq.entries) >= (3* rtpe_config.jb_length)) { - ilog(LOG_DEBUG, "Jitter reset due to buffer overflow"); + int current_buffer_size = g_tree_nnodes(jb->ttq.entries); + int target_capacity = get_target_capacity(jb); + + if (current_buffer_size > target_capacity) { + if (jb->rtptime_delta && jb->clock_rate && jb->prev_seq_ts) { + if (try_burst_aware_discard(jb, current_buffer_size)) { + return; + } + } + } + + if (current_buffer_size >= (3 * rtpe_config.jb_length)) { + ilog(LOG_DEBUG, "Emergency buffer overflow at 3x capacity - forcing reset"); reset_jitter_buffer(jb); } } @@ -311,6 +474,20 @@ int buffer_packet(struct media_packet *mp, const str *s) { // packet consumed? if (ret == 0) p = NULL; + + // Update adaptive jitter buffer statistics + if (rtpe_config.jb_adaptive && jb->first_send && jb->rtptime_delta && jb->clock_rate) { + unsigned long ts = ntohl(mp->rtp->timestamp); + long ts_diff = (uint32_t)ts - (uint32_t)jb->first_send_ts; + int64_t expected_arrival = jb->first_send + (ts_diff * 1000000LL / jb->clock_rate); + + int64_t jitter_us = llabs(rtpe_now - expected_arrival); + + update_jitter_statistics(jb, jitter_us); + + if (jb->jitter_samples % JB_ADAPTIVE_RECALC_INTERVAL == 0) + calculate_adaptive_buffer_size(jb); + } check_buffered_packets(jb); diff --git a/daemon/main.c b/daemon/main.c index 8d198cfde..5f383043d 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -796,6 +796,9 @@ static void options(int *argc, char ***argv, charp_ht templates) { { "endpoint-learning",0,0,G_OPTION_ARG_STRING, &endpoint_learning, "RTP endpoint learning algorithm", "delayed|immediate|off|heuristic" }, { "jitter-buffer",0, 0, G_OPTION_ARG_INT, &rtpe_config.jb_length, "Size of jitter buffer", "INT" }, { "jb-clock-drift",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_clock_drift,"Compensate for source clock drift",NULL }, + { "jb-adaptive",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_adaptive,"Enable adaptive jitter buffer sizing",NULL }, + { "jb-adaptive-min",0,0,G_OPTION_ARG_INT, &rtpe_config.jb_adaptive_min,"Minimum adaptive jitter buffer size (ms)","INT" }, + { "jb-adaptive-max",0,0,G_OPTION_ARG_INT, &rtpe_config.jb_adaptive_max,"Maximum adaptive jitter buffer size (ms)","INT" }, { "debug-srtp",0,0, G_OPTION_ARG_NONE, &debug_srtp, "Log raw encryption details for SRTP", NULL }, { "reject-invalid-sdp",0,0, G_OPTION_ARG_NONE, &rtpe_config.reject_invalid_sdp,"Refuse to process SDP bodies with broken syntax", NULL }, { "dtls-rsa-key-size",0, 0, G_OPTION_ARG_INT,&rtpe_config.dtls_rsa_key_size,"Size of RSA key for DTLS", "INT" }, @@ -1268,6 +1271,26 @@ static void options(int *argc, char ***argv, charp_ht templates) { if (rtpe_config.jb_length < 0) die("Invalid negative jitter buffer size"); + + // Validate adaptive jitter buffer parameters + if (rtpe_config.jb_adaptive) { + if (rtpe_config.jb_adaptive_min < 0) + die("Invalid negative --jb-adaptive-min (%d)", rtpe_config.jb_adaptive_min); + if (rtpe_config.jb_adaptive_max < 0) + die("Invalid negative --jb-adaptive-max (%d)", rtpe_config.jb_adaptive_max); + if (rtpe_config.jb_adaptive_max > 1000) + die("--jb-adaptive-max too large (%d ms, maximum 1000 ms)", rtpe_config.jb_adaptive_max); + if (rtpe_config.jb_adaptive_min > rtpe_config.jb_adaptive_max) + die("--jb-adaptive-min (%d) must be <= --jb-adaptive-max (%d)", + rtpe_config.jb_adaptive_min, rtpe_config.jb_adaptive_max); + + // Set reasonable defaults if not specified + if (rtpe_config.jb_adaptive_max == 0) + rtpe_config.jb_adaptive_max = 300; // Default max: 300ms + + ilog(LOG_INFO, "Adaptive jitter buffer enabled: min=%dms, max=%dms", + rtpe_config.jb_adaptive_min, rtpe_config.jb_adaptive_max); + } if (silence_detect > 0) { rtpe_config.silence_detect_double = silence_detect / 100.0; diff --git a/include/jitter_buffer.h b/include/jitter_buffer.h index 1440e4099..eb31a8331 100644 --- a/include/jitter_buffer.h +++ b/include/jitter_buffer.h @@ -37,6 +37,11 @@ struct jitter_buffer { int clock_drift_val; call_t *call; int disabled; + int dynamic_capacity; + double jitter_mean; + double jitter_variance; + double jitter_m2; + unsigned int jitter_samples; }; void jitter_buffer_init(void); diff --git a/include/main.h b/include/main.h index ac4a825b3..d22d8eaf5 100644 --- a/include/main.h +++ b/include/main.h @@ -68,6 +68,8 @@ enum endpoint_learning { X(mysql_port) \ X(dtmf_digit_delay) \ X(jb_length) \ + X(jb_adaptive_min) \ + X(jb_adaptive_max) \ X(dtls_rsa_key_size) \ X(dtls_mtu) \ X(http_threads) \ @@ -123,6 +125,7 @@ enum endpoint_learning { X(dtmf_no_suppress) \ X(dtmf_no_log_injects) \ X(jb_clock_drift) \ + X(jb_adaptive) \ X(player_cache) \ X(poller_per_thread) \ X(redis_resolve_on_reconnect) \