Skip to content

Commit 43ab32b

Browse files
authored
tests for replacing ipv6 endpoints (#1921)
1 parent 3fd8eff commit 43ab32b

File tree

4 files changed

+198
-10
lines changed

4 files changed

+198
-10
lines changed

internal/balancer/balancer.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
207207
)
208208
}()
209209

210-
connections := endpointsToConnections(b.pool, newest)
210+
connections := conn.EndpointsToConnections(b.pool, newest)
211211
for _, c := range connections {
212212
b.pool.Allow(ctx, c)
213213
c.Endpoint().Touch()
@@ -453,12 +453,3 @@ func (b *Balancer) nextConn(ctx context.Context) (c conn.Conn, err error) {
453453

454454
return c, nil
455455
}
456-
457-
func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
458-
conns := make([]conn.Conn, 0, len(endpoints))
459-
for _, e := range endpoints {
460-
conns = append(conns, p.Get(e))
461-
}
462-
463-
return conns
464-
}

internal/balancer/balancer_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
"google.golang.org/grpc/test/bufconn"
1515

1616
"github.com/ydb-platform/ydb-go-sdk/v3/config"
17+
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1720
)
1821

1922
func TestBalancer_discoveryConn(t *testing.T) {
@@ -71,6 +74,52 @@ func TestBalancer_discoveryConn(t *testing.T) {
7174
require.NoError(t, err)
7275
}
7376

77+
func TestApplyDiscoveredEndpoints(t *testing.T) {
78+
ctx := context.Background()
79+
80+
cfg := config.New()
81+
pool := conn.NewPool(ctx, cfg)
82+
defer func() { _ = pool.Release(ctx) }()
83+
84+
b := &Balancer{
85+
driverConfig: cfg,
86+
pool: pool,
87+
balancerConfig: balancerConfig.Config{},
88+
}
89+
90+
initial := newConnectionsState(nil, b.balancerConfig.Filter, balancerConfig.Info{}, b.balancerConfig.AllowFallback)
91+
b.connectionsState.Store(initial)
92+
93+
e1 := endpoint.New("e1.example:2135", endpoint.WithIPV6([]string{"2001:db8::1"}), endpoint.WithID(1))
94+
e2 := endpoint.New("e2.example:2135", endpoint.WithIPV6([]string{"2001:db8::2"}), endpoint.WithID(2))
95+
96+
// call with two endpoints
97+
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{e1, e2}, "")
98+
99+
// connectionsState should be updated and reflect the endpoints
100+
after := b.connections()
101+
require.NotNil(t, after)
102+
all := after.All()
103+
require.Equal(t, 2, len(all))
104+
require.Equal(t, e1.Address(), all[0].Address())
105+
require.Equal(t, e1.NodeID(), all[0].NodeID())
106+
require.Equal(t, e2.Address(), all[1].Address())
107+
require.Equal(t, e2.NodeID(), all[1].NodeID())
108+
109+
// partially replace endpoints
110+
e3 := endpoint.New("e3.example:2135", endpoint.WithIPV6([]string{"2001:db8::3"}), endpoint.WithID(1))
111+
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{e2, e3}, "")
112+
// connectionsState should be updated and reflect the endpoints
113+
after = b.connections()
114+
require.NotNil(t, after)
115+
all = after.All()
116+
require.Equal(t, 2, len(all))
117+
require.Equal(t, e2.Address(), all[0].Address())
118+
require.Equal(t, e2.NodeID(), all[0].NodeID())
119+
require.Equal(t, e3.Address(), all[1].Address())
120+
require.Equal(t, e3.NodeID(), all[1].NodeID())
121+
}
122+
74123
// Mock resolver
75124
//
76125

internal/conn/pool.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ type Pool struct {
3030
done chan struct{}
3131
}
3232

33+
func EndpointsToConnections(p *Pool, endpoints []endpoint.Endpoint) []Conn {
34+
conns := make([]Conn, 0, len(endpoints))
35+
for _, e := range endpoints {
36+
conns = append(conns, p.Get(e))
37+
}
38+
39+
return conns
40+
}
41+
3342
func (p *Pool) DialTimeout() time.Duration {
3443
return p.config.DialTimeout()
3544
}

internal/conn/pool_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,3 +612,142 @@ func TestPool_ConnParker(t *testing.T) {
612612
require.Equal(t, 3, tickCount)
613613
})
614614
}
615+
616+
func TestEndpointsToConnections(t *testing.T) {
617+
t.Run("CreatesConnectionsForEndpoints", func(t *testing.T) {
618+
ctx := context.Background()
619+
config := &mockConfig{
620+
dialTimeout: 5 * time.Second,
621+
connectionTTL: 0,
622+
}
623+
pool := NewPool(ctx, config)
624+
defer func() {
625+
_ = pool.Release(ctx)
626+
}()
627+
628+
require.Equal(t, 0, pool.conns.Len())
629+
630+
e1 := endpoint.New("e1:2135")
631+
e2 := endpoint.New("e2:2135")
632+
633+
conns := EndpointsToConnections(pool, []endpoint.Endpoint{e1, e2})
634+
635+
require.Len(t, conns, 2)
636+
require.Equal(t, 2, pool.conns.Len())
637+
638+
require.Equal(t, pool.Get(e1), conns[0])
639+
require.Equal(t, pool.Get(e2), conns[1])
640+
})
641+
642+
t.Run("ReusesExistingConnections", func(t *testing.T) {
643+
ctx := context.Background()
644+
config := &mockConfig{
645+
dialTimeout: 5 * time.Second,
646+
connectionTTL: 0,
647+
}
648+
pool := NewPool(ctx, config)
649+
defer func() {
650+
_ = pool.Release(ctx)
651+
}()
652+
653+
e := endpoint.New("reuse:2135")
654+
655+
existing := pool.Get(e)
656+
require.NotNil(t, existing)
657+
658+
initialLen := pool.conns.Len()
659+
660+
conns := EndpointsToConnections(pool, []endpoint.Endpoint{e})
661+
662+
require.Len(t, conns, 1)
663+
require.Equal(t, existing, conns[0])
664+
665+
require.Equal(t, initialLen, pool.conns.Len())
666+
})
667+
668+
t.Run("IPv6AndHostOverrideUniqueKeys", func(t *testing.T) {
669+
ctx := context.Background()
670+
config := &mockConfig{
671+
dialTimeout: 5 * time.Second,
672+
connectionTTL: 0,
673+
}
674+
pool := NewPool(ctx, config)
675+
defer func() {
676+
_ = pool.Release(ctx)
677+
}()
678+
679+
// ensure empty pool
680+
require.Equal(t, 0, pool.conns.Len())
681+
682+
// address is a dns-style host:port, ipv6 provides resolved ip used in Key.Address()
683+
e1 := endpoint.New("example.com:2135", endpoint.WithIPV6([]string{"2001:db8::1"}))
684+
// if node is rebooted with different ssl name override, we need a different connection
685+
e2 := endpoint.New(
686+
"example.com:2135",
687+
endpoint.WithIPV6([]string{"2001:db8::1"}),
688+
endpoint.WithSslTargetNameOverride("override"),
689+
)
690+
// different ipv6 -> different Address()
691+
e3 := endpoint.New("example.com:2135", endpoint.WithIPV6([]string{"2001:db8::2"}), endpoint.WithID(2))
692+
// same ipv6 as e1 but different NodeID -> different Key.NodeID
693+
e4 := endpoint.New("example.com:2135", endpoint.WithIPV6([]string{"2001:db8::1"}), endpoint.WithID(1))
694+
695+
endpoints := []endpoint.Endpoint{e1, e2, e3, e4}
696+
conns := EndpointsToConnections(pool, endpoints)
697+
698+
require.Len(t, conns, len(endpoints))
699+
require.Equal(t, 4, pool.conns.Len())
700+
701+
for i, e := range endpoints {
702+
got := conns[i]
703+
require.NotNil(t, got)
704+
require.Equal(t, pool.Get(e), got)
705+
cc, ok := pool.conns.Get(e.Key())
706+
require.True(t, ok)
707+
require.Equal(t, cc, got)
708+
}
709+
710+
require.Equal(t, e2.Key().HostOverride, "override")
711+
require.Equal(t, e4.Key().NodeID, uint32(1))
712+
})
713+
714+
t.Run("AddNewEndpointAndNodeIDVariation", func(t *testing.T) {
715+
ctx := context.Background()
716+
config := &mockConfig{
717+
dialTimeout: 5 * time.Second,
718+
connectionTTL: 0,
719+
}
720+
pool := NewPool(ctx, config)
721+
defer func() {
722+
_ = pool.Release(ctx)
723+
}()
724+
725+
// initial two endpoints with IPv6 and distinct NodeIDs
726+
e1 := endpoint.New("e1.example:2135", endpoint.WithIPV6([]string{"2001:db8::1"}), endpoint.WithID(1))
727+
e2 := endpoint.New("e2.example:2135", endpoint.WithIPV6([]string{"2001:db8::2"}), endpoint.WithID(2))
728+
729+
// create initial connections
730+
initialConns := EndpointsToConnections(pool, []endpoint.Endpoint{e1, e2})
731+
require.Len(t, initialConns, 2)
732+
require.Equal(t, 2, pool.conns.Len())
733+
require.Equal(t, pool.Get(e1), initialConns[0])
734+
require.Equal(t, pool.Get(e2), initialConns[1])
735+
736+
// add a new unique endpoint e3 -> pool should grow
737+
e3 := endpoint.New("e3.example:2135", endpoint.WithIPV6([]string{"2001:db8::3"}), endpoint.WithID(3))
738+
connsAfterE3 := EndpointsToConnections(pool, []endpoint.Endpoint{e1, e2, e3})
739+
require.Len(t, connsAfterE3, 3)
740+
require.Equal(t, 3, pool.conns.Len())
741+
require.Equal(t, pool.Get(e3), connsAfterE3[2])
742+
743+
// now use same address as e1 but different NodeID (and same ipv6) -> should create new conn
744+
e1DifferentNode := endpoint.New("e1.example:2135", endpoint.WithIPV6([]string{"2001:db8::1"}), endpoint.WithID(99))
745+
connsAfterNodeChange := EndpointsToConnections(pool, []endpoint.Endpoint{e1DifferentNode})
746+
require.Len(t, connsAfterNodeChange, 1)
747+
// pool size must increase by one
748+
require.Equal(t, 4, pool.conns.Len())
749+
// returned conn corresponds to the new endpoint key
750+
require.Equal(t, pool.Get(e1DifferentNode), connsAfterNodeChange[0])
751+
require.Equal(t, pool.Get(e1), initialConns[0])
752+
})
753+
}

0 commit comments

Comments
 (0)