19
19
package org .neo4j .driver .v1 .integration ;
20
20
21
21
import org .junit .After ;
22
+ import org .junit .AfterClass ;
22
23
import org .junit .Rule ;
23
24
import org .junit .Test ;
24
25
35
36
36
37
import org .neo4j .driver .internal .cluster .RoutingSettings ;
37
38
import org .neo4j .driver .internal .retry .RetrySettings ;
38
- import org .neo4j .driver .internal .util .Clock ;
39
- import org .neo4j .driver .internal .util .ConnectionTrackingDriverFactory ;
40
39
import org .neo4j .driver .internal .util .FakeClock ;
41
40
import org .neo4j .driver .internal .util .ThrowingConnection ;
42
41
import org .neo4j .driver .internal .util .ThrowingConnectionDriverFactory ;
70
69
import static org .hamcrest .Matchers .instanceOf ;
71
70
import static org .hamcrest .Matchers .startsWith ;
72
71
import static org .junit .Assert .assertEquals ;
72
+ import static org .junit .Assert .assertFalse ;
73
73
import static org .junit .Assert .assertNotNull ;
74
74
import static org .junit .Assert .assertNull ;
75
75
import static org .junit .Assert .assertThat ;
@@ -97,6 +97,12 @@ public void tearDown()
97
97
}
98
98
}
99
99
100
+ @ AfterClass
101
+ public static void stopSharedCluster ()
102
+ {
103
+ ClusterRule .stopSharedCluster ();
104
+ }
105
+
100
106
@ Test
101
107
public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader () throws Exception
102
108
{
@@ -236,7 +242,7 @@ public Void apply( Session session )
236
242
}
237
243
238
244
@ Test
239
- public void shouldDropBrokenOldSessions () throws Exception
245
+ public void shouldDropBrokenOldConnections () throws Exception
240
246
{
241
247
Cluster cluster = clusterRule .getCluster ();
242
248
@@ -249,19 +255,24 @@ public void shouldDropBrokenOldSessions() throws Exception
249
255
.toConfig ();
250
256
251
257
FakeClock clock = new FakeClock ();
252
- ConnectionTrackingDriverFactory driverFactory = new ConnectionTrackingDriverFactory ( clock );
258
+ ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory ( clock );
253
259
254
260
URI routingUri = cluster .leader ().getRoutingUri ();
255
261
AuthToken auth = clusterRule .getDefaultAuthToken ();
256
262
RetrySettings retrySettings = RetrySettings .DEFAULT ;
257
263
258
264
try ( Driver driver = driverFactory .newInstance ( routingUri , auth , defaultRoutingSettings (), retrySettings , config ) )
259
265
{
260
- // create nodes in different threads using different sessions
266
+ // create nodes in different threads using different sessions and connections
261
267
createNodesInDifferentThreads ( concurrentSessionsCount , driver );
262
268
263
- // now pool contains many sessions, make them all invalid
264
- driverFactory .closeConnections ();
269
+ // now pool contains many connections, make them all invalid
270
+ List <ThrowingConnection > oldConnections = driverFactory .pollConnections ();
271
+ for ( ThrowingConnection oldConnection : oldConnections )
272
+ {
273
+ oldConnection .setNextResetError ( new ServiceUnavailableException ( "Unable to reset" ) );
274
+ }
275
+
265
276
// move clock forward more than configured liveness check timeout
266
277
clock .progress ( MINUTES .toMillis ( livenessCheckTimeoutMinutes + 1 ) );
267
278
@@ -273,6 +284,12 @@ public void shouldDropBrokenOldSessions() throws Exception
273
284
assertEquals ( 1 , records .size () );
274
285
assertEquals ( concurrentSessionsCount , records .get ( 0 ).get ( 0 ).asInt () );
275
286
}
287
+
288
+ // all old connections failed to reset and should be closed
289
+ for ( ThrowingConnection connection : oldConnections )
290
+ {
291
+ assertFalse ( connection .isOpen () );
292
+ }
276
293
}
277
294
}
278
295
@@ -573,7 +590,7 @@ public void shouldRediscoverWhenConnectionsToAllCoresBreak()
573
590
// make all those connections throw and seem broken
574
591
for ( ThrowingConnection connection : driverFactory .getConnections () )
575
592
{
576
- connection .setNextRunFailure ( new ServiceUnavailableException ( "Disconnected" ) );
593
+ connection .setNextRunError ( new ServiceUnavailableException ( "Disconnected" ) );
577
594
}
578
595
579
596
// observe that connection towards writer is broken
@@ -620,7 +637,7 @@ public void shouldKeepOperatingWhenConnectionsBreak() throws Exception
620
637
String value = "Tony Stark" ;
621
638
Cluster cluster = clusterRule .getCluster ();
622
639
623
- ConnectionTrackingDriverFactory driverFactory = new ConnectionTrackingDriverFactory ( Clock . SYSTEM );
640
+ ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory ( );
624
641
AtomicBoolean stop = new AtomicBoolean ();
625
642
executor = newExecutor ();
626
643
@@ -632,18 +649,22 @@ public void shouldKeepOperatingWhenConnectionsBreak() throws Exception
632
649
// launch writers and readers that use transaction functions and thus should never fail
633
650
for ( int i = 0 ; i < 3 ; i ++ )
634
651
{
635
- results .add ( executor .submit ( countNodesCallable ( driver , label , property , value , stop ) ) );
652
+ results .add ( executor .submit ( readNodesCallable ( driver , label , property , value , stop ) ) );
636
653
}
637
654
for ( int i = 0 ; i < 2 ; i ++ )
638
655
{
639
656
results .add ( executor .submit ( createNodesCallable ( driver , label , property , value , stop ) ) );
640
657
}
641
658
642
- // terminate connections while reads and writes are in progress
659
+ // make connections throw while reads and writes are in progress
643
660
long deadline = System .currentTimeMillis () + MINUTES .toMillis ( 1 );
644
661
while ( System .currentTimeMillis () < deadline && !stop .get () )
645
662
{
646
- driverFactory .closeConnections ();
663
+ List <ThrowingConnection > connections = driverFactory .pollConnections ();
664
+ for ( ThrowingConnection connection : connections )
665
+ {
666
+ connection .setNextRunError ( new ServiceUnavailableException ( "Unable to execute query" ) );
667
+ }
647
668
SECONDS .sleep ( 5 ); // sleep a bit to allow readers and writers to progress
648
669
}
649
670
stop .set ( true );
@@ -676,7 +697,7 @@ private static void setupLastConnectionToThrow( ThrowingConnectionDriverFactory
676
697
{
677
698
List <ThrowingConnection > connections = factory .getConnections ();
678
699
ThrowingConnection lastConnection = connections .get ( connections .size () - 1 );
679
- lastConnection .setNextRunFailure ( error );
700
+ lastConnection .setNextRunError ( error );
680
701
}
681
702
682
703
private int executeWriteAndReadThroughBolt ( ClusterMember member ) throws TimeoutException , InterruptedException
@@ -951,7 +972,7 @@ public Void call() throws Exception
951
972
};
952
973
}
953
974
954
- private static Callable <Void > countNodesCallable ( final Driver driver , final String label , final String property , final String value ,
975
+ private static Callable <Void > readNodesCallable ( final Driver driver , final String label , final String property , final String value ,
955
976
final AtomicBoolean stop )
956
977
{
957
978
return new Callable <Void >()
@@ -963,7 +984,7 @@ public Void call() throws Exception
963
984
{
964
985
try ( Session session = driver .session ( AccessMode .READ ) )
965
986
{
966
- countNodes ( session , label , property , value );
987
+ readNodeIds ( session , label , property , value );
967
988
}
968
989
catch ( Throwable t )
969
990
{
@@ -989,6 +1010,26 @@ public Void execute( Transaction tx )
989
1010
} );
990
1011
}
991
1012
1013
+ private static List <Long > readNodeIds ( final Session session , final String label , final String property , final String value )
1014
+ {
1015
+ return session .readTransaction ( new TransactionWork <List <Long >>()
1016
+ {
1017
+ @ Override
1018
+ public List <Long > execute ( Transaction tx )
1019
+ {
1020
+ StatementResult result = tx .run ( "MATCH (n:" + label + " {" + property + ": $value}) RETURN n LIMIT 5" ,
1021
+ parameters ( "value" , value ) );
1022
+
1023
+ List <Long > ids = new ArrayList <>();
1024
+ while ( result .hasNext () )
1025
+ {
1026
+ ids .add ( result .next ().get ( 0 ).asNode ().id () );
1027
+ }
1028
+ return ids ;
1029
+ }
1030
+ } );
1031
+ }
1032
+
992
1033
private static void updateNode ( final Session session , final String label , final String property , final String oldValue , final String newValue )
993
1034
{
994
1035
session .writeTransaction ( new TransactionWork <Void >()
0 commit comments