Skip to content

Commit 9e4c006

Browse files
authored
fix: request-channel deadlock in some cases (#107)
* fix: request-channel deadlock in some cases * fix: ut
1 parent fe79057 commit 9e4c006

File tree

3 files changed

+46
-62
lines changed

3 files changed

+46
-62
lines changed

internal/socket/duplex.go

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,6 @@ func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux) {
447447

448448
toBeReleased := queue.NewLKQueue()
449449

450-
sendResult := make(chan error)
451-
452450
ret = receiving.
453451
DoFinally(func(sig rx.SignalType) {
454452
dc.unregister(sid)
@@ -460,20 +458,6 @@ func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux) {
460458
}
461459
next.(common.Releasable).Release()
462460
}
463-
// process sending result
464-
e, ok := <-sendResult
465-
if ok {
466-
dc.writeError(sid, e)
467-
} else {
468-
complete := framing.NewWriteablePayloadFrame(sid, nil, nil, core.FlagComplete)
469-
done := make(chan struct{})
470-
complete.HandleDone(func() {
471-
close(done)
472-
})
473-
if dc.sendFrame(complete) {
474-
<-done
475-
}
476-
}
477461
}).
478462
DoOnNext(func(next payload.Payload) error {
479463
if nextRelease := toBeReleased.Dequeue(); nextRelease != nil {
@@ -486,7 +470,9 @@ func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux) {
486470
}).
487471
DoOnRequest(func(initN int) {
488472
n := ToUint32RequestN(initN)
489-
if !rcvRequested.CAS(false, true) {
473+
isFirstRequest := rcvRequested.CAS(false, true)
474+
if !isFirstRequest {
475+
// block send RequestN frame
490476
frameN := framing.NewWriteableRequestNFrame(sid, n, 0)
491477
done := make(chan struct{})
492478
frameN.HandleDone(func() {
@@ -498,13 +484,11 @@ func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux) {
498484
return
499485
}
500486

501-
sub := requestChannelSubscriber{
502-
sid: sid,
503-
n: n,
504-
dc: dc,
505-
sndRequested: atomic.NewBool(false),
506-
rcv: receiving,
507-
result: sendResult,
487+
sub := &requestChannelSubscriber{
488+
sid: sid,
489+
n: n,
490+
dc: dc,
491+
rcv: receiving,
508492
}
509493
sending.SubscribeOn(dc.reqSche).SubscribeWith(dc.ctx, sub)
510494
})
@@ -645,13 +629,11 @@ func (dc *DuplexConnection) respondRequestChannel(req fragmentation.HeaderAndPay
645629
return nil
646630
}
647631

648-
receivingProcessor.Next(req)
649-
650632
// Ensure registering message success before func end.
651633
subscribed := make(chan struct{})
652634

653635
// Create subscriber
654-
sub := respondChannelSubscriber{
636+
sub := &respondChannelSubscriber{
655637
sid: sid,
656638
n: initRequestN,
657639
dc: dc,
@@ -664,6 +646,8 @@ func (dc *DuplexConnection) respondRequestChannel(req fragmentation.HeaderAndPay
664646
sending.SubscribeWith(dc.ctx, sub)
665647
})
666648

649+
receivingProcessor.Next(req)
650+
667651
<-subscribed
668652

669653
return nil

internal/socket/subscriber_request_channel.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,16 @@ import (
1313
"go.uber.org/atomic"
1414
)
1515

16-
type respondChannelSubscriber struct {
17-
sid uint32
18-
n uint32
19-
dc *DuplexConnection
20-
rcv flux.Processor
21-
subscribed chan<- struct{}
22-
calls *atomic.Int32
23-
}
24-
2516
type requestChannelSubscriber struct {
26-
sid uint32
27-
n uint32
28-
dc *DuplexConnection
29-
sndRequested *atomic.Bool
30-
rcv flux.Processor
31-
result chan<- error
17+
sid uint32
18+
n uint32
19+
dc *DuplexConnection
20+
requested atomic.Bool
21+
rcv flux.Processor
3222
}
3323

34-
func (r requestChannelSubscriber) OnNext(item payload.Payload) {
35-
if !r.sndRequested.CAS(false, true) {
24+
func (r *requestChannelSubscriber) OnNext(item payload.Payload) {
25+
if !r.requested.CAS(false, true) {
3626
r.dc.sendPayload(r.sid, item, core.FlagNext)
3727
return
3828
}
@@ -55,21 +45,22 @@ func (r requestChannelSubscriber) OnNext(item payload.Payload) {
5545
})
5646
}
5747

58-
func (r requestChannelSubscriber) OnError(err error) {
59-
defer func() {
60-
_ = recover()
61-
}()
62-
r.result <- err
48+
func (r *requestChannelSubscriber) OnError(err error) {
49+
r.dc.writeError(r.sid, err)
6350
}
6451

65-
func (r requestChannelSubscriber) OnComplete() {
66-
defer func() {
67-
_ = recover()
68-
}()
69-
close(r.result)
52+
func (r *requestChannelSubscriber) OnComplete() {
53+
complete := framing.NewWriteablePayloadFrame(r.sid, nil, nil, core.FlagComplete)
54+
done := make(chan struct{})
55+
complete.HandleDone(func() {
56+
close(done)
57+
})
58+
if r.dc.sendFrame(complete) {
59+
<-done
60+
}
7061
}
7162

72-
func (r requestChannelSubscriber) OnSubscribe(ctx context.Context, s rx.Subscription) {
63+
func (r *requestChannelSubscriber) OnSubscribe(ctx context.Context, s rx.Subscription) {
7364
select {
7465
case <-ctx.Done():
7566
r.OnError(reactor.ErrSubscribeCancelled)
@@ -83,18 +74,27 @@ func (r requestChannelSubscriber) OnSubscribe(ctx context.Context, s rx.Subscrip
8374
}
8475
}
8576

86-
func (r respondChannelSubscriber) OnNext(next payload.Payload) {
77+
type respondChannelSubscriber struct {
78+
sid uint32
79+
n uint32
80+
dc *DuplexConnection
81+
rcv flux.Processor
82+
subscribed chan<- struct{}
83+
calls *atomic.Int32
84+
}
85+
86+
func (r *respondChannelSubscriber) OnNext(next payload.Payload) {
8787
r.dc.sendPayload(r.sid, next, core.FlagNext)
8888
}
8989

90-
func (r respondChannelSubscriber) OnError(err error) {
90+
func (r *respondChannelSubscriber) OnError(err error) {
9191
if r.calls.Inc() == 2 {
9292
r.dc.unregister(r.sid)
9393
}
9494
r.dc.writeError(r.sid, err)
9595
}
9696

97-
func (r respondChannelSubscriber) OnComplete() {
97+
func (r *respondChannelSubscriber) OnComplete() {
9898
if r.calls.Inc() == 2 {
9999
r.dc.unregister(r.sid)
100100
}
@@ -108,7 +108,7 @@ func (r respondChannelSubscriber) OnComplete() {
108108
}
109109
}
110110

111-
func (r respondChannelSubscriber) OnSubscribe(ctx context.Context, s rx.Subscription) {
111+
func (r *respondChannelSubscriber) OnSubscribe(ctx context.Context, s rx.Subscription) {
112112
select {
113113
case <-ctx.Done():
114114
r.OnError(reactor.ErrSubscribeCancelled)

rsocket_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,13 +357,13 @@ func testAll(t *testing.T, proto string, clientTp transport.ClientTransporter, s
357357
RequestChannel(func(inputs flux.Flux) flux.Flux {
358358
received := new(int32)
359359
inputs.
360-
DoFinally(func(s rx.SignalType) {
361-
assert.Equal(t, channelElements, atomic.LoadInt32(received))
362-
}).
363360
DoOnNext(func(input payload.Payload) error {
364361
atomic.AddInt32(received, 1)
365362
return nil
366363
}).
364+
DoOnComplete(func() {
365+
assert.Equal(t, channelElements, atomic.LoadInt32(received))
366+
}).
367367
Subscribe(context.Background())
368368

369369
return flux.Create(func(ctx context.Context, s flux.Sink) {

0 commit comments

Comments
 (0)