Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 179 additions & 2 deletions daemon/jitter_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
#define DELAY_FACTOR 0x64
#define COMFORT_NOISE 0x0D

#define JB_ADAPTIVE_MIN_SAMPLES 0x0A // Minimum samples before calculating buffer size (10)
#define JB_ADAPTIVE_RECALC_INTERVAL 0x0A // Recalculate buffer size every N packets (10)
#define JB_MAX_BURST_SIZE 0x03E8 // Maximum burst size to prevent overflow (1000 packets = 20 sec)
#define JB_MAX_JITTER_US 0x1E8480 // Maximum jitter value (2000000 µs = 2 seconds)


static struct timerthread jitter_buffer_thread;

Expand Down Expand Up @@ -56,6 +61,11 @@ static void reset_jitter_buffer(struct jitter_buffer *jb) {
jb->clock_drift_val = 0;
jb->prev_seq_ts = rtpe_now;
jb->prev_seq = 0;
jb->jitter_mean = 0.0;
jb->jitter_variance = 0.0;
jb->jitter_m2 = 0.0;
jb->jitter_samples = 0;
jb->dynamic_capacity = 0;

jb->num_resets++;
if(g_tree_nnodes(jb->ttq.entries) > 0)
Expand Down Expand Up @@ -92,6 +102,69 @@ static int get_clock_rate(struct media_packet *mp, int payload_type) {
return clock_rate;
}


// jb is locked
static int get_target_capacity(struct jitter_buffer *jb) {
if (rtpe_config.jb_adaptive && jb->dynamic_capacity > 0)
return jb->dynamic_capacity;
return rtpe_config.jb_length;
}

// jb is locked
static void update_jitter_statistics(struct jitter_buffer *jb, int64_t jitter_sample_us) {
if (!rtpe_config.jb_adaptive)
return;

if (jitter_sample_us < 0 || jitter_sample_us > JB_MAX_JITTER_US) {
ilog(LOG_WARN, "Extreme jitter value detected: %lld µs, ignoring",
(long long)jitter_sample_us);
return;
}

jb->jitter_samples++;

double delta = (double)jitter_sample_us - jb->jitter_mean;
jb->jitter_mean += delta / (double)jb->jitter_samples;
double delta2 = (double)jitter_sample_us - jb->jitter_mean;
jb->jitter_m2 += delta * delta2;

if (jb->jitter_samples > 1)
jb->jitter_variance = jb->jitter_m2 / (double)(jb->jitter_samples - 1);
}

// jb is locked
static void calculate_adaptive_buffer_size(struct jitter_buffer *jb) {
if (!rtpe_config.jb_adaptive || jb->jitter_samples < JB_ADAPTIVE_MIN_SAMPLES)
return;

// Protect against negative variance due to floating-point errors
double std_dev_us = sqrt(jb->jitter_variance < 0.0 ? 0.0 : jb->jitter_variance);

double optimal_buffer_us = jb->jitter_mean + (4.0 * std_dev_us);

int optimal_buffer_ms = (int)(optimal_buffer_us / 1000.0);

int min_capacity = rtpe_config.jb_adaptive_min;
int max_capacity = rtpe_config.jb_adaptive_max;

if (max_capacity <= 0)
max_capacity = 300;
if (min_capacity < 0)
min_capacity = 0;
if (min_capacity > max_capacity)
min_capacity = max_capacity;

if (optimal_buffer_ms < min_capacity)
optimal_buffer_ms = min_capacity;
if (optimal_buffer_ms > max_capacity)
optimal_buffer_ms = max_capacity;

jb->dynamic_capacity = optimal_buffer_ms;

ilog(LOG_DEBUG, "Adaptive JB: mean=%.2fms, stddev=%.2fms, capacity=%dms (samples=%u)",
jb->jitter_mean / 1000.0, std_dev_us / 1000.0, jb->dynamic_capacity, jb->jitter_samples);
}

static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) {
if (!(mp->rtp = rtp_payload(&mp->payload, s, NULL)))
return NULL;
Expand All @@ -113,10 +186,100 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) {
return p;
}

// jb is locked (temporarily unlocked during operation, then relocked)
static int remove_oldest_packets(struct jitter_buffer *jb, int num_to_remove) {
if (num_to_remove <= 0)
return 0;

int removed = 0;
mutex_unlock(&jb->lock);

for (int i = 0; i < num_to_remove; i++) {
struct timerthread_queue_entry *ttqe = rtpe_g_tree_first(jb->ttq.entries);
if (!ttqe)
break;

g_tree_remove(jb->ttq.entries, ttqe);
if (jb->ttq.entry_free_func)
jb->ttq.entry_free_func(ttqe);
removed++;
}

mutex_lock(&jb->lock);
return removed;
}

// jb is locked
static int try_burst_aware_discard(struct jitter_buffer *jb, int current_buffer_size) {
if (!jb->rtptime_delta || !jb->clock_rate || !jb->prev_seq_ts) {
ilog(LOG_DEBUG, "Burst-aware discard: insufficient data for calculation");
return 0;
}

int64_t packetization_interval_us = ((int64_t)jb->rtptime_delta * 1000000) / jb->clock_rate;
if (packetization_interval_us <= 0) {
ilog(LOG_DEBUG, "Burst-aware discard: invalid packetization interval");
return 0;
}

int64_t delta_t = rtpe_now - jb->prev_seq_ts;
if (delta_t < 0) {
ilog(LOG_DEBUG, "Burst-aware discard: negative time delta");
return 0;
}

int64_t burst_calc = delta_t / packetization_interval_us;
if (burst_calc > JB_MAX_BURST_SIZE) {
ilog(LOG_DEBUG, "Burst size exceeds maximum (%d packets), capping", JB_MAX_BURST_SIZE);
burst_calc = JB_MAX_BURST_SIZE;
}
int estimated_burst_size = (int)burst_calc;

int target_capacity = get_target_capacity(jb);

int packets_to_remove = estimated_burst_size - target_capacity;

if (packets_to_remove <= 0) {
ilog(LOG_DEBUG, "Burst-aware discard: no removal needed (burst: %d, capacity: %d)",
estimated_burst_size, target_capacity);
return 1;
}

if (packets_to_remove >= current_buffer_size) {
ilog(LOG_DEBUG, "Burst-aware discard: would remove all packets (burst: %d, capacity: %d, buffer: %d)",
estimated_burst_size, target_capacity, current_buffer_size);
return 0;
}

int packets_saved = current_buffer_size - packets_to_remove;
ilog(LOG_DEBUG, "Burst-aware discard: burst of %d packets detected, removing %d (saving %d packets)",
estimated_burst_size, packets_to_remove, packets_saved);

int removed = remove_oldest_packets(jb, packets_to_remove);

if (removed > 0) {
ilog(LOG_DEBUG, "Burst-aware discard: successfully removed %d packets", removed);
return 1;
}

return 0;
}

// jb is locked
static void check_buffered_packets(struct jitter_buffer *jb) {
if (g_tree_nnodes(jb->ttq.entries) >= (3* rtpe_config.jb_length)) {
ilog(LOG_DEBUG, "Jitter reset due to buffer overflow");
int current_buffer_size = g_tree_nnodes(jb->ttq.entries);
int target_capacity = get_target_capacity(jb);

if (current_buffer_size > target_capacity) {
if (jb->rtptime_delta && jb->clock_rate && jb->prev_seq_ts) {
if (try_burst_aware_discard(jb, current_buffer_size)) {
return;
}
}
}

if (current_buffer_size >= (3 * rtpe_config.jb_length)) {
ilog(LOG_DEBUG, "Emergency buffer overflow at 3x capacity - forcing reset");
reset_jitter_buffer(jb);
}
}
Expand Down Expand Up @@ -311,6 +474,20 @@ int buffer_packet(struct media_packet *mp, const str *s) {
// packet consumed?
if (ret == 0)
p = NULL;

// Update adaptive jitter buffer statistics
if (rtpe_config.jb_adaptive && jb->first_send && jb->rtptime_delta && jb->clock_rate) {
unsigned long ts = ntohl(mp->rtp->timestamp);
long ts_diff = (uint32_t)ts - (uint32_t)jb->first_send_ts;
int64_t expected_arrival = jb->first_send + (ts_diff * 1000000LL / jb->clock_rate);

int64_t jitter_us = llabs(rtpe_now - expected_arrival);

update_jitter_statistics(jb, jitter_us);

if (jb->jitter_samples % JB_ADAPTIVE_RECALC_INTERVAL == 0)
calculate_adaptive_buffer_size(jb);
}

check_buffered_packets(jb);

Expand Down
23 changes: 23 additions & 0 deletions daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,9 @@ static void options(int *argc, char ***argv, charp_ht templates) {
{ "endpoint-learning",0,0,G_OPTION_ARG_STRING, &endpoint_learning, "RTP endpoint learning algorithm", "delayed|immediate|off|heuristic" },
{ "jitter-buffer",0, 0, G_OPTION_ARG_INT, &rtpe_config.jb_length, "Size of jitter buffer", "INT" },
{ "jb-clock-drift",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_clock_drift,"Compensate for source clock drift",NULL },
{ "jb-adaptive",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_adaptive,"Enable adaptive jitter buffer sizing",NULL },
{ "jb-adaptive-min",0,0,G_OPTION_ARG_INT, &rtpe_config.jb_adaptive_min,"Minimum adaptive jitter buffer size (ms)","INT" },
{ "jb-adaptive-max",0,0,G_OPTION_ARG_INT, &rtpe_config.jb_adaptive_max,"Maximum adaptive jitter buffer size (ms)","INT" },
{ "debug-srtp",0,0, G_OPTION_ARG_NONE, &debug_srtp, "Log raw encryption details for SRTP", NULL },
{ "reject-invalid-sdp",0,0, G_OPTION_ARG_NONE, &rtpe_config.reject_invalid_sdp,"Refuse to process SDP bodies with broken syntax", NULL },
{ "dtls-rsa-key-size",0, 0, G_OPTION_ARG_INT,&rtpe_config.dtls_rsa_key_size,"Size of RSA key for DTLS", "INT" },
Expand Down Expand Up @@ -1268,6 +1271,26 @@ static void options(int *argc, char ***argv, charp_ht templates) {

if (rtpe_config.jb_length < 0)
die("Invalid negative jitter buffer size");

// Validate adaptive jitter buffer parameters
if (rtpe_config.jb_adaptive) {
if (rtpe_config.jb_adaptive_min < 0)
die("Invalid negative --jb-adaptive-min (%d)", rtpe_config.jb_adaptive_min);
if (rtpe_config.jb_adaptive_max < 0)
die("Invalid negative --jb-adaptive-max (%d)", rtpe_config.jb_adaptive_max);
if (rtpe_config.jb_adaptive_max > 1000)
die("--jb-adaptive-max too large (%d ms, maximum 1000 ms)", rtpe_config.jb_adaptive_max);
if (rtpe_config.jb_adaptive_min > rtpe_config.jb_adaptive_max)
die("--jb-adaptive-min (%d) must be <= --jb-adaptive-max (%d)",
rtpe_config.jb_adaptive_min, rtpe_config.jb_adaptive_max);

// Set reasonable defaults if not specified
if (rtpe_config.jb_adaptive_max == 0)
rtpe_config.jb_adaptive_max = 300; // Default max: 300ms

ilog(LOG_INFO, "Adaptive jitter buffer enabled: min=%dms, max=%dms",
rtpe_config.jb_adaptive_min, rtpe_config.jb_adaptive_max);
}

if (silence_detect > 0) {
rtpe_config.silence_detect_double = silence_detect / 100.0;
Expand Down
5 changes: 5 additions & 0 deletions include/jitter_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ struct jitter_buffer {
int clock_drift_val;
call_t *call;
int disabled;
int dynamic_capacity;
double jitter_mean;
double jitter_variance;
double jitter_m2;
unsigned int jitter_samples;
};

void jitter_buffer_init(void);
Expand Down
3 changes: 3 additions & 0 deletions include/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ enum endpoint_learning {
X(mysql_port) \
X(dtmf_digit_delay) \
X(jb_length) \
X(jb_adaptive_min) \
X(jb_adaptive_max) \
X(dtls_rsa_key_size) \
X(dtls_mtu) \
X(http_threads) \
Expand Down Expand Up @@ -123,6 +125,7 @@ enum endpoint_learning {
X(dtmf_no_suppress) \
X(dtmf_no_log_injects) \
X(jb_clock_drift) \
X(jb_adaptive) \
X(player_cache) \
X(poller_per_thread) \
X(redis_resolve_on_reconnect) \
Expand Down