@@ -57,14 +57,15 @@ func TestGossip(t *testing.T) {
5757 sort .Ints (tcpPorts )
5858
5959 // wait for convergence
60- converge (5 * time .Second , nsqds , convergenceTester .c , func () bool {
60+ converged := converge (5 * time .Second , nsqds , convergenceTester .c , func () bool {
6161 for _ , nsqd := range nsqds {
6262 if len (nsqd .rdb .FindProducers ("client" , "" , "" )) != num {
6363 return false
6464 }
6565 }
6666 return true
6767 })
68+ equal (t , converged , true )
6869
6970 // all nodes in the cluster should have registrations
7071 for _ , nsqd := range nsqds {
@@ -85,7 +86,7 @@ func TestGossip(t *testing.T) {
8586 topic .GetChannel ("ch" )
8687 firstPort := nsqds [0 ].tcpListener .Addr ().(* net.TCPAddr ).Port
8788
88- converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
89+ converged = converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
8990 for _ , nsqd := range nsqds {
9091 if len (nsqd .rdb .FindProducers ("topic" , topicName , "" )) != 1 ||
9192 len (nsqd .rdb .FindProducers ("channel" , topicName , "ch" )) != 1 {
@@ -94,6 +95,7 @@ func TestGossip(t *testing.T) {
9495 }
9596 return true
9697 })
98+ equal (t , converged , true )
9799
98100 for _ , nsqd := range nsqds {
99101 producers := nsqd .rdb .FindProducers ("topic" , topicName , "" )
@@ -127,6 +129,10 @@ func TestGossipResync(t *testing.T) {
127129 opts .Logger = newTestLogger (t )
128130 opts .GossipAddress = addr .String ()
129131 opts .BroadcastAddress = "127.0.0.1"
132+ opts .GossipReapInterval = 200 * time .Millisecond
133+ opts .GossipReconnectTimeout = 100 * time .Millisecond
134+ opts .GossipSuspicionMult = 1
135+ opts .GossipProbeInterval = 100 * time .Millisecond
130136 opts .gossipDelegate = convergenceTester
131137 if seedNode != nil {
132138 opts .GossipSeedAddresses = []string {seedNode .getOpts ().GossipAddress }
@@ -150,7 +156,7 @@ func TestGossipResync(t *testing.T) {
150156 topic .GetChannel ("ch" )
151157 firstPort := nsqds [0 ].tcpListener .Addr ().(* net.TCPAddr ).Port
152158
153- converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
159+ converged := converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
154160 for _ , nsqd := range nsqds {
155161 if len (nsqd .rdb .FindProducers ("topic" , topicName , "" )) != 1 ||
156162 len (nsqd .rdb .FindProducers ("channel" , topicName , "ch" )) != 1 {
@@ -159,6 +165,7 @@ func TestGossipResync(t *testing.T) {
159165 }
160166 return true
161167 })
168+ equal (t , converged , true )
162169
163170 for _ , nsqd := range nsqds {
164171 producers := nsqd .rdb .FindProducers ("topic" , topicName , "" )
@@ -175,32 +182,34 @@ func TestGossipResync(t *testing.T) {
175182 stillAlive := nsqds [:num - 1 ]
176183
177184 // check that other nodes see it as closed
178- converge (10 * time .Second , stillAlive , convergenceTester .c , func () bool {
185+ converged = converge (10 * time .Second , stillAlive , convergenceTester .c , func () bool {
179186 for _ , nsqd := range stillAlive {
180187 if len (nsqd .serf .Members ()) != len (stillAlive ) {
181188 return false
182189 }
183190 }
184191 return true
185192 })
193+ equal (t , converged , true )
186194
187195 // restart stopped node
188196 _ , _ , nsqd := mustStartNSQD (nsqds [num - 1 ].getOpts ())
189197 defer nsqd .Exit ()
190198 nsqds [num - 1 ] = nsqd
191199
192200 // check that other nodes see it as back open
193- converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
201+ converged = converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
194202 for _ , nsqd := range nsqds {
195203 if len (nsqd .serf .Members ()) != len (nsqds ) {
196204 return false
197205 }
198206 }
199207 return true
200208 })
209+ equal (t , converged , true )
201210
202211 // check that all nodes see the restarted first node
203- converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
212+ converged = converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
204213 for _ , nsqd := range nsqds {
205214 if len (nsqd .rdb .FindProducers ("topic" , topicName , "" )) != 1 ||
206215 len (nsqd .rdb .FindProducers ("channel" , topicName , "ch" )) != 1 {
@@ -209,6 +218,7 @@ func TestGossipResync(t *testing.T) {
209218 }
210219 return true
211220 })
221+ equal (t , converged , true )
212222
213223 // we should have producers for the topic/channel back now
214224 for _ , nsqd := range nsqds {
@@ -266,7 +276,7 @@ func TestRegossip(t *testing.T) {
266276 topic .GetChannel ("ch" )
267277 firstPort := nsqds [0 ].tcpListener .Addr ().(* net.TCPAddr ).Port
268278
269- converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
279+ converged := converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
270280 for _ , nsqd := range nsqds {
271281 if len (nsqd .rdb .FindProducers ("topic" , topicName , "" )) != 1 ||
272282 len (nsqd .rdb .FindProducers ("channel" , topicName , "ch" )) != 1 {
@@ -275,6 +285,7 @@ func TestRegossip(t *testing.T) {
275285 }
276286 return true
277287 })
288+ equal (t , converged , true )
278289
279290 for _ , nsqd := range nsqds {
280291 producers := nsqd .rdb .FindProducers ("topic" , topicName , "" )
@@ -294,7 +305,7 @@ func TestRegossip(t *testing.T) {
294305 }
295306
296307 // wait for regossip
297- converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
308+ converged = converge (10 * time .Second , nsqds , convergenceTester .c , func () bool {
298309 for _ , nsqd := range nsqds {
299310 if len (nsqd .rdb .FindProducers ("topic" , topicName , "" )) != 1 ||
300311 len (nsqd .rdb .FindProducers ("channel" , topicName , "ch" )) != 1 {
@@ -303,6 +314,7 @@ func TestRegossip(t *testing.T) {
303314 }
304315 return true
305316 })
317+ equal (t , converged , true )
306318
307319 // we should have producers for the topic/channel back now on all nodes
308320 for _ , nsqd := range nsqds {
@@ -316,16 +328,17 @@ func TestRegossip(t *testing.T) {
316328 }
317329}
318330
319- func converge (timeout time.Duration , nsqds []* NSQD , notifyChan chan struct {}, isConverged func () bool ) {
320- // wait for convergence
321- converged := false
322- t := time .NewTimer (timeout )
323- for ! converged {
331+ func converge (timeout time.Duration , nsqds []* NSQD , notifyChan chan struct {}, isConverged func () bool ) bool {
332+ for {
324333 select {
325- case <- t . C :
326- converged = true
334+ case <- time . After ( timeout ) :
335+ return false
327336 case <- notifyChan :
328- converged = isConverged ()
337+ if isConverged () {
338+ goto exit
339+ }
329340 }
330341 }
342+ exit:
343+ return true
331344}
0 commit comments