Skip to content

Commit 98da5f9

Browse files
nMonchoGustavo De Micheli
andauthored
feat: Ingest Points continuously with Akka Stream (#461)
Co-authored-by: Gustavo De Micheli <[email protected]>
1 parent 319c2ef commit 98da5f9

File tree

2 files changed

+66
-22
lines changed

2 files changed

+66
-22
lines changed

client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
package com.influxdb.client.scala.internal
2323

2424
import akka.Done
25-
import akka.stream.scaladsl.{Flow, Keep, Sink}
25+
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
2626
import com.influxdb.client.InfluxDBClientOptions
2727
import com.influxdb.client.domain.WritePrecision
2828
import com.influxdb.client.internal.{AbstractWriteBlockingClient, AbstractWriteClient}
@@ -53,13 +53,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
5353
* @param org Specifies the destination organization for writes.
5454
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
5555
* if the `org` is not specified.
56-
* @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit.
56+
* @return the sink that accept the record specified in InfluxDB Line Protocol.
5757
*/
5858
override def writeRecord(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[String, Future[Done]] = {
5959
Flow[String]
6060
.map(record => Seq(new AbstractWriteClient.BatchWriteDataRecord(record)))
61-
.map(batch => writeHttp(precision, bucket, org, batch))
62-
.toMat(Sink.head)(Keep.right)
61+
.toMat(Sink.foreach(batch => writeHttp(precision, bucket, org, batch)))(Keep.right)
6362
}
6463

6564
/**
@@ -83,13 +82,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
8382
* Write Line Protocol records into specified bucket.
8483
*
8584
* @param parameters specify InfluxDB Write endpoint parameters
86-
* @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit.
85+
* @return the sink that accept the records specified in InfluxDB Line Protocol.
8786
*/
8887
override def writeRecords(parameters: WriteParameters): Sink[Seq[String], Future[Done]] = {
8988
Flow[Seq[String]]
9089
.map(records => records.map(record => new AbstractWriteClient.BatchWriteDataRecord(record)))
91-
.map(batch => writeHttp(parameters, batch))
92-
.toMat(Sink.head)(Keep.right)
90+
.toMat(Sink.foreach(batch => writeHttp(parameters, batch)))(Keep.right)
9391
}
9492

9593
/**
@@ -101,13 +99,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
10199
* @param org Specifies the destination organization for writes.
102100
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
103101
* if the `org` is not specified.
104-
* @return the sink that accept the Data points. The `point` is considered as one batch unit.
102+
* @return the sink that accept the Data points.
105103
*/
106104
override def writePoint(bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = {
107105
Flow[Point]
108106
.map(point => (point.getPrecision, Seq(new AbstractWriteClient.BatchWriteDataPoint(point, options))))
109-
.map(batch => writeHttp(Some(batch._1), bucket, org, batch._2))
110-
.toMat(Sink.head)(Keep.right)
107+
.toMat(Sink.foreach(batch => writeHttp(Some(batch._1), bucket, org, batch._2)))(Keep.right)
111108
}
112109

113110
/**
@@ -119,7 +116,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
119116
* @param org Specifies the destination organization for writes.
120117
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
121118
* if the `org` is not specified.
122-
* @return the sink that accept the Data points. The `points` are considered as one batch unit.
119+
* @return the sink that accept the Data points.
123120
*/
124121
override def writePoints(bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = {
125122
writePoints(new WriteParameters(bucket.orNull, org.orNull, null, null))
@@ -129,7 +126,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
129126
* Write Data points into specified bucket.
130127
*
131128
* @param parameters specify InfluxDB Write endpoint parameters
132-
* @return the sink that accept the Data points. The `points` are considered as one batch unit.
129+
* @return the sink that accept the Data points.
133130
*/
134131
override def writePoints(parameters: WriteParameters): Sink[Seq[Point], Future[Done]] = {
135132
Flow[Seq[Point]]
@@ -138,9 +135,8 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
138135
case (point, map) => map.updated(point.getPrecision, point +: map.getOrElse(point.getPrecision, Seq()))
139136
}.toList.reverse)
140137
.map(grouped => grouped.map(group => (group._1, group._2.map(point => new AbstractWriteClient.BatchWriteDataPoint(point, options)))))
141-
.map(batches => batches.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2)))
142-
.map(_ => Done.done())
143-
.toMat(Sink.head)(Keep.right)
138+
.flatMapConcat(batches => Source(batches))
139+
.toMat(Sink.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2)))(Keep.right)
144140
}
145141

146142
/**
@@ -155,16 +151,15 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
155151
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
156152
* if the `org` is not specified.
157153
* @tparam M the type of the measurement (POJO)
158-
* @return the sink that accept the measurement. The `measurement` is considered as one batch unit.
154+
* @return the sink that accept the measurement.
159155
*/
160156
override def writeMeasurement[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[M, Future[Done]] = {
161157
Flow[M]
162158
.map(measurement => {
163159
val parameters = toWriteParameters(precision, bucket, org)
164160
Seq(toMeasurementBatch(measurement, parameters.precisionSafe(options)))
165161
})
166-
.map(batch => writeHttp(precision, bucket, org, batch))
167-
.toMat(Sink.head)(Keep.right)
162+
.toMat(Sink.foreach(batch => writeHttp(precision, bucket, org, batch)))(Keep.right)
168163
}
169164

170165
/**
@@ -179,7 +174,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
179174
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
180175
* if the `org` is not specified.
181176
* @tparam M the type of the measurement (POJO)
182-
* @return the sink that accept the measurements. The `measurements` are considered as one batch unit.
177+
* @return the sink that accept the measurements.
183178
*/
184179
override def writeMeasurements[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[M], Future[Done]] = {
185180
writeMeasurements(toWriteParameters(precision, bucket, org))
@@ -190,13 +185,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
190185
*
191186
* @param parameters specify InfluxDB Write endpoint parameters
192187
* @tparam M the type of the measurement (POJO)
193-
* @return the sink that accept the measurements. The `measurements` are considered as one batch unit.
188+
* @return the sink that accept the measurements.
194189
*/
195190
override def writeMeasurements[M](parameters: WriteParameters): Sink[Seq[M], Future[Done]] = {
196191
Flow[Seq[M]]
197192
.map(records => records.map(record => toMeasurementBatch(record, parameters.precisionSafe(options))))
198-
.map(batch => writeHttp(parameters, batch))
199-
.toMat(Sink.head)(Keep.right)
193+
.toMat(Sink.foreach(batch => writeHttp(parameters, batch)))(Keep.right)
200194
}
201195

202196
private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = {

client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,25 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi
7272
request.getRequestUrl.queryParameter("precision") should be("ns")
7373
}
7474

75+
test("write record as stream") {
76+
77+
utils.serverMockResponse()
78+
79+
val source = Source(List("m2m,tag=a value=1i 1", "m2m,tag=a value=2i 2"))
80+
val sink = client.getWriteScalaApi.writeRecord()
81+
val materialized = source.toMat(sink)(Keep.right)
82+
83+
Await.ready(materialized.run(), Duration.Inf)
84+
85+
utils.getRequestCount should be(2)
86+
val request = utils.serverTakeRequest()
87+
// check request
88+
request.getBody.readUtf8() should be("m2m,tag=a value=1i 1")
89+
request.getRequestUrl.queryParameter("bucket") should be("my-bucket")
90+
request.getRequestUrl.queryParameter("org") should be("my-org")
91+
request.getRequestUrl.queryParameter("precision") should be("ns")
92+
}
93+
7594
test("write records") {
7695

7796
utils.serverMockResponse()
@@ -142,6 +161,37 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi
142161
request.getRequestUrl.queryParameter("precision") should be("ns")
143162
}
144163

164+
test("write point as stream") {
165+
166+
utils.serverMockResponse()
167+
168+
val point = Point
169+
.measurement("h2o")
170+
.addTag("location", "europe")
171+
.addField("level", 1)
172+
.time(1L, WritePrecision.NS)
173+
174+
val point2 = Point
175+
.measurement("h2o")
176+
.addTag("location", "europe")
177+
.addField("level", 2)
178+
.time(2L, WritePrecision.NS)
179+
180+
val source = Source(List(point, point2))
181+
val sink = client.getWriteScalaApi.writePoint()
182+
val materialized = source.toMat(sink)(Keep.right)
183+
184+
Await.ready(materialized.run(), Duration.Inf)
185+
186+
utils.getRequestCount should be(2)
187+
val request = utils.serverTakeRequest()
188+
// check request
189+
request.getBody.readUtf8() should be("h2o,location=europe level=1i 1")
190+
request.getRequestUrl.queryParameter("bucket") should be("my-bucket")
191+
request.getRequestUrl.queryParameter("org") should be("my-org")
192+
request.getRequestUrl.queryParameter("precision") should be("ns")
193+
}
194+
145195
test("write points") {
146196

147197
utils.serverMockResponse()

0 commit comments

Comments
 (0)