Skip to content

Commit efd4b71

Browse files
committed
Do not close connections concurrently in tests
Couple CC integration tests tried to simulate connection breakage by closing connections concurrently from a different thread. This was wrong because connections are not thread safe. It resulted in flaky tests. This commit replaces concurrent closing with just throwing errors from RUN and RESET. Connections with such errors will still be considered broken.
1 parent 8c72e38 commit efd4b71

File tree

5 files changed

+73
-123
lines changed

5 files changed

+73
-123
lines changed

driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

driver/src/test/java/org/neo4j/driver/internal/util/ThrowingConnection.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.util;
2020

2121
import java.util.Map;
22+
import java.util.concurrent.atomic.AtomicReference;
2223

2324
import org.neo4j.driver.internal.net.BoltServerAddress;
2425
import org.neo4j.driver.internal.spi.Collector;
@@ -29,11 +30,14 @@
2930
public class ThrowingConnection implements Connection
3031
{
3132
private final Connection realConnection;
32-
private RuntimeException nextRunFailure;
33+
private final AtomicReference<RuntimeException> nextRunError;
34+
private final AtomicReference<RuntimeException> nextResetError;
3335

3436
public ThrowingConnection( Connection realConnection )
3537
{
3638
this.realConnection = realConnection;
39+
this.nextRunError = new AtomicReference<>();
40+
this.nextResetError = new AtomicReference<>();
3741
}
3842

3943
@Override
@@ -45,12 +49,7 @@ public void init( String clientName, Map<String,Value> authToken )
4549
@Override
4650
public void run( String statement, Map<String,Value> parameters, Collector collector )
4751
{
48-
if ( nextRunFailure != null )
49-
{
50-
RuntimeException error = nextRunFailure;
51-
nextRunFailure = null;
52-
throw error;
53-
}
52+
throwErrorIfExists( nextRunError );
5453
realConnection.run( statement, parameters, collector );
5554
}
5655

@@ -69,6 +68,7 @@ public void pullAll( Collector collector )
6968
@Override
7069
public void reset()
7170
{
71+
throwErrorIfExists( nextResetError );
7272
realConnection.reset();
7373
}
7474

@@ -132,8 +132,22 @@ public BoltServerAddress boltServerAddress()
132132
return realConnection.boltServerAddress();
133133
}
134134

135-
public void setNextRunFailure( RuntimeException nextRunFailure )
135+
public void setNextRunError( RuntimeException error )
136+
{
137+
nextRunError.set( error );
138+
}
139+
140+
public void setNextResetError( RuntimeException error )
136141
{
137-
this.nextRunFailure = nextRunFailure;
142+
nextResetError.set( error );
143+
}
144+
145+
private static void throwErrorIfExists( AtomicReference<RuntimeException> errorReference )
146+
{
147+
RuntimeException error = errorReference.getAndSet( null );
148+
if ( error != null )
149+
{
150+
throw error;
151+
}
138152
}
139153
}

driver/src/test/java/org/neo4j/driver/internal/util/ThrowingConnectionDriverFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,25 @@
3030

3131
public class ThrowingConnectionDriverFactory extends DriverFactory
3232
{
33+
private final Clock clock;
3334
private final List<ThrowingConnection> connections = new CopyOnWriteArrayList<>();
3435

36+
public ThrowingConnectionDriverFactory()
37+
{
38+
this( Clock.SYSTEM );
39+
}
40+
41+
public ThrowingConnectionDriverFactory( Clock clock )
42+
{
43+
this.clock = clock;
44+
}
45+
46+
@Override
47+
protected Clock createClock()
48+
{
49+
return clock;
50+
}
51+
3552
@Override
3653
protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
3754
Logging logging )
@@ -44,4 +61,11 @@ public List<ThrowingConnection> getConnections()
4461
{
4562
return new ArrayList<>( connections );
4663
}
64+
65+
public List<ThrowingConnection> pollConnections()
66+
{
67+
List<ThrowingConnection> result = new ArrayList<>( connections );
68+
connections.clear();
69+
return result;
70+
}
4771
}

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535

3636
import org.neo4j.driver.internal.cluster.RoutingSettings;
3737
import org.neo4j.driver.internal.retry.RetrySettings;
38-
import org.neo4j.driver.internal.util.Clock;
39-
import org.neo4j.driver.internal.util.ConnectionTrackingDriverFactory;
4038
import org.neo4j.driver.internal.util.FakeClock;
4139
import org.neo4j.driver.internal.util.ThrowingConnection;
4240
import org.neo4j.driver.internal.util.ThrowingConnectionDriverFactory;
@@ -70,6 +68,7 @@
7068
import static org.hamcrest.Matchers.instanceOf;
7169
import static org.hamcrest.Matchers.startsWith;
7270
import static org.junit.Assert.assertEquals;
71+
import static org.junit.Assert.assertFalse;
7372
import static org.junit.Assert.assertNotNull;
7473
import static org.junit.Assert.assertNull;
7574
import static org.junit.Assert.assertThat;
@@ -236,7 +235,7 @@ public Void apply( Session session )
236235
}
237236

238237
@Test
239-
public void shouldDropBrokenOldSessions() throws Exception
238+
public void shouldDropBrokenOldConnections() throws Exception
240239
{
241240
Cluster cluster = clusterRule.getCluster();
242241

@@ -249,19 +248,24 @@ public void shouldDropBrokenOldSessions() throws Exception
249248
.toConfig();
250249

251250
FakeClock clock = new FakeClock();
252-
ConnectionTrackingDriverFactory driverFactory = new ConnectionTrackingDriverFactory( clock );
251+
ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory( clock );
253252

254253
URI routingUri = cluster.leader().getRoutingUri();
255254
AuthToken auth = clusterRule.getDefaultAuthToken();
256255
RetrySettings retrySettings = RetrySettings.DEFAULT;
257256

258257
try ( Driver driver = driverFactory.newInstance( routingUri, auth, defaultRoutingSettings(), retrySettings, config ) )
259258
{
260-
// create nodes in different threads using different sessions
259+
// create nodes in different threads using different sessions and connections
261260
createNodesInDifferentThreads( concurrentSessionsCount, driver );
262261

263-
// now pool contains many sessions, make them all invalid
264-
driverFactory.closeConnections();
262+
// now pool contains many connections, make them all invalid
263+
List<ThrowingConnection> oldConnections = driverFactory.pollConnections();
264+
for ( ThrowingConnection oldConnection : oldConnections )
265+
{
266+
oldConnection.setNextResetError( new ServiceUnavailableException( "Unable to reset" ) );
267+
}
268+
265269
// move clock forward more than configured liveness check timeout
266270
clock.progress( MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) );
267271

@@ -273,6 +277,12 @@ public void shouldDropBrokenOldSessions() throws Exception
273277
assertEquals( 1, records.size() );
274278
assertEquals( concurrentSessionsCount, records.get( 0 ).get( 0 ).asInt() );
275279
}
280+
281+
// all old connections failed to reset and should be closed
282+
for ( ThrowingConnection connection : oldConnections )
283+
{
284+
assertFalse( connection.isOpen() );
285+
}
276286
}
277287
}
278288

@@ -573,7 +583,7 @@ public void shouldRediscoverWhenConnectionsToAllCoresBreak()
573583
// make all those connections throw and seem broken
574584
for ( ThrowingConnection connection : driverFactory.getConnections() )
575585
{
576-
connection.setNextRunFailure( new ServiceUnavailableException( "Disconnected" ) );
586+
connection.setNextRunError( new ServiceUnavailableException( "Disconnected" ) );
577587
}
578588

579589
// observe that connection towards writer is broken
@@ -620,7 +630,7 @@ public void shouldKeepOperatingWhenConnectionsBreak() throws Exception
620630
String value = "Tony Stark";
621631
Cluster cluster = clusterRule.getCluster();
622632

623-
ConnectionTrackingDriverFactory driverFactory = new ConnectionTrackingDriverFactory( Clock.SYSTEM );
633+
ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory();
624634
AtomicBoolean stop = new AtomicBoolean();
625635
executor = newExecutor();
626636

@@ -639,11 +649,15 @@ public void shouldKeepOperatingWhenConnectionsBreak() throws Exception
639649
results.add( executor.submit( createNodesCallable( driver, label, property, value, stop ) ) );
640650
}
641651

642-
// terminate connections while reads and writes are in progress
652+
// make connections throw while reads and writes are in progress
643653
long deadline = System.currentTimeMillis() + MINUTES.toMillis( 1 );
644654
while ( System.currentTimeMillis() < deadline && !stop.get() )
645655
{
646-
driverFactory.closeConnections();
656+
List<ThrowingConnection> connections = driverFactory.pollConnections();
657+
for ( ThrowingConnection connection : connections )
658+
{
659+
connection.setNextRunError( new ServiceUnavailableException( "Unable to execute query" ) );
660+
}
647661
SECONDS.sleep( 5 ); // sleep a bit to allow readers and writers to progress
648662
}
649663
stop.set( true );
@@ -676,7 +690,7 @@ private static void setupLastConnectionToThrow( ThrowingConnectionDriverFactory
676690
{
677691
List<ThrowingConnection> connections = factory.getConnections();
678692
ThrowingConnection lastConnection = connections.get( connections.size() - 1 );
679-
lastConnection.setNextRunFailure( error );
693+
lastConnection.setNextRunError( error );
680694
}
681695

682696
private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException

0 commit comments

Comments
 (0)