Skip to content

Commit 356f9ba

Browse files
authored
Gets to the point of establishing subscriber peer connection. (#705)
Changes to handle the signal sequence number and also report last processed so that server can handle it properly.
1 parent 35a9c9b commit 356f9ba

File tree

9 files changed

+104
-32
lines changed

9 files changed

+104
-32
lines changed

engine.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,23 @@ func NewRTCEngine(engineHandler engineHandler, getLocalParticipantSID func() str
187187
SignalTransportHandler: e,
188188
SignalHandler: e.signalHandler,
189189
})
190+
/*
191+
e.signalling = signalling.NewSignallingv2(signalling.Signallingv2Params{
192+
Logger: e.log,
193+
})
194+
e.signalHandler = signalling.NewSignalHandlerv2(signalling.SignalHandlerv2Params{
195+
Logger: e.log,
196+
Processor: e,
197+
Signalling: e.signalling,
198+
})
199+
e.signalTransport = signalling.NewSignalTransportHybrid(signalling.SignalTransportHybridParams{
200+
Logger: e.log,
201+
Version: Version,
202+
Protocol: PROTOCOL,
203+
Signalling: e.signalling,
204+
SignalHandler: e.signalHandler,
205+
})
206+
*/
190207

191208
e.onClose = []func(){}
192209
return e
@@ -1085,14 +1102,18 @@ func (e *RTCEngine) validate(
10851102
case http.StatusUnauthorized:
10861103
errString = "unauthorized: "
10871104
case http.StatusNotFound:
1088-
errString = "not found: "
1105+
errString = "not found"
10891106
case http.StatusServiceUnavailable:
10901107
errString = "unavailable: "
10911108
}
1092-
body, err := io.ReadAll(hresp.Body)
1093-
if err == nil {
1094-
errString += string(body)
1109+
if hresp.StatusCode != http.StatusNotFound {
1110+
body, err := io.ReadAll(hresp.Body)
1111+
if err == nil {
1112+
errString += e.signalling.DecodeErrorResponse(body)
1113+
}
10951114
e.log.Errorw("validation error", errors.New(errString), "httpResponse", hresp)
1115+
} else {
1116+
e.log.Errorw("validation error", errors.New(errString))
10961117
}
10971118
return errors.New(errString)
10981119
}

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
1111
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5
1212
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
13-
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f
13+
github.com/livekit/protocol v1.39.4-0.20250725083335-7313a8195a4b
1414
github.com/magefile/mage v1.15.0
1515
github.com/pion/dtls/v3 v3.0.6
1616
github.com/pion/interceptor v0.1.40
@@ -51,7 +51,7 @@ require (
5151
github.com/klauspost/compress v1.18.0 // indirect
5252
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
5353
github.com/lithammer/shortuuid/v4 v4.2.0 // indirect
54-
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c // indirect
54+
github.com/livekit/psrpc v0.6.1-0.20250724161801-262a822e7cd7 // indirect
5555
github.com/nats-io/nats.go v1.43.0 // indirect
5656
github.com/nats-io/nkeys v0.4.11 // indirect
5757
github.com/nats-io/nuid v1.0.1 // indirect
@@ -81,7 +81,7 @@ require (
8181
golang.org/x/text v0.27.0 // indirect
8282
google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074 // indirect
8383
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074 // indirect
84-
google.golang.org/grpc v1.73.0 // indirect
84+
google.golang.org/grpc v1.74.2 // indirect
8585
gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect
8686
gopkg.in/yaml.v3 v3.0.1 // indirect
8787
)

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 h1:aFCwt/rticj5L
9999
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8=
100100
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g=
101101
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
102-
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f h1:Cwe38+/ld3r5dnNmIZSALSoZPWNEMeYPZIi/qjpplLo=
103-
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
104-
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c h1:WwEr0YBejYbKzk8LSaO9h8h0G9MnE7shyDu8yXQWmEc=
105-
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c/go.mod h1:kmD+AZPkWu0MaXIMv57jhNlbiSZZ/Jx4bzlxBDVmJes=
102+
github.com/livekit/protocol v1.39.4-0.20250725083335-7313a8195a4b h1:uxvoeGd0vmGDIL0JyLLV9h2o97tpt3rR9s4ikuLVz/g=
103+
github.com/livekit/protocol v1.39.4-0.20250725083335-7313a8195a4b/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
104+
github.com/livekit/psrpc v0.6.1-0.20250724161801-262a822e7cd7 h1:x50axcjXwfwnII7sMhJPyZ6f5LpPapZtsp75KJX8nIQ=
105+
github.com/livekit/psrpc v0.6.1-0.20250724161801-262a822e7cd7/go.mod h1:kmD+AZPkWu0MaXIMv57jhNlbiSZZ/Jx4bzlxBDVmJes=
106106
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
107107
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
108108
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
@@ -258,8 +258,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074 h1:
258258
google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074/go.mod h1:vYFwMYFbmA8vl6Z/krj/h7+U/AqpHknwJX4Uqgfyc7I=
259259
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074 h1:qJW29YvkiJmXOYMu5Tf8lyrTp3dOS+K4z6IixtLaCf8=
260260
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
261-
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
262-
google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc=
261+
google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
262+
google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
263263
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
264264
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
265265
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

signalling/interfaces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type Signalling interface {
8383
connectParams *ConnectParams,
8484
participantSID string,
8585
) (*http.Request, error)
86+
DecodeErrorResponse(errorDetails []byte) string
8687

8788
SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message
8889
SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message
@@ -100,6 +101,8 @@ type Signalling interface {
100101
AckMessageId(ackMessageId uint32)
101102
SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32)
102103

104+
PendingMessages() proto.Message
105+
103106
SignalConnectRequest(connectRequest *livekit.ConnectRequest) proto.Message
104107
}
105108

signalling/signalhandlerv2.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ func (s *signalhandlerv2) HandleMessage(msg proto.Message) error {
6767
switch msg := wireMessage.GetMessage().(type) {
6868
case *livekit.Signalv2WireMessage_Envelope:
6969
for _, serverMessage := range msg.Envelope.ServerMessages {
70-
/* SIGNALLING-V2-TODO: uncomment once server sends proper id
7170
sequencer := serverMessage.GetSequencer()
7271
if sequencer == nil || sequencer.MessageId == 0 {
7372
s.params.Logger.Warnw(
@@ -87,16 +86,14 @@ func (s *signalhandlerv2) HandleMessage(msg proto.Message) error {
8786
continue
8887
}
8988

89+
// SIGNALLING-V2-TODO: ask for replay if there are gaps
9090
if lprmi != 0 && sequencer.MessageId != lprmi+1 {
9191
s.params.Logger.Infow(
9292
"gap in message stream",
9393
"last", lprmi,
9494
"current", serverMessage.Sequencer.MessageId,
9595
)
9696
}
97-
*/
98-
99-
// SIGNALLING-V2-TODO: ask for replay if there are gaps
10097

10198
// SIGNALLING-V2-TODO: process messages
10299
switch payload := serverMessage.GetMessage().(type) {
@@ -116,11 +113,9 @@ func (s *signalhandlerv2) HandleMessage(msg proto.Message) error {
116113
)
117114
}
118115

119-
/* SIGNALLING-V2-TODO: uncomment once sequencer is implemented on both sides
120116
s.lastProcessedRemoteMessageId.Store(sequencer.MessageId)
121117
s.params.Signalling.AckMessageId(sequencer.LastProcessedRemoteMessageId)
122118
s.params.Signalling.SetLastProcessedRemoteMessageId(sequencer.MessageId)
123-
*/
124119
}
125120

126121
case *livekit.Signalv2WireMessage_Fragment:

signalling/signalling.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ func (s *signalling) HTTPRequestForValidate(
130130
return req, nil
131131
}
132132

133+
func (s *signalling) DecodeErrorResponse(errorDetails []byte) string {
134+
return string(errorDetails)
135+
}
136+
133137
func (s *signalling) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message {
134138
return &livekit.SignalRequest{
135139
Message: &livekit.SignalRequest_Leave{

signalling/signallingunimplemented.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ func (s *signallingUnimplemented) HTTPRequestForValidate(
7676
return nil, ErrUnimplemented
7777
}
7878

79+
func (s *signallingUnimplemented) DecodeErrorResponse(errorDetails []byte) string {
80+
return ""
81+
}
82+
7983
func (s *signallingUnimplemented) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message {
8084
return nil
8185
}
@@ -129,6 +133,10 @@ func (u *signallingUnimplemented) AckMessageId(ackMessageId uint32) {}
129133
func (u *signallingUnimplemented) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
130134
}
131135

136+
func (u *signallingUnimplemented) PendingMessages() proto.Message {
137+
return nil
138+
}
139+
132140
func (s *signallingUnimplemented) SignalConnectRequest(connectRequest *livekit.ConnectRequest) proto.Message {
133141
return nil
134142
}

signalling/signallingv2.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ package signalling
1717
import (
1818
"bytes"
1919
"context"
20+
"encoding/json"
2021
"net/http"
2122
"net/url"
2223
"runtime"
2324

2425
"github.com/livekit/protocol/livekit"
2526
"github.com/livekit/protocol/logger"
27+
protosignalling "github.com/livekit/protocol/signalling"
2628
"google.golang.org/protobuf/proto"
2729
)
2830

@@ -36,11 +38,16 @@ type signallingv2 struct {
3638
signallingUnimplemented
3739

3840
params Signallingv2Params
41+
42+
signalCache *protosignalling.Signalv2ClientMessageCache
3943
}
4044

4145
func NewSignallingv2(params Signallingv2Params) Signalling {
4246
return &signallingv2{
4347
params: params,
48+
signalCache: protosignalling.NewSignalv2ClientMessageCache(protosignalling.SignalCacheParams{
49+
Logger: params.Logger,
50+
}),
4451
}
4552
}
4653

@@ -133,12 +140,39 @@ func (s *signallingv2) HTTPRequestForValidate(
133140
return req, nil
134141
}
135142

143+
func (s *signallingv2) DecodeErrorResponse(details []byte) string {
144+
var errorDetails struct {
145+
Error string `json:"error"`
146+
}
147+
err := json.Unmarshal(details, &errorDetails)
148+
if err != nil {
149+
return string(details)
150+
}
151+
152+
return errorDetails.Error
153+
}
154+
136155
func (s *signallingv2) AckMessageId(ackMessageId uint32) {
137-
// SIGNALLING-V2-TODO s.signalCache.Clear(ackMessageId)
156+
s.signalCache.Clear(ackMessageId)
138157
}
139158

140159
func (s *signallingv2) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
141-
// SIGNALLING-V2-TODO s.signalCache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
160+
s.signalCache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
161+
}
162+
163+
func (s *signallingv2) PendingMessages() proto.Message {
164+
clientMessages := s.signalCache.GetFromFront()
165+
if len(clientMessages) == 0 {
166+
return nil
167+
}
168+
169+
return &livekit.Signalv2WireMessage{
170+
Message: &livekit.Signalv2WireMessage_Envelope{
171+
Envelope: &livekit.Envelope{
172+
ClientMessages: clientMessages,
173+
},
174+
},
175+
}
142176
}
143177

144178
func (s *signallingv2) SignalConnectRequest(connectRequest *livekit.ConnectRequest) proto.Message {
@@ -168,13 +202,20 @@ func (s *signallingv2) SignalSdpAnswer(answer *livekit.SessionDescription) proto
168202
return s.cacheAndReturnEnvelope(clientMessage)
169203
}
170204

205+
func (s *signallingv2) SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message {
206+
clientMessage := &livekit.Signalv2ClientMessage{
207+
Message: &livekit.Signalv2ClientMessage_Trickle{
208+
Trickle: trickle,
209+
},
210+
}
211+
return s.cacheAndReturnEnvelope(clientMessage)
212+
}
213+
171214
func (s *signallingv2) cacheAndReturnEnvelope(cm *livekit.Signalv2ClientMessage) proto.Message {
172-
/* SIGNALLING-V2-TODO
173-
sm = s.signalCache.Add(sm)
174-
if sm == nil {
215+
cm = s.signalCache.Add(cm)
216+
if cm == nil {
175217
return nil
176218
}
177-
*/
178219

179220
return &livekit.Signalv2WireMessage{
180221
Message: &livekit.Signalv2WireMessage_Envelope{

signalling/signaltransport_http.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,21 +208,21 @@ func (s *signalTransportHttp) sendHttpRequest(
208208
return nil, err
209209
}
210210

211-
defer hresp.Body.Close()
212-
213-
if hresp.Header.Get("Content-type") != "application/x-protobuf" {
214-
return nil, fmt.Errorf("%w: %s", ErrUnsupportedContentType, hresp.Header.Get("Content-type"))
215-
}
216-
217211
s.params.Logger.Infow("http response received", "elapsed", time.Since(startedAt))
218212

213+
defer hresp.Body.Close()
214+
219215
body, err := io.ReadAll(hresp.Body)
220216
if err != nil {
221217
return nil, err
222218
}
223219

224220
if hresp.StatusCode != http.StatusOK {
225-
return nil, errors.New(string(body))
221+
return nil, errors.New(s.params.Signalling.DecodeErrorResponse(body))
222+
}
223+
224+
if hresp.Header.Get("Content-type") != "application/x-protobuf" {
225+
return nil, fmt.Errorf("%w: %s", ErrUnsupportedContentType, hresp.Header.Get("Content-type"))
226226
}
227227

228228
respWireMessage := &livekit.Signalv2WireMessage{}

0 commit comments

Comments
 (0)