Skip to content

Commit d05c6d2

Browse files
authored
feat: add HTTP connection interceptor which TTL configuration (#643)
1 parent 997599d commit d05c6d2

File tree

3 files changed

+317
-0
lines changed

3 files changed

+317
-0
lines changed

CHANGELOG.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,76 @@
11
## 6.12.0 [unreleased]
22

3+
### Features
4+
1. [#643](https://github.com/influxdata/influxdb-client-java/pull/643): `ConnectionClosingInterceptor` interceptor closes connections that exceed
5+
a specified maximum lifetime age (TTL). It's beneficial for scenarios where your application requires establishing new connections to the same host after
6+
a predetermined interval.
7+
8+
The connection to the InfluxDB Enterprise with the `ConnectionClosingInterceptor` can be configured as follows:
9+
```java
10+
package example;
11+
12+
import java.time.Duration;
13+
import java.util.Collections;
14+
15+
import okhttp3.OkHttpClient;
16+
import okhttp3.Protocol;
17+
18+
import com.influxdb.client.InfluxDBClient;
19+
import com.influxdb.client.InfluxDBClientFactory;
20+
import com.influxdb.client.InfluxDBClientOptions;
21+
import com.influxdb.client.domain.WriteConsistency;
22+
import com.influxdb.rest.ConnectionClosingInterceptor;
23+
24+
public class InfluxQLExample {
25+
26+
public static void main(final String[] args) throws InterruptedException {
27+
28+
//
29+
// Credentials to connect to InfluxDB Enterprise
30+
//
31+
String url = "https://localhost:8086";
32+
String username = "admin";
33+
String password = "password";
34+
String database = "database";
35+
WriteConsistency consistency = WriteConsistency.ALL;
36+
37+
//
38+
// Configure underlying HTTP client
39+
//
40+
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
41+
.protocols(Collections.singletonList(Protocol.HTTP_1_1));
42+
43+
//
44+
// Use new Connection TTL feature
45+
//
46+
Duration connectionMaxAge = Duration.ofMinutes(1);
47+
ConnectionClosingInterceptor interceptor = new ConnectionClosingInterceptor(connectionMaxAge);
48+
okHttpClientBuilder
49+
.addNetworkInterceptor(interceptor)
50+
.eventListenerFactory(call -> interceptor);
51+
52+
//
53+
// Configure InfluxDB client
54+
//
55+
InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder()
56+
.url(url)
57+
.org("-")
58+
.authenticateToken(String.format("%s:%s", username, password).toCharArray())
59+
.bucket(String.format("%s/%s", database, ""))
60+
.consistency(consistency)
61+
.okHttpClient(okHttpClientBuilder);
62+
63+
//
64+
// Create client and write data
65+
//
66+
try (InfluxDBClient client = InfluxDBClientFactory.create(optionsBuilder.build())) {
67+
68+
// ...
69+
}
70+
}
71+
}
72+
```
73+
374
## 6.11.0 [2023-12-05]
475

576
### Features
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.rest;
23+
24+
import java.io.IOException;
25+
import java.time.Duration;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.ConcurrentMap;
28+
import java.util.logging.Logger;
29+
import javax.annotation.Nonnull;
30+
31+
import okhttp3.Call;
32+
import okhttp3.Connection;
33+
import okhttp3.EventListener;
34+
import okhttp3.Interceptor;
35+
import okhttp3.Response;
36+
import okhttp3.internal.connection.RealConnection;
37+
38+
/**
39+
* This interceptor closes connections that exceed a specified maximum lifetime age (TTL). It's beneficial for
40+
* scenarios where your application requires establishing new connections to the same host after a predetermined
41+
* interval. This interceptor is most effective in applications that use a single connection, meaning requests
42+
* are not made in parallel.
43+
* <p>
44+
* Caution is advised, as setting a very short interval can lead to performance issues because
45+
* establishing new connections is a resource-intensive operation.
46+
*/
47+
public class ConnectionClosingInterceptor extends EventListener implements Interceptor {
48+
49+
private static final Logger LOG = Logger.getLogger(ConnectionClosingInterceptor.class.getName());
50+
51+
private final ConcurrentMap<Connection, Long> connectionTimes = new ConcurrentHashMap<>();
52+
private final long connectionMaxAgeMillis;
53+
54+
/**
55+
* Create a new interceptor that will close connections older than the given max age.
56+
*
57+
* @param connectionMaxAge the max age of connections, the precision is milliseconds
58+
*/
59+
public ConnectionClosingInterceptor(@Nonnull final Duration connectionMaxAge) {
60+
this.connectionMaxAgeMillis = connectionMaxAge.toMillis();
61+
}
62+
63+
@Override
64+
@Nonnull
65+
public Response intercept(@Nonnull final Chain chain) throws IOException {
66+
Connection connection = chain.connection();
67+
68+
//
69+
// If the connection is old, mark it to not be reused.
70+
//
71+
if (connection != null && isConnectionOld(connection)) {
72+
if (connection instanceof RealConnection) {
73+
LOG.fine("Marking connection to not be reused: " + connection);
74+
((RealConnection) connection).noNewExchanges$okhttp();
75+
connectionTimes.remove(connection);
76+
} else {
77+
LOG.warning("Unable to mark connection to not be reused: " + connection);
78+
}
79+
}
80+
81+
return chain.proceed(chain.request());
82+
}
83+
84+
@Override
85+
public void connectionAcquired(@Nonnull final Call call, @Nonnull final Connection connection) {
86+
connectionTimes.putIfAbsent(connection, System.currentTimeMillis());
87+
}
88+
89+
/**
90+
* Check if the connection is older than the max age.
91+
*
92+
* @param connection the connection to check
93+
* @return true if the connection is older than the max age
94+
*/
95+
private boolean isConnectionOld(@Nonnull final Connection connection) {
96+
Long time = connectionTimes.get(connection);
97+
if (time == null) {
98+
return false;
99+
}
100+
long age = System.currentTimeMillis() - time;
101+
return age > connectionMaxAgeMillis;
102+
}
103+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.rest;
23+
24+
import java.io.IOException;
25+
import java.time.Duration;
26+
import java.util.Collections;
27+
import java.util.HashSet;
28+
import java.util.Set;
29+
import java.util.logging.Logger;
30+
import javax.annotation.Nonnull;
31+
32+
import okhttp3.Call;
33+
import okhttp3.Connection;
34+
import okhttp3.EventListener;
35+
import okhttp3.OkHttpClient;
36+
import okhttp3.Protocol;
37+
import okhttp3.Request;
38+
import okhttp3.Response;
39+
import org.assertj.core.api.Assertions;
40+
import org.jetbrains.annotations.NotNull;
41+
import org.junit.jupiter.api.AfterEach;
42+
import org.junit.jupiter.api.BeforeEach;
43+
import org.junit.jupiter.api.Test;
44+
45+
import com.influxdb.test.AbstractMockServerTest;
46+
47+
class ITConnectionClosingInterceptor extends AbstractMockServerTest {
48+
49+
private static final Logger LOG = Logger.getLogger(ITConnectionClosingInterceptor.class.getName());
50+
51+
private String url;
52+
private OkHttpClient client;
53+
private ConnectionsListener connectionsListener;
54+
55+
@BeforeEach
56+
void setUp() {
57+
connectionsListener = new ConnectionsListener();
58+
url = startMockServer();
59+
}
60+
61+
@AfterEach
62+
void tearDown() {
63+
client.connectionPool().evictAll();
64+
client.dispatcher().executorService().shutdown();
65+
}
66+
67+
@Test
68+
public void withoutTTLonConnection() throws Exception {
69+
70+
client = new OkHttpClient.Builder()
71+
.eventListener(connectionsListener)
72+
.build();
73+
74+
callApi(5, 3);
75+
76+
Assertions.assertThat(connectionsListener.connections).hasSize(1);
77+
Assertions.assertThat(client.connectionPool().connectionCount()).isEqualTo(1);
78+
}
79+
80+
@Test
81+
public void withTTLonConnection() throws Exception {
82+
83+
// Use connection TTL of 2 second
84+
ConnectionClosingInterceptor interceptor = new ConnectionClosingInterceptor(Duration.ofSeconds(2)) {
85+
86+
@Override
87+
public void connectionAcquired(@NotNull Call call, @NotNull Connection connection) {
88+
super.connectionAcquired(call, connection);
89+
90+
// count the number of connections, the okhttp client can have only one listener => we have to use this
91+
connectionsListener.connections.add(connection);
92+
}
93+
};
94+
95+
client = new OkHttpClient.Builder()
96+
.addNetworkInterceptor(interceptor)
97+
.eventListener(interceptor)
98+
.protocols(Collections.singletonList(Protocol.HTTP_1_1))
99+
.build();
100+
101+
callApi(5, 3);
102+
103+
Assertions.assertThat(connectionsListener.connections).hasSize(3);
104+
Assertions.assertThat(client.connectionPool().connectionCount()).isEqualTo(1);
105+
}
106+
107+
/**
108+
* Call API by specified times.
109+
*
110+
* @param times the number of times to call API
111+
* @param sleepSeconds the number of seconds to sleep between calls
112+
* @throws IOException if an error occurs
113+
*/
114+
private void callApi(final int times, final int sleepSeconds) throws Exception {
115+
for (int i = 0; i < times; i++) {
116+
mockServer.enqueue(createResponse(""));
117+
118+
Request request = new Request.Builder()
119+
.url(url)
120+
.build();
121+
122+
LOG.info(String.format("Calling API %d", i));
123+
try (Response response = client.newCall(request).execute()) {
124+
Assertions.assertThat(response.isSuccessful()).isTrue();
125+
}
126+
127+
LOG.info(String.format("Sleeping %d seconds; connection counts: %d", sleepSeconds, connectionsListener.connections.size()));
128+
Thread.sleep(sleepSeconds * 1000L);
129+
}
130+
}
131+
132+
/**
133+
* Event listener that store acquired connections.
134+
*/
135+
private static class ConnectionsListener extends EventListener {
136+
private final Set<Connection> connections = new HashSet<>();
137+
138+
@Override
139+
public void connectionAcquired(@Nonnull final Call call, @Nonnull final Connection connection) {
140+
connections.add(connection);
141+
}
142+
}
143+
}

0 commit comments

Comments
 (0)