Skip to content

Commit ab10a3f

Browse files
committed
Add NADA implementation
This change adds the NADA congestion control implementation. The binding to cc is left TODO.
1 parent 09051cd commit ab10a3f

File tree

10 files changed

+630
-0
lines changed

10 files changed

+630
-0
lines changed

pkg/nada/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# [RFC 8698] NADA: A Unified Congestion Control Scheme for Real-Time Media
2+
3+
Notes:
4+
5+
* The receiver in this implementation assumes a monotonically ordered sequence of packets.

pkg/nada/config.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package rfc8698
2+
3+
import "time"
4+
5+
type Bits uint32
6+
7+
type BitsPerSecond float64
8+
9+
const Kbps = BitsPerSecond(1_000)
10+
const Mbps = BitsPerSecond(1_000_000)
11+
12+
type Config struct {
13+
// Weight of priority of the flow
14+
Priority float64
15+
// Minimum rate of the application supported by the media encoder
16+
MinimumRate BitsPerSecond // RMIN
17+
// Maximum rate of the application supported by media encoder
18+
MaximumRate BitsPerSecond // RMAX
19+
// Reference congestion level
20+
ReferenceCongestionLevel time.Duration // XREF
21+
// Scaling parameter for gradual rate update calculation
22+
κ float64
23+
// Scaling parameter for gradual rate update calculation
24+
η float64
25+
// Upper bound of RTT in gradual rate update calculation
26+
τ time.Duration
27+
// Target feedback interval
28+
δ time.Duration
29+
30+
// Observation window in time for calculating packet summary statistics at receiver
31+
LogWindow time.Duration // LOGWIN
32+
// Threshold for determining queuing delay build up at receiver
33+
QueueingDelayThreshold time.Duration
34+
// Bound on filtering delay
35+
FilteringDelay time.Duration // DFILT
36+
// Upper bound on rate increase ratio for accelerated ramp-up
37+
γ_max float64
38+
// Upper bound on self-inflicted queueing delay during ramp up
39+
QueueBound time.Duration // QBOUND
40+
41+
// Multiplier for self-scaling the expiration threshold of the last observed loss
42+
// (loss_exp) based on measured average loss interval (loss_int)
43+
LossMultiplier float64 // MULTILOSS
44+
// Delay threshold for invoking non-linear warping
45+
DelayThreshold time.Duration // QTH
46+
// Scaling parameter in the exponent of non-linear warping
47+
λ float64
48+
49+
// Reference packet loss ratio
50+
ReferencePacketLossRatio float64 // PLRREF
51+
// Reference packet marking ratio
52+
ReferencePacketMarkingRatio float64 // PMRREF
53+
// Reference delay penalty for loss when lacket loss ratio is at least PLRREF
54+
ReferenceDelayLoss time.Duration // DLOSS
55+
// Reference delay penalty for ECN marking when packet marking is at PMRREF
56+
ReferenceDelayMarking time.Duration // DMARK
57+
58+
// Frame rate of incoming video
59+
FrameRate float64 // FRAMERATE
60+
// Scaling parameter for modulating outgoing sending rate
61+
β_s float64
62+
// Scaling parameter for modulating video encoder target rate
63+
β_v float64
64+
// Smoothing factor in exponential smoothing of packet loss and marking rate
65+
α float64
66+
}
67+
68+
var DefaultConfig = Config{
69+
Priority: 1.0,
70+
MinimumRate: 150 * Kbps,
71+
MaximumRate: 1500 * Kbps,
72+
ReferenceCongestionLevel: 10 * time.Millisecond,
73+
κ: 0.5,
74+
η: 2.0,
75+
τ: 500 * time.Millisecond,
76+
δ: 100 * time.Millisecond,
77+
78+
LogWindow: 500 * time.Millisecond,
79+
QueueingDelayThreshold: 10 * time.Millisecond,
80+
FilteringDelay: 120 * time.Millisecond,
81+
γ_max: 0.5,
82+
QueueBound: 50 * time.Millisecond,
83+
84+
LossMultiplier: 7.0,
85+
DelayThreshold: 50 * time.Millisecond,
86+
λ: 0.5,
87+
88+
ReferencePacketLossRatio: 0.01,
89+
ReferencePacketMarkingRatio: 0.01,
90+
ReferenceDelayLoss: 10 * time.Millisecond,
91+
ReferenceDelayMarking: 2 * time.Millisecond,
92+
93+
FrameRate: 30.0,
94+
β_s: 0.1,
95+
β_v: 0.1,
96+
α: 0.1,
97+
}

pkg/nada/ecn/ecn.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package ecn
2+
3+
import (
4+
"errors"
5+
"syscall"
6+
)
7+
8+
// CheckExplicitCongestionNotification checks if the given oob data includes an ECN bit set.
9+
func CheckExplicitCongestionNotification(oob []byte) (uint8, error) {
10+
ctrlMsgs, err := syscall.ParseSocketControlMessage(oob)
11+
if err != nil {
12+
return 0, err
13+
}
14+
for _, ctrlMsg := range ctrlMsgs {
15+
if ctrlMsg.Header.Type == syscall.IP_TOS {
16+
return (ctrlMsg.Data[0] & 0x3), nil
17+
}
18+
}
19+
return 0, errors.New("no ECN control message")
20+
}

pkg/nada/ecn/ecn_darwin.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package ecn
2+
3+
import (
4+
"net"
5+
)
6+
7+
// EnableExplicitCongestionNotification enables ECN on the given connection.
8+
func EnableExplicitCongestionNotification(conn *net.UDPConn) {
9+
// noop.
10+
}

pkg/nada/ecn/ecn_linux.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package ecn
2+
3+
import (
4+
"net"
5+
"reflect"
6+
"syscall"
7+
)
8+
9+
// EnableExplicitCongestionNotification enables ECN on the given connection.
10+
func EnableExplicitCongestionNotification(conn *net.UDPConn) {
11+
ptrVal := reflect.ValueOf(*conn)
12+
fdmember := reflect.Indirect(ptrVal).FieldByName("fd")
13+
pfdmember := reflect.Indirect(fdmember).FieldByName("pfd")
14+
netfdmember := reflect.Indirect(pfdmember).FieldByName("Sysfd")
15+
fd := int(netfdmember.Int())
16+
syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_RECVTOS, 1)
17+
}

pkg/nada/packet_stream.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package rfc8698
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"sync"
7+
"time"
8+
)
9+
10+
type packet struct {
11+
ts time.Time
12+
seq uint16
13+
ecn bool
14+
size Bits
15+
queueingDelay bool
16+
}
17+
18+
// String returns a string representation of the packet.
19+
func (p *packet) String() string {
20+
return fmt.Sprintf("%v@%v", p.seq, p.ts.Nanosecond()%1000)
21+
}
22+
23+
type packetStream struct {
24+
sync.Mutex
25+
26+
window time.Duration
27+
packets []*packet
28+
markCount uint16
29+
totalSize Bits
30+
queueingDelayCount uint16
31+
32+
tail uint16
33+
}
34+
35+
func newPacketStream(window time.Duration) *packetStream {
36+
return &packetStream{
37+
window: window,
38+
}
39+
}
40+
41+
var errTimeOrder = errors.New("invalid packet timestamp ordering")
42+
43+
// add writes a packet to the underlying stream.
44+
func (ps *packetStream) add(ts time.Time, seq uint16, ecn bool, size Bits, queueingDelay bool) error {
45+
ps.Lock()
46+
defer ps.Unlock()
47+
48+
if len(ps.packets) > 0 && ps.packets[len(ps.packets)-1].ts.After(ts) {
49+
return errTimeOrder
50+
}
51+
// check if the packet seq already exists.
52+
for _, p := range ps.packets {
53+
if p.seq == seq {
54+
return errTimeOrder
55+
}
56+
}
57+
ps.packets = append(ps.packets, &packet{
58+
ts: ts,
59+
seq: seq,
60+
ecn: ecn,
61+
size: size,
62+
queueingDelay: queueingDelay,
63+
})
64+
if ecn {
65+
ps.markCount++
66+
}
67+
ps.totalSize += size
68+
if queueingDelay {
69+
ps.queueingDelayCount++
70+
}
71+
return nil
72+
}
73+
74+
// prune removes packets that are older than the window and returns the loss and marking rate.
75+
func (ps *packetStream) prune(now time.Time) (loss float64, marking float64, receivingRate BitsPerSecond, hasQueueingDelay bool) {
76+
ps.Lock()
77+
defer ps.Unlock()
78+
79+
startTs := now.Add(-ps.window)
80+
start := 0
81+
for ; start < len(ps.packets) && ps.packets[start].ts.Before(startTs); start++ {
82+
// decrement mark count if ecn.
83+
if ps.packets[start].ecn {
84+
ps.markCount--
85+
}
86+
ps.totalSize -= ps.packets[start].size
87+
if ps.packets[start].queueingDelay {
88+
ps.queueingDelayCount--
89+
}
90+
}
91+
if start > 0 {
92+
ps.packets = ps.packets[start:]
93+
}
94+
seqs := make([]uint16, len(ps.packets))
95+
for i, p := range ps.packets {
96+
seqs[i] = p.seq
97+
}
98+
begin, end := GetSeqRange(seqs)
99+
loss = 1 - float64(len(ps.packets))/float64(end-begin+1)
100+
marking = float64(ps.markCount) / float64(end-begin+1)
101+
return loss, marking, BitsPerSecond(float64(ps.totalSize) / ps.window.Seconds()), ps.queueingDelayCount > 0
102+
}
103+
104+
105+
func GetSeqRange(seqs []uint16) (uint16, uint16) {
106+
minDelta := 0
107+
maxDelta := 0
108+
seq0 := seqs[0]
109+
for _, seq := range seqs {
110+
delta := int(seq - seq0)
111+
if seq-seq0 >= 16384 {
112+
delta -= (1 << 16)
113+
if delta < minDelta {
114+
minDelta = delta
115+
}
116+
} else {
117+
if delta > maxDelta {
118+
maxDelta = delta
119+
}
120+
}
121+
}
122+
return seq0 + uint16(minDelta), seq0 + uint16(maxDelta)
123+
}

0 commit comments

Comments
 (0)