Skip to content

Commit 7859cc0

Browse files
committed
Add NADA implementation
This change adds the NADA congestion control implementation. The binding to cc is left TODO.
1 parent 09051cd commit 7859cc0

File tree

13 files changed

+656
-3
lines changed

13 files changed

+656
-3
lines changed

interceptor.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ type Factory interface {
1717
// Interceptor can be used to add functionality to you PeerConnections by modifying any incoming/outgoing rtp/rtcp
1818
// packets, or sending your own packets as needed.
1919
type Interceptor interface {
20-
2120
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
2221
// change in the future. The returned method will be called once per packet batch.
2322
BindRTCPReader(reader RTCPReader) RTCPReader

pkg/nack/retainable_packet.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) {
6060
}
6161
}
6262

63-
type noOpPacketFactory struct {
64-
}
63+
type noOpPacketFactory struct{}
6564

6665
func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte) (*retainablePacket, error) {
6766
return &retainablePacket{

pkg/nada/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# [RFC 8698] NADA: A Unified Congestion Control Scheme for Real-Time Media
2+
3+
Notes:
4+
5+
* The receiver in this implementation assumes a monotonically ordered sequence of packets.

pkg/nada/config.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package nada
2+
3+
import "time"
4+
5+
// Bits represents a unit of one bit.
6+
type Bits uint32
7+
8+
// BitsPerSecond represents a unit of one bit per second.
9+
type BitsPerSecond float64
10+
11+
const (
12+
// Kbps represents 1 kbps.
13+
Kbps = BitsPerSecond(1_000)
14+
// Mbps represents 1 Mbps.
15+
Mbps = BitsPerSecond(1_000_000)
16+
)
17+
18+
// Config represents the configuration of a NADA bandwidth estimator.
19+
type Config struct {
20+
// Weight of priority of the flow
21+
Priority float64
22+
// Minimum rate of the application supported by the media encoder
23+
MinimumRate BitsPerSecond // RMIN
24+
// Maximum rate of the application supported by media encoder
25+
MaximumRate BitsPerSecond // RMAX
26+
// Reference congestion level
27+
ReferenceCongestionLevel time.Duration // XREF
28+
// Scaling parameter for gradual rate update calculation
29+
Kappa float64
30+
// Scaling parameter for gradual rate update calculation
31+
Eta float64
32+
// Upper bound of RTT in gradual rate update calculation
33+
Tau time.Duration
34+
// Target feedback interval
35+
Delta time.Duration
36+
37+
// Observation window in time for calculating packet summary statistics at receiver
38+
LogWindow time.Duration // LOGWIN
39+
// Threshold for determining queuing delay build up at receiver
40+
QueueingDelayThreshold time.Duration
41+
// Bound on filtering delay
42+
FilteringDelay time.Duration // DFILT
43+
// Upper bound on rate increase ratio for accelerated ramp-up
44+
GammaMax float64
45+
// Upper bound on self-inflicted queueing delay during ramp up
46+
QueueBound time.Duration // QBOUND
47+
48+
// Multiplier for self-scaling the expiration threshold of the last observed loss
49+
// (loss_exp) based on measured average loss interval (loss_int)
50+
LossMultiplier float64 // MULTILOSS
51+
// Delay threshold for invoking non-linear warping
52+
DelayThreshold time.Duration // QTH
53+
// Scaling parameter in the exponent of non-linear warping
54+
Lambda float64
55+
56+
// Reference packet loss ratio
57+
ReferencePacketLossRatio float64 // PLRREF
58+
// Reference packet marking ratio
59+
ReferencePacketMarkingRatio float64 // PMRREF
60+
// Reference delay penalty for loss when lacket loss ratio is at least PLRREF
61+
ReferenceDelayLoss time.Duration // DLOSS
62+
// Reference delay penalty for ECN marking when packet marking is at PMRREF
63+
ReferenceDelayMarking time.Duration // DMARK
64+
65+
// Frame rate of incoming video
66+
FrameRate float64 // FRAMERATE
67+
// Scaling parameter for modulating outgoing sending rate
68+
BetaSending float64
69+
// Scaling parameter for modulating video encoder target rate
70+
BetaVideoEncoder float64
71+
// Smoothing factor in exponential smoothing of packet loss and marking rate
72+
Alpha float64
73+
}
74+
75+
// DefaultConfig returns the default configuration recommended by the specification.
76+
func DefaultConfig() Config {
77+
return Config{
78+
Priority: 1.0,
79+
MinimumRate: 150 * Kbps,
80+
MaximumRate: 1500 * Kbps,
81+
ReferenceCongestionLevel: 10 * time.Millisecond,
82+
Kappa: 0.5,
83+
Eta: 2.0,
84+
Tau: 500 * time.Millisecond,
85+
Delta: 100 * time.Millisecond,
86+
87+
LogWindow: 500 * time.Millisecond,
88+
QueueingDelayThreshold: 10 * time.Millisecond,
89+
FilteringDelay: 120 * time.Millisecond,
90+
GammaMax: 0.5,
91+
QueueBound: 50 * time.Millisecond,
92+
93+
LossMultiplier: 7.0,
94+
DelayThreshold: 50 * time.Millisecond,
95+
Lambda: 0.5,
96+
97+
ReferencePacketLossRatio: 0.01,
98+
ReferencePacketMarkingRatio: 0.01,
99+
ReferenceDelayLoss: 10 * time.Millisecond,
100+
ReferenceDelayMarking: 2 * time.Millisecond,
101+
102+
FrameRate: 30.0,
103+
BetaSending: 0.1,
104+
BetaVideoEncoder: 0.1,
105+
Alpha: 0.1,
106+
}
107+
}

pkg/nada/ecn/ecn.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Package ecn provides ExplicitCongestionNotification (ECN) support.
2+
package ecn
3+
4+
import (
5+
"errors"
6+
"syscall"
7+
)
8+
9+
var errNoECN = errors.New("no ECN control message")
10+
11+
// CheckExplicitCongestionNotification checks if the given oob data includes an ECN bit set.
12+
func CheckExplicitCongestionNotification(oob []byte) (uint8, error) {
13+
ctrlMsgs, err := syscall.ParseSocketControlMessage(oob)
14+
if err != nil {
15+
return 0, err
16+
}
17+
for _, ctrlMsg := range ctrlMsgs {
18+
if ctrlMsg.Header.Type == syscall.IP_TOS {
19+
return (ctrlMsg.Data[0] & 0x3), nil
20+
}
21+
}
22+
return 0, errNoECN
23+
}

pkg/nada/ecn/ecn_darwin.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package ecn
2+
3+
import (
4+
"net"
5+
)
6+
7+
// EnableExplicitCongestionNotification enables ECN on the given connection.
8+
func EnableExplicitCongestionNotification(conn *net.UDPConn) {
9+
// noop.
10+
}

pkg/nada/ecn/ecn_linux.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package ecn
2+
3+
import (
4+
"net"
5+
"reflect"
6+
"syscall"
7+
)
8+
9+
// EnableExplicitCongestionNotification enables ECN on the given connection.
10+
func EnableExplicitCongestionNotification(conn *net.UDPConn) {
11+
ptrVal := reflect.ValueOf(*conn)
12+
fdmember := reflect.Indirect(ptrVal).FieldByName("fd")
13+
pfdmember := reflect.Indirect(fdmember).FieldByName("pfd")
14+
netfdmember := reflect.Indirect(pfdmember).FieldByName("Sysfd")
15+
fd := int(netfdmember.Int())
16+
if err := syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_RECVTOS, 1); err != nil {
17+
panic(err)
18+
}
19+
}

pkg/nada/nada.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package nada provides an implementation of the NADA congestion control algorithm.
2+
package nada

pkg/nada/packet_stream.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package nada
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"sync"
7+
"time"
8+
)
9+
10+
type packet struct {
11+
ts time.Time
12+
seq uint16
13+
ecn bool
14+
size Bits
15+
queueingDelay bool
16+
}
17+
18+
// String returns a string representation of the packet.
19+
func (p *packet) String() string {
20+
return fmt.Sprintf("%v@%v", p.seq, p.ts.Nanosecond()%1000)
21+
}
22+
23+
type packetStream struct {
24+
sync.Mutex
25+
26+
window time.Duration
27+
packets []*packet
28+
markCount uint16
29+
totalSize Bits
30+
queueingDelayCount uint16
31+
}
32+
33+
func newPacketStream(window time.Duration) *packetStream {
34+
return &packetStream{
35+
window: window,
36+
}
37+
}
38+
39+
var errTimeOrder = errors.New("invalid packet timestamp ordering")
40+
41+
// add writes a packet to the underlying stream.
42+
func (ps *packetStream) add(ts time.Time, seq uint16, ecn bool, size Bits, queueingDelay bool) error {
43+
ps.Lock()
44+
defer ps.Unlock()
45+
46+
if len(ps.packets) > 0 && ps.packets[len(ps.packets)-1].ts.After(ts) {
47+
return errTimeOrder
48+
}
49+
// check if the packet seq already exists.
50+
for _, p := range ps.packets {
51+
if p.seq == seq {
52+
return errTimeOrder
53+
}
54+
}
55+
ps.packets = append(ps.packets, &packet{
56+
ts: ts,
57+
seq: seq,
58+
ecn: ecn,
59+
size: size,
60+
queueingDelay: queueingDelay,
61+
})
62+
if ecn {
63+
ps.markCount++
64+
}
65+
ps.totalSize += size
66+
if queueingDelay {
67+
ps.queueingDelayCount++
68+
}
69+
return nil
70+
}
71+
72+
// prune removes packets that are older than the window and returns the loss and marking rate.
73+
func (ps *packetStream) prune(now time.Time) (loss float64, marking float64, receivingRate BitsPerSecond, hasQueueingDelay bool) {
74+
ps.Lock()
75+
defer ps.Unlock()
76+
77+
startTS := now.Add(-ps.window)
78+
start := 0
79+
for ; start < len(ps.packets) && ps.packets[start].ts.Before(startTS); start++ {
80+
// decrement mark count if ecn.
81+
if ps.packets[start].ecn {
82+
ps.markCount--
83+
}
84+
ps.totalSize -= ps.packets[start].size
85+
if ps.packets[start].queueingDelay {
86+
ps.queueingDelayCount--
87+
}
88+
}
89+
if start > 0 {
90+
ps.packets = ps.packets[start:]
91+
}
92+
seqs := make([]uint16, len(ps.packets))
93+
for i, p := range ps.packets {
94+
seqs[i] = p.seq
95+
}
96+
begin, end := getSeqRange(seqs)
97+
loss = 1 - float64(len(ps.packets))/float64(end-begin+1)
98+
marking = float64(ps.markCount) / float64(end-begin+1)
99+
return loss, marking, BitsPerSecond(float64(ps.totalSize) / ps.window.Seconds()), ps.queueingDelayCount > 0
100+
}
101+
102+
func getSeqRange(seqs []uint16) (uint16, uint16) {
103+
minDelta := 0
104+
maxDelta := 0
105+
seq0 := seqs[0]
106+
for _, seq := range seqs {
107+
delta := int(seq - seq0)
108+
if seq-seq0 >= 16384 {
109+
delta -= (1 << 16)
110+
if delta < minDelta {
111+
minDelta = delta
112+
}
113+
} else if delta > maxDelta {
114+
maxDelta = delta
115+
}
116+
}
117+
return seq0 + uint16(minDelta), seq0 + uint16(maxDelta)
118+
}

0 commit comments

Comments
 (0)