Skip to content

Commit 35a9c9b

Browse files
authored
Beginnings of HTTP signal transport. (#704)
* WIP * handling connect response * comments * v1 handlers by default
1 parent 12378ef commit 35a9c9b

15 files changed

+965
-222
lines changed

auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,5 @@ func (b authBase) withAuth(ctx context.Context, opt authOption, options ...authO
6161
return nil, err
6262
}
6363

64-
return twirp.WithHTTPRequestHeaders(ctx, signalling.NewHeaderWithToken(token))
64+
return twirp.WithHTTPRequestHeaders(ctx, signalling.NewHTTPHeaderWithToken(token))
6565
}

engine.go

Lines changed: 166 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ package lksdk
1717
import (
1818
"context"
1919
"errors"
20-
"fmt"
20+
"io"
21+
"net/http"
2122
"sync"
2223
"time"
2324

@@ -54,7 +55,13 @@ type engineHandler interface {
5455
OnResuming()
5556
OnResumed()
5657
OnTranscription(*livekit.Transcription)
57-
OnSignalClientConnected(*livekit.JoinResponse)
58+
OnRoomJoined(
59+
room *livekit.Room,
60+
participant *livekit.ParticipantInfo,
61+
otherParticipants []*livekit.ParticipantInfo,
62+
serverInfo *livekit.ServerInfo,
63+
sifTrailer []byte,
64+
)
5865
OnRpcRequest(callerIdentity, requestId, method, payload string, responseTimeout time.Duration, version uint32)
5966
OnRpcAck(requestId string)
6067
OnRpcResponse(requestId string, payload *string, error *RpcError)
@@ -176,6 +183,7 @@ func NewRTCEngine(engineHandler engineHandler, getLocalParticipantSID func() str
176183
Logger: e.log,
177184
Version: Version,
178185
Protocol: PROTOCOL,
186+
Signalling: e.signalling,
179187
SignalTransportHandler: e,
180188
SignalHandler: e.signalHandler,
181189
})
@@ -203,49 +211,24 @@ func (e *RTCEngine) JoinContext(
203211
url string,
204212
token string,
205213
connectParams *signalling.ConnectParams,
206-
) (proto.Message, error) {
207-
msg, err := e.signalTransport.Join(ctx, url, token, *connectParams)
208-
if err != nil {
209-
return nil, err
210-
}
211-
212-
res, ok := msg.(*livekit.JoinResponse)
213-
if !ok {
214-
e.log.Warnw(
215-
"unknown message type", nil,
216-
"messageType", fmt.Sprintf("%T", msg),
217-
)
218-
return nil, ErrInvalidMessageType
219-
}
220-
214+
) (bool, error) {
221215
e.url = url
222216
e.token.Store(token)
223217
e.connParams = connectParams
224218

225-
err = e.configure(res.IceServers, res.ClientConfiguration, proto.Bool(res.SubscriberPrimary))
219+
err := e.signalTransport.Join(ctx, url, token, *connectParams)
226220
if err != nil {
227-
e.log.Warnw("could not configure", err)
228-
return nil, err
229-
}
230-
231-
e.engineHandler.OnSignalClientConnected(res)
232-
233-
e.signalTransport.Start()
234-
235-
// send offer
236-
if !res.SubscriberPrimary || res.FastPublish {
237-
if publisher, ok := e.Publisher(); ok {
238-
publisher.Negotiate()
239-
} else {
240-
e.log.Warnw("no publisher peer connection", ErrNoPeerConnection)
221+
if verr := e.validate(ctx, url, token, connectParams, ""); verr != nil {
222+
return false, verr
241223
}
224+
return false, err
242225
}
243226

244227
if err = e.waitUntilConnected(); err != nil {
245-
return nil, err
228+
return false, err
246229
}
247230
e.hasConnected.Store(true)
248-
return res, err
231+
return true, err
249232
}
250233

251234
func (e *RTCEngine) OnClose(onClose func()) {
@@ -364,7 +347,7 @@ func (e *RTCEngine) configure(
364347
if subscriberPrimary != nil {
365348
e.subscriberPrimary = *subscriberPrimary
366349
}
367-
e.subscriber.OnRemoteDescriptionSettled(e.createPublisherAnswerAndSend)
350+
e.subscriber.OnRemoteDescriptionSettled(e.createSubscriberPCAnswerAndSend)
368351

369352
e.publisher.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
370353
if candidate == nil {
@@ -692,34 +675,25 @@ func (e *RTCEngine) handleDisconnect(fullReconnect bool) {
692675
}
693676

694677
func (e *RTCEngine) resumeConnection() error {
695-
msg, err := e.signalTransport.Reconnect(
678+
err := e.signalTransport.Reconnect(
696679
e.url,
697680
e.token.Load(),
698681
*e.connParams,
699682
e.cbGetLocalParticipantSID(),
700683
)
701684
if err != nil {
685+
if verr := e.validate(
686+
context.TODO(),
687+
e.url,
688+
e.token.Load(),
689+
e.connParams,
690+
e.cbGetLocalParticipantSID(),
691+
); verr != nil {
692+
return verr
693+
}
702694
return err
703695
}
704696

705-
if msg != nil {
706-
reconnect, ok := msg.(*livekit.ReconnectResponse)
707-
if ok {
708-
configuration := e.makeRTCConfiguration(reconnect.IceServers, reconnect.ClientConfiguration)
709-
e.pclock.Lock()
710-
if err = e.publisher.SetConfiguration(configuration); err != nil {
711-
logger.Errorw("could not set rtc configuration for publisher", err)
712-
e.pclock.Unlock()
713-
return err
714-
}
715-
if err = e.subscriber.SetConfiguration(configuration); err != nil {
716-
logger.Errorw("could not set rtc configuration for subscriber", err)
717-
e.pclock.Unlock()
718-
return err
719-
}
720-
e.pclock.Unlock()
721-
}
722-
}
723697
e.signalTransport.Start()
724698

725699
// send offer if publisher enabled
@@ -750,34 +724,11 @@ func (e *RTCEngine) restartConnection() error {
750724
}
751725
e.signalTransport.Close()
752726

753-
msg, err := e.JoinContext(context.TODO(), e.url, e.token.Load(), e.connParams)
754-
if err != nil {
755-
return err
756-
}
757-
758-
res, ok := msg.(*livekit.SignalResponse)
759-
if !ok {
760-
e.log.Warnw(
761-
"unknown message type", nil,
762-
"messageType", fmt.Sprintf("%T", msg),
763-
)
764-
return ErrInvalidMessageType
765-
}
766-
767-
joinResponse := res.GetJoin()
768-
if joinResponse == nil {
769-
e.log.Warnw(
770-
"unknown message type", nil,
771-
"messageType", fmt.Sprintf("%T", res),
772-
)
773-
return ErrInvalidMessageType
774-
}
775-
776-
e.engineHandler.OnRestarted(joinResponse.Room, joinResponse.Participant, joinResponse.OtherParticipants)
777-
return nil
727+
_, err := e.JoinContext(context.TODO(), e.url, e.token.Load(), e.connParams)
728+
return err
778729
}
779730

780-
func (e *RTCEngine) createPublisherAnswerAndSend() error {
731+
func (e *RTCEngine) createSubscriberPCAnswerAndSend() error {
781732
answer, err := e.subscriber.pc.CreateAnswer(nil)
782733
if err != nil {
783734
e.log.Errorw("could not create answer", err)
@@ -1096,21 +1047,83 @@ func (e *RTCEngine) Simulate(scenario SimulateScenario) {
10961047
}
10971048
}
10981049

1050+
func (e *RTCEngine) validate(
1051+
ctx context.Context,
1052+
urlPrefix string,
1053+
token string,
1054+
connectParams *signalling.ConnectParams,
1055+
participantSID string,
1056+
) error {
1057+
req, err := e.signalling.HTTPRequestForValidate(
1058+
ctx,
1059+
Version,
1060+
PROTOCOL,
1061+
urlPrefix,
1062+
token,
1063+
connectParams,
1064+
participantSID,
1065+
)
1066+
if err != nil {
1067+
return err
1068+
}
1069+
1070+
hresp, err := http.DefaultClient.Do(req)
1071+
if err != nil {
1072+
e.log.Errorw("error getting validation", err, "httpResponse", hresp)
1073+
return signalling.ErrCannotDialSignal
1074+
}
1075+
defer hresp.Body.Close()
1076+
1077+
if hresp.StatusCode == http.StatusOK {
1078+
// no specific errors to return if validate succeeds
1079+
e.log.Infow("validate succeeded")
1080+
return nil
1081+
}
1082+
1083+
var errString string
1084+
switch hresp.StatusCode {
1085+
case http.StatusUnauthorized:
1086+
errString = "unauthorized: "
1087+
case http.StatusNotFound:
1088+
errString = "not found: "
1089+
case http.StatusServiceUnavailable:
1090+
errString = "unavailable: "
1091+
}
1092+
body, err := io.ReadAll(hresp.Body)
1093+
if err == nil {
1094+
errString += string(body)
1095+
e.log.Errorw("validation error", errors.New(errString), "httpResponse", hresp)
1096+
}
1097+
return errors.New(errString)
1098+
}
1099+
10991100
// signalling.SignalTransportHandler implementation
11001101
func (e *RTCEngine) OnTransportClose() {
11011102
e.handleDisconnect(false)
11021103
}
11031104

11041105
// signalling.SignalProcessor implementation
1105-
func (e *RTCEngine) OnJoinResponse(res *livekit.JoinResponse) {
1106-
/* SIGNALLING-V2-TODO: make this async
1106+
func (e *RTCEngine) OnJoinResponse(res *livekit.JoinResponse) error {
1107+
isRestarting := false
1108+
if e.reconnecting.Load() && e.requiresFullReconnect.Load() {
1109+
isRestarting = true
1110+
}
1111+
1112+
e.signalTransport.SetParticipantResource(e.url, res.GetParticipant().Sid, e.token.Load())
1113+
11071114
err := e.configure(res.IceServers, res.ClientConfiguration, proto.Bool(res.SubscriberPrimary))
11081115
if err != nil {
11091116
e.log.Warnw("could not configure", err)
1110-
return
1117+
return err
11111118
}
11121119

1113-
e.engineHandler.OnSignalClientConnected(res)
1120+
e.engineHandler.OnRoomJoined(
1121+
res.Room,
1122+
res.Participant,
1123+
res.OtherParticipants,
1124+
res.ServerInfo,
1125+
res.SifTrailer,
1126+
)
11141127

11151128
e.signalTransport.Start()
11161129

@@ -1122,7 +1135,30 @@ func (e *RTCEngine) OnJoinResponse(res *livekit.JoinResponse) {
11221135
e.log.Warnw("no publisher peer connection", ErrNoPeerConnection)
11231136
}
11241137
}
1125-
*/
1138+
1139+
if isRestarting {
1140+
e.engineHandler.OnRestarted(res.Room, res.Participant, res.OtherParticipants)
1141+
}
1142+
return nil
1143+
}
1144+
1145+
func (e *RTCEngine) OnReconnectResponse(res *livekit.ReconnectResponse) error {
1146+
configuration := e.makeRTCConfiguration(res.IceServers, res.ClientConfiguration)
1147+
1148+
e.pclock.Lock()
1149+
defer e.pclock.Unlock()
1150+
1151+
if err := e.publisher.SetConfiguration(configuration); err != nil {
1152+
e.log.Errorw("could not set rtc configuration for publisher", err)
1153+
return err
1154+
}
1155+
1156+
if err := e.subscriber.SetConfiguration(configuration); err != nil {
1157+
e.log.Errorw("could not set rtc configuration for subscriber", err)
1158+
return err
1159+
}
1160+
1161+
return nil
11261162
}
11271163

11281164
func (e *RTCEngine) OnAnswer(sd webrtc.SessionDescription, answerId uint32) {
@@ -1212,6 +1248,7 @@ func (e *RTCEngine) OnTrackRemoteMuted(request *livekit.MuteTrackRequest) {
12121248

12131249
func (e *RTCEngine) OnTokenRefresh(refreshToken string) {
12141250
e.token.Store(refreshToken)
1251+
e.signalTransport.UpdateParticipantToken(refreshToken)
12151252
}
12161253

12171254
func (e *RTCEngine) OnLeave(leave *livekit.LeaveRequest) {
@@ -1240,3 +1277,47 @@ func (e *RTCEngine) OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscri
12401277
func (e *RTCEngine) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) {
12411278
e.engineHandler.OnSubscribedQualityUpdate(subscribedQualityUpdate)
12421279
}
1280+
1281+
func (e *RTCEngine) OnConnectResponse(res *livekit.ConnectResponse) error {
1282+
isRestarting := false
1283+
if e.reconnecting.Load() && e.requiresFullReconnect.Load() {
1284+
isRestarting = true
1285+
}
1286+
1287+
e.signalTransport.SetParticipantResource(e.url, res.GetParticipant().Sid, e.token.Load())
1288+
1289+
err := e.configure(res.IceServers, res.ClientConfiguration, proto.Bool(true))
1290+
if err != nil {
1291+
e.log.Warnw("could not configure", err)
1292+
return err
1293+
}
1294+
1295+
e.engineHandler.OnRoomJoined(
1296+
res.Room,
1297+
res.Participant,
1298+
res.OtherParticipants,
1299+
res.ServerInfo,
1300+
res.SifTrailer,
1301+
)
1302+
1303+
e.signalTransport.Start()
1304+
1305+
// SIGNALLING-V2-TODO: should send publisher offer in connect request itself
1306+
// send offer
1307+
if res.FastPublish {
1308+
if publisher, ok := e.Publisher(); ok {
1309+
publisher.Negotiate()
1310+
} else {
1311+
e.log.Warnw("no publisher peer connection", ErrNoPeerConnection)
1312+
}
1313+
}
1314+
1315+
if res.SubscriberSdp != nil {
1316+
e.OnOffer(protosignalling.FromProtoSessionDescription(res.SubscriberSdp))
1317+
}
1318+
1319+
if isRestarting {
1320+
e.engineHandler.OnRestarted(res.Room, res.Participant, res.OtherParticipants)
1321+
}
1322+
return nil
1323+
}

0 commit comments

Comments
 (0)