@@ -612,3 +612,138 @@ func TestPool_ConnParker(t *testing.T) {
612612 require .Equal (t , 3 , tickCount )
613613 })
614614}
615+
616+ func TestEndpointsToConnections (t * testing.T ) {
617+ t .Run ("CreatesConnectionsForEndpoints" , func (t * testing.T ) {
618+ ctx := context .Background ()
619+ config := & mockConfig {
620+ dialTimeout : 5 * time .Second ,
621+ connectionTTL : 0 ,
622+ }
623+ pool := NewPool (ctx , config )
624+ defer func () {
625+ _ = pool .Release (ctx )
626+ }()
627+
628+ require .Equal (t , 0 , pool .conns .Len ())
629+
630+ e1 := endpoint .New ("e1:2135" )
631+ e2 := endpoint .New ("e2:2135" )
632+
633+ conns := EndpointsToConnections (pool , []endpoint.Endpoint {e1 , e2 })
634+
635+ require .Len (t , conns , 2 )
636+ require .Equal (t , 2 , pool .conns .Len ())
637+
638+ require .Equal (t , pool .Get (e1 ), conns [0 ])
639+ require .Equal (t , pool .Get (e2 ), conns [1 ])
640+ })
641+
642+ t .Run ("ReusesExistingConnections" , func (t * testing.T ) {
643+ ctx := context .Background ()
644+ config := & mockConfig {
645+ dialTimeout : 5 * time .Second ,
646+ connectionTTL : 0 ,
647+ }
648+ pool := NewPool (ctx , config )
649+ defer func () {
650+ _ = pool .Release (ctx )
651+ }()
652+
653+ e := endpoint .New ("reuse:2135" )
654+
655+ existing := pool .Get (e )
656+ require .NotNil (t , existing )
657+
658+ initialLen := pool .conns .Len ()
659+
660+ conns := EndpointsToConnections (pool , []endpoint.Endpoint {e })
661+
662+ require .Len (t , conns , 1 )
663+ require .Equal (t , existing , conns [0 ])
664+
665+ require .Equal (t , initialLen , pool .conns .Len ())
666+ })
667+
668+ t .Run ("IPv6AndHostOverrideUniqueKeys" , func (t * testing.T ) {
669+ ctx := context .Background ()
670+ config := & mockConfig {
671+ dialTimeout : 5 * time .Second ,
672+ connectionTTL : 0 ,
673+ }
674+ pool := NewPool (ctx , config )
675+ defer func () {
676+ _ = pool .Release (ctx )
677+ }()
678+
679+ // ensure empty pool
680+ require .Equal (t , 0 , pool .conns .Len ())
681+
682+ // address is a dns-style host:port, ipv6 provides resolved ip used in Key.Address()
683+ e1 := endpoint .New ("example.com:2135" , endpoint .WithIPV6 ([]string {"2001:db8::1" }))
684+ // if node is rebooted with different ssl name override, we need a different connection
685+ e2 := endpoint .New ("example.com:2135" , endpoint .WithIPV6 ([]string {"2001:db8::1" }), endpoint .WithSslTargetNameOverride ("override" ))
686+ // different ipv6 -> different Address()
687+ e3 := endpoint .New ("example.com:2135" , endpoint .WithIPV6 ([]string {"2001:db8::2" }), endpoint .WithID (2 ))
688+ // same ipv6 as e1 but different NodeID -> different Key.NodeID
689+ e4 := endpoint .New ("example.com:2135" , endpoint .WithIPV6 ([]string {"2001:db8::1" }), endpoint .WithID (1 ))
690+
691+ endpoints := []endpoint.Endpoint {e1 , e2 , e3 , e4 }
692+ conns := EndpointsToConnections (pool , endpoints )
693+
694+ require .Len (t , conns , len (endpoints ))
695+ require .Equal (t , 4 , pool .conns .Len ())
696+
697+ for i , e := range endpoints {
698+ got := conns [i ]
699+ require .NotNil (t , got )
700+ require .Equal (t , pool .Get (e ), got )
701+ cc , ok := pool .conns .Get (e .Key ())
702+ require .True (t , ok )
703+ require .Equal (t , cc , got )
704+ }
705+
706+ require .Equal (t , e2 .Key ().HostOverride , "override" )
707+ require .Equal (t , e4 .Key ().NodeID , uint32 (1 ))
708+ })
709+
710+ t .Run ("AddNewEndpointAndNodeIDVariation" , func (t * testing.T ) {
711+ ctx := context .Background ()
712+ config := & mockConfig {
713+ dialTimeout : 5 * time .Second ,
714+ connectionTTL : 0 ,
715+ }
716+ pool := NewPool (ctx , config )
717+ defer func () {
718+ _ = pool .Release (ctx )
719+ }()
720+
721+ // initial two endpoints with IPv6 and distinct NodeIDs
722+ e1 := endpoint .New ("e1.example:2135" , endpoint .WithIPV6 ([]string {"2001:db8::1" }), endpoint .WithID (1 ))
723+ e2 := endpoint .New ("e2.example:2135" , endpoint .WithIPV6 ([]string {"2001:db8::2" }), endpoint .WithID (2 ))
724+
725+ // create initial connections
726+ initialConns := EndpointsToConnections (pool , []endpoint.Endpoint {e1 , e2 })
727+ require .Len (t , initialConns , 2 )
728+ require .Equal (t , 2 , pool .conns .Len ())
729+ require .Equal (t , pool .Get (e1 ), initialConns [0 ])
730+ require .Equal (t , pool .Get (e2 ), initialConns [1 ])
731+
732+ // add a new unique endpoint e3 -> pool should grow
733+ e3 := endpoint .New ("e3.example:2135" , endpoint .WithIPV6 ([]string {"2001:db8::3" }), endpoint .WithID (3 ))
734+ connsAfterE3 := EndpointsToConnections (pool , []endpoint.Endpoint {e1 , e2 , e3 })
735+ require .Len (t , connsAfterE3 , 3 )
736+ require .Equal (t , 3 , pool .conns .Len ())
737+ require .Equal (t , pool .Get (e3 ), connsAfterE3 [2 ])
738+
739+ // now use same address as e1 but different NodeID (and same ipv6) -> should create new conn
740+ e1DifferentNode := endpoint .New ("e1.example:2135" , endpoint .WithIPV6 ([]string {"2001:db8::1" }), endpoint .WithID (99 ))
741+ connsAfterNodeChange := EndpointsToConnections (pool , []endpoint.Endpoint {e1DifferentNode })
742+ require .Len (t , connsAfterNodeChange , 1 )
743+ // pool size must increase by one
744+ require .Equal (t , 4 , pool .conns .Len ())
745+ // returned conn corresponds to the new endpoint key
746+ require .Equal (t , pool .Get (e1DifferentNode ), connsAfterNodeChange [0 ])
747+ require .Equal (t , pool .Get (e1 ), initialConns [0 ])
748+ })
749+ }
0 commit comments