@@ -20,28 +20,45 @@ package com.expedia.www.haystack.trace.storage.backends.cassandra.client
20
20
import java .nio .ByteBuffer
21
21
import java .util .Date
22
22
23
- import com .datastax .driver .core .BatchStatement .Type
24
23
import com .datastax .driver .core ._
24
+ import com .datastax .driver .core .exceptions .NoHostAvailableException
25
25
import com .datastax .driver .core .querybuilder .QueryBuilder
26
26
import com .expedia .www .haystack .trace .storage .backends .cassandra .config .entities .{ClientConfiguration , KeyspaceConfiguration }
27
27
import org .slf4j .LoggerFactory
28
28
import com .expedia .www .haystack .trace .storage .backends .cassandra .client .CassandraTableSchema ._
29
29
30
30
import scala .collection .JavaConverters ._
31
- import scala .util .Try
31
+ import scala .util .{ Failure , Success , Try }
32
32
33
- class CassandraSession ( config : ClientConfiguration , factory : ClusterFactory ) {
33
+ object CassandraSession {
34
34
private val LOGGER = LoggerFactory .getLogger(classOf [CassandraSession ])
35
35
36
+ def connect (config : ClientConfiguration ,
37
+ factory : ClusterFactory ): (Cluster , Session ) = this .synchronized {
38
+ def tryConnect (): (Cluster , Session ) = {
39
+ val cluster = factory.buildCluster(config)
40
+ Try (cluster.connect()) match {
41
+ case Success (session) => (cluster, session)
42
+ case Failure (e : NoHostAvailableException ) =>
43
+ LOGGER .warn(" Failed to connect to cassandra. Will try again" , e)
44
+ Thread .sleep(5000 )
45
+ tryConnect()
46
+ case Failure (e) => throw e
47
+ }
48
+ }
49
+
50
+ tryConnect()
51
+ }
52
+ }
53
+
54
+ class CassandraSession (config : ClientConfiguration , factory : ClusterFactory ) {
55
+ import CassandraSession ._
56
+
36
57
/**
37
58
* builds a session object to interact with cassandra cluster
38
59
* Also ensure that keyspace and table names exists in cassandra.
39
60
*/
40
- private val (cluster, session) = {
41
- val cluster = factory.buildCluster(config)
42
- val newSession = cluster.connect()
43
- (cluster, newSession)
44
- }
61
+ lazy val (cluster, session) = connect(config, factory)
45
62
46
63
def ensureKeyspace (keyspace : KeyspaceConfiguration ): Unit = {
47
64
LOGGER .info(" ensuring kespace exists with {}" , keyspace)
@@ -118,6 +135,4 @@ class CassandraSession(config: ClientConfiguration, factory: ClusterFactory) {
118
135
* @return future object of ResultSet
119
136
*/
120
137
def executeAsync (statement : Statement ): ResultSetFuture = session.executeAsync(statement)
121
-
122
-
123
138
}
0 commit comments