Skip to content

Commit 6ef5e63

Browse files
authored
fix: use Source.unfoldResource of source for Querying [scala] (#199)
1 parent 58e3d53 commit 6ef5e63

File tree

10 files changed

+406
-202
lines changed

10 files changed

+406
-202
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ tree.txt
1414

1515
**/.openapi-generator*
1616
**/swagger2.yml
17-
**/response.json
17+
**/response.json
18+
**/test.txt

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
### Bug Fixes
77
1. [#196](https://github.com/influxdata/influxdb-client-java/issues/196): Removed badly licenced JSON-Java library
8+
1. [#199](https://github.com/influxdata/influxdb-client-java/pull/199): Correct implementation of Backpressure for Scala Querying
89

910
### CI
1011
1. [#203](https://github.com/influxdata/influxdb-client-java/pull/203): Updated stable image to `influxdb:latest` and nightly to `quay.io/influxdb/influxdb:nightly`

client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java

Lines changed: 181 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@
2121
*/
2222
package com.influxdb.internal;
2323

24+
import java.io.Closeable;
2425
import java.io.IOException;
26+
import java.io.InputStreamReader;
27+
import java.io.Reader;
28+
import java.nio.charset.StandardCharsets;
2529
import java.util.HashMap;
30+
import java.util.Iterator;
2631
import java.util.Map;
2732
import java.util.function.BiConsumer;
2833
import java.util.function.Consumer;
@@ -34,6 +39,7 @@
3439
import com.influxdb.Arguments;
3540
import com.influxdb.Cancellable;
3641
import com.influxdb.exceptions.InfluxException;
42+
import com.influxdb.query.FluxRecord;
3743
import com.influxdb.query.internal.FluxCsvParser;
3844
import com.influxdb.query.internal.FluxResultMapper;
3945

@@ -44,6 +50,9 @@
4450
import okhttp3.RequestBody;
4551
import okhttp3.ResponseBody;
4652
import okio.BufferedSource;
53+
import org.apache.commons.csv.CSVFormat;
54+
import org.apache.commons.csv.CSVParser;
55+
import org.apache.commons.csv.CSVRecord;
4756
import retrofit2.Call;
4857
import retrofit2.Callback;
4958
import retrofit2.Response;
@@ -72,7 +81,7 @@ public abstract class AbstractQueryApi extends AbstractRestClient {
7281
DEFAULT_DIALECT = new GsonBuilder().create().toJson(dialect);
7382
}
7483

75-
protected static final Consumer<Throwable> ERROR_CONSUMER = throwable -> {
84+
protected static final Consumer<Throwable> ERROR_CONSUMER = throwable -> {
7685
if (throwable instanceof InfluxException) {
7786
throw (InfluxException) throwable;
7887
} else {
@@ -113,6 +122,10 @@ protected void query(@Nonnull final Call<ResponseBody> queryCall,
113122
query(queryCall, consumer, onError, onComplete, asynchronously);
114123
}
115124

125+
protected FluxRecordIterator queryIterator(@Nonnull final Call<ResponseBody> queryCall) {
126+
return new FluxRecordIterator(queryCall, ERROR_CONSUMER);
127+
}
128+
116129
protected void queryRaw(@Nonnull final Call<ResponseBody> queryCall,
117130
@Nonnull final BiConsumer<Cancellable, String> onResponse,
118131
@Nonnull final Consumer<? super Throwable> onError,
@@ -131,11 +144,15 @@ protected void queryRaw(@Nonnull final Call<ResponseBody> queryCall,
131144
query(queryCall, consumer, onError, onComplete, asynchronously);
132145
}
133146

134-
protected void query(@Nonnull final Call<ResponseBody> query,
135-
@Nonnull final BiConsumer<Cancellable, BufferedSource> consumer,
136-
@Nonnull final Consumer<? super Throwable> onError,
137-
@Nonnull final Runnable onComplete,
138-
@Nonnull final Boolean asynchronously) {
147+
protected RawIterator queryRawIterator(@Nonnull final Call<ResponseBody> queryCall) {
148+
return new RawIterator(queryCall, ERROR_CONSUMER);
149+
}
150+
151+
private void query(@Nonnull final Call<ResponseBody> query,
152+
@Nonnull final BiConsumer<Cancellable, BufferedSource> consumer,
153+
@Nonnull final Consumer<? super Throwable> onError,
154+
@Nonnull final Runnable onComplete,
155+
@Nonnull final Boolean asynchronously) {
139156

140157
Arguments.checkNotNull(query, "query");
141158
Arguments.checkNotNull(consumer, "consumer");
@@ -145,6 +162,46 @@ protected void query(@Nonnull final Call<ResponseBody> query,
145162

146163
DefaultCancellable cancellable = new DefaultCancellable();
147164

165+
Consumer<ResponseBody> bodyConsumer = body -> {
166+
try {
167+
BufferedSource source = body.source();
168+
169+
//
170+
// Source has data => parse
171+
//
172+
while (source.isOpen() && !source.exhausted() && !cancellable.wasCancelled) {
173+
174+
consumer.accept(cancellable, source);
175+
}
176+
177+
if (!cancellable.wasCancelled) {
178+
onComplete.run();
179+
}
180+
181+
} catch (Exception e) {
182+
catchOrPropagateException(e, onError);
183+
184+
} finally {
185+
186+
body.close();
187+
}
188+
};
189+
190+
query(query, bodyConsumer, onError, onComplete, asynchronously);
191+
}
192+
193+
private void query(@Nonnull final Call<ResponseBody> query,
194+
@Nonnull final Consumer<ResponseBody> consumer,
195+
@Nonnull final Consumer<? super Throwable> onError,
196+
@Nonnull final Runnable onComplete,
197+
@Nonnull final Boolean asynchronously) {
198+
199+
Arguments.checkNotNull(query, "query");
200+
Arguments.checkNotNull(consumer, "consumer");
201+
Arguments.checkNotNull(onError, "onError");
202+
Arguments.checkNotNull(onComplete, "onComplete");
203+
Arguments.checkNotNull(asynchronously, "asynchronously");
204+
148205
Callback<ResponseBody> callback = new Callback<ResponseBody>() {
149206
@Override
150207
public void onResponse(@Nonnull final Call<ResponseBody> call,
@@ -160,28 +217,7 @@ public void onResponse(@Nonnull final Call<ResponseBody> call,
160217
return;
161218
}
162219

163-
try {
164-
BufferedSource source = body.source();
165-
166-
//
167-
// Source has data => parse
168-
//
169-
while (source.isOpen() && !source.exhausted() && !cancellable.wasCancelled) {
170-
171-
consumer.accept(cancellable, source);
172-
}
173-
174-
if (!cancellable.wasCancelled) {
175-
onComplete.run();
176-
}
177-
178-
} catch (Exception e) {
179-
catchOrPropagateException(e, onError);
180-
181-
} finally {
182-
183-
body.close();
184-
}
220+
consumer.accept(body);
185221
}
186222

187223
@Override
@@ -233,4 +269,121 @@ public boolean isCancelled() {
233269
}
234270
}
235271

272+
protected final class RawIterator implements Iterator<String>, Closeable, Consumer<ResponseBody> {
273+
274+
private String line = null;
275+
private boolean closed = false;
276+
private ResponseBody body;
277+
private BufferedSource source;
278+
private final Consumer<? super Throwable> onError;
279+
280+
private RawIterator(@Nonnull final Call<ResponseBody> call,
281+
@Nonnull final Consumer<? super Throwable> onError) {
282+
this.onError = onError;
283+
query(call, this, onError, EMPTY_ACTION, false);
284+
}
285+
286+
@Override
287+
public boolean hasNext() {
288+
return !closed && readNext();
289+
}
290+
291+
@Override
292+
public String next() {
293+
return line;
294+
}
295+
296+
@Override
297+
public void accept(final ResponseBody body) {
298+
this.body = body;
299+
this.source = body.source();
300+
}
301+
302+
@Override
303+
public void close() throws IOException {
304+
closed = true;
305+
if (body != null) {
306+
body.close();
307+
}
308+
}
309+
310+
private boolean readNext() {
311+
line = null;
312+
try {
313+
if (!closed && source.isOpen() && !source.exhausted()) {
314+
line = source.readUtf8Line();
315+
}
316+
} catch (IOException e) {
317+
catchOrPropagateException(e, onError);
318+
}
319+
320+
return line != null;
321+
}
322+
}
323+
324+
protected final class FluxRecordIterator implements Iterator<FluxRecord>, Closeable, Consumer<ResponseBody> {
325+
326+
private FluxRecord record = null;
327+
private boolean closed = false;
328+
private ResponseBody body;
329+
private CSVParser parser;
330+
private Iterator<CSVRecord> iterator;
331+
332+
private final FluxCsvParser.FluxCsvState state = new FluxCsvParser.FluxCsvState();
333+
private final Consumer<? super Throwable> onError;
334+
335+
public FluxRecordIterator(@Nonnull final Call<ResponseBody> call,
336+
@Nonnull final Consumer<? super Throwable> onError) {
337+
this.onError = onError;
338+
query(call, this, onError, EMPTY_ACTION, false);
339+
}
340+
341+
@Override
342+
public boolean hasNext() {
343+
return !closed && readNext();
344+
}
345+
346+
@Override
347+
public FluxRecord next() {
348+
return record;
349+
}
350+
351+
@Override
352+
public void accept(final ResponseBody body) {
353+
this.body = body;
354+
355+
Reader reader = new InputStreamReader(body.source().inputStream(), StandardCharsets.UTF_8);
356+
try {
357+
parser = new CSVParser(reader, CSVFormat.DEFAULT);
358+
} catch (IOException e) {
359+
catchOrPropagateException(e, onError);
360+
}
361+
iterator = parser.iterator();
362+
}
363+
364+
@Override
365+
public void close() throws IOException {
366+
closed = true;
367+
if (parser != null) {
368+
parser.close();
369+
}
370+
if (body != null) {
371+
body.close();
372+
}
373+
}
374+
375+
private boolean readNext() {
376+
377+
record = null;
378+
while (record == null && iterator.hasNext()) {
379+
state.csvRecord = iterator.next();
380+
FluxCsvParser.FluxRecordOrTable fluxRecordOrTable = fluxCsvParser.parseNextResponse(state);
381+
if (fluxRecordOrTable.record != null) {
382+
record = fluxRecordOrTable.record;
383+
}
384+
}
385+
386+
return record != null;
387+
}
388+
}
236389
}

0 commit comments

Comments
 (0)