From 9fce3e7a0d3eac7dfb82d4ae40b63d398a598040 Mon Sep 17 00:00:00 2001 From: Jasper Huzen Date: Mon, 28 Sep 2015 17:01:54 +0200 Subject: [PATCH 1/5] Move ingest creation to context object --- .../elasticsearch/jdbc/strategy/Sink.java | 7 --- .../strategy/standard/StandardContext.java | 61 +++++++++++++++++++ .../jdbc/strategy/standard/StandardSink.java | 45 +++----------- .../jdbc/strategy/mock/MockSink.java | 4 -- 4 files changed, 70 insertions(+), 47 deletions(-) diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java index d651557b..5c16982b 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java @@ -143,13 +143,6 @@ public interface Sink { */ void delete(IndexableObject object) throws IOException; - /** - * Flush data to the sink - * - * @throws IOException when flush fails - */ - void flushIngest() throws IOException; - /** * Shutdown and release all resources, e.g. bulk processor and client * @throws IOException when shutdown fails diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java index abc23cee..42d01c98 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java @@ -78,6 +78,8 @@ public class StandardContext implements Context { private Throwable throwable; + private Ingest ingest; + private final static List futures = new LinkedList<>(); private final static SourceMetric sourceMetric = new SourceMetric().start(); @@ -177,6 +179,24 @@ public Throwable getThrowable() { return throwable; } + public Ingest getIngest() { + return ingest; + } + + public Ingest getOrCreateIngest(Metric metric) throws IOException { + if (ingest == null) { + if (getIngestFactory() != null) { + ingest = getIngestFactory().create(); + if (ingest != null) { + ingest.setMetric(metric); + } + } else { + logger.warn("no ingest factory found"); + } + } + return ingest; + } + public DateTime getDateOfThrowable() { return dateOfThrowable; } @@ -230,7 +250,18 @@ public void afterFetch() throws Exception { logger.error(e.getMessage(), e); } try { + + logger.debug("afterFetch: flush ingest"); + flushIngest(); + getSink().afterFetch(); + + logger.debug("afterFetch: before ingest shutdown"); + if(ingest != null) { + ingest.shutdown(); + ingest = null; + } + } catch (Throwable e) { setThrowable(e); logger.error(e.getMessage(), e); @@ -250,6 +281,13 @@ public void shutdown() { logger.error(e.getMessage(), e); } } + + try { + flushIngest(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + if (sink != null) { try { sink.shutdown(); @@ -257,10 +295,31 @@ public void shutdown() { logger.error(e.getMessage(), e); } } + + if (ingest != null) { + logger.info("shutdown in progress"); + ingest.shutdown(); + ingest = null; + } + logger.info("shutdown completed"); writeState(); } + public void flushIngest() throws IOException { + if (ingest == null) { + return; + } + ingest.flushIngest(); + // wait for all outstanding bulk requests before continuing. Estimation is 60 seconds + try { + ingest.waitForResponses(TimeValue.timeValueSeconds(60)); + } catch (InterruptedException e) { + logger.warn("interrupted while waiting for responses"); + Thread.currentThread().interrupt(); + } + } + @Override public void resetCounter() { if (sourceMetric != null) { @@ -401,6 +460,8 @@ protected void prepareContext(S source, Sink sink) throws IOException { sink.setContext(this); } + + private IngestFactory createIngestFactory(final Settings settings) { return new IngestFactory() { @Override diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java index cb6503bd..6a34384a 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java @@ -46,8 +46,6 @@ public class StandardSink implements Sink { protected C context; - protected Ingest ingest; - protected Settings indexSettings; protected Map indexMappings; @@ -89,16 +87,8 @@ public Metric getMetric() { @Override public synchronized void beforeFetch() throws IOException { - if (ingest == null) { - if (context.getIngestFactory() != null) { - ingest = context.getIngestFactory().create(); - if(ingest != null) { - ingest.setMetric(metric); - } - } else { - logger.warn("no ingest factory found"); - } - } + Ingest ingest = context.getOrCreateIngest(metric); + if (ingest == null) { logger.warn("no ingest found"); return; @@ -121,31 +111,26 @@ public synchronized void beforeFetch() throws IOException { @Override public synchronized void afterFetch() throws IOException { + Ingest ingest = context.getIngest(); if (ingest == null) { return; } - logger.debug("afterFetch: flush ingest"); - flushIngest(); logger.debug("afterFetch: stop bulk"); ingest.stopBulk(index); logger.debug("afterFetch: refresh index"); ingest.refreshIndex(index); - logger.debug("afterFetch: before ingest shutdown"); - ingest.shutdown(); - ingest = null; + logger.debug("afterFetch: after ingest shutdown"); } @Override public synchronized void shutdown() { - if (ingest == null) { + Ingest ingest = context.getIngest(); + if(ingest == null) { return; } try { - logger.info("shutdown in progress"); - flushIngest(); ingest.stopBulk(index); - ingest.shutdown(); } catch (IOException e) { logger.error(e.getMessage(), e); } @@ -198,6 +183,7 @@ public String getId() { @Override public void index(IndexableObject object, boolean create) throws IOException { + Ingest ingest = context.getIngest(); if (ingest == null) { return; } @@ -238,6 +224,8 @@ public void index(IndexableObject object, boolean create) throws IOException { @Override public void delete(IndexableObject object) { + Ingest ingest = context.getIngest(); + if (ingest == null) { return; } @@ -270,19 +258,4 @@ public void delete(IndexableObject object) { ingest.bulkDelete(request); } - @Override - public void flushIngest() throws IOException { - if (ingest == null) { - return; - } - ingest.flushIngest(); - // wait for all outstanding bulk requests before continuing. Estimation is 60 seconds - try { - ingest.waitForResponses(TimeValue.timeValueSeconds(60)); - } catch (InterruptedException e) { - logger.warn("interrupted while waiting for responses"); - Thread.currentThread().interrupt(); - } - } - } diff --git a/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java b/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java index a025c6e7..617726d8 100644 --- a/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java +++ b/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java @@ -123,10 +123,6 @@ public String getId() { return null; } - @Override - public void flushIngest() throws IOException { - } - @Override public void shutdown() throws IOException { } From 9adc5dbccdcb758f2ba227cfdb22d20d02372e36 Mon Sep 17 00:00:00 2001 From: msimons Date: Tue, 29 Sep 2015 13:06:37 +0200 Subject: [PATCH 2/5] Support for partial document updates --- pom.xml | 4 ++-- .../util/SinkKeyValueStreamListener.java | 2 ++ .../jdbc/strategy/standard/StandardSink.java | 23 +++++++++++++++---- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index 4465ad11..6028e04e 100644 --- a/pom.xml +++ b/pom.xml @@ -72,8 +72,8 @@ github UTF-8 1.7 - 1.7.0 - 1.7.0.0 + 1.7.2 + 1.7.2.1 diff --git a/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java b/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java index 74fa642d..55b3cf91 100644 --- a/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java +++ b/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java @@ -53,6 +53,8 @@ public SinkKeyValueStreamListener end(IndexableObject object) throws IOExc output.index(object, false); } else if ("index".equals(object.optype())) { output.index(object, false); + } else if ("update".equals(object.optype())) { + output.index(object, false); } else if ("create".equals(object.optype())) { output.index(object, true); } else if ("delete".equals(object.optype())) { diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java index 6a34384a..0f666fa0 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java @@ -17,18 +17,21 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.format.DateTimeFormat; +import org.elasticsearch.common.lang3.StringUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.VersionType; -import org.xbib.elasticsearch.jdbc.strategy.Sink; import org.xbib.elasticsearch.common.util.ControlKeys; import org.xbib.elasticsearch.common.util.IndexableObject; +import org.xbib.elasticsearch.jdbc.strategy.Sink; import org.xbib.elasticsearch.support.client.Ingest; import org.xbib.elasticsearch.support.client.Metric; @@ -196,6 +199,7 @@ public void index(IndexableObject object, boolean create) throws IOException { if (Strings.hasLength(object.id())) { setId(object.id()); } + IndexRequest request = Requests.indexRequest(this.index) .type(this.type) .id(getId()) @@ -216,10 +220,19 @@ public void index(IndexableObject object, boolean create) throws IOException { if (object.meta(ControlKeys._ttl.name()) != null) { request.ttl(Long.parseLong(object.meta(ControlKeys._ttl.name()))); } - if (logger.isTraceEnabled()) { - logger.trace("adding bulk index action {}", request.source().toUtf8()); + + ActionRequest req = request; + if(StringUtils.equals("update",object.optype())) { + req = new UpdateRequest(this.index,this.type,getId()) + .doc(object.build()) + .upsert(request); + + if (logger.isTraceEnabled()) { + logger.trace("adding bulk action {}", req.toString()); + } + + ingest.action(req); } - ingest.bulkIndex(request); } @Override @@ -255,7 +268,7 @@ public void delete(IndexableObject object) { if (logger.isTraceEnabled()) { logger.trace("adding bulk delete action {}/{}/{}", request.index(), request.type(), request.id()); } - ingest.bulkDelete(request); + ingest.action(request); } } From f0bdd16936ccd9a56b5b19decfa3043d13de4c05 Mon Sep 17 00:00:00 2001 From: msimons Date: Tue, 29 Sep 2015 13:59:35 +0200 Subject: [PATCH 3/5] Revert "Support for partial document updates" This reverts commit a29e027ee0cab81c91e1950308c1cad0ac8f0e01. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6028e04e..c4f7205b 100644 --- a/pom.xml +++ b/pom.xml @@ -73,7 +73,7 @@ UTF-8 1.7 1.7.2 - 1.7.2.1 + 1.7.3.0-SNAPSHOT From 8ede73cb3e9e1b4beadd039e29c8d78a728fe441 Mon Sep 17 00:00:00 2001 From: Jasper Huzen Date: Tue, 29 Sep 2015 14:10:13 +0200 Subject: [PATCH 4/5] Fix incorrect versions. Line up with origin --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index c4f7205b..4465ad11 100644 --- a/pom.xml +++ b/pom.xml @@ -72,8 +72,8 @@ github UTF-8 1.7 - 1.7.2 - 1.7.3.0-SNAPSHOT + 1.7.0 + 1.7.0.0 From e36b6490628f790a4bad3773d0584b79161f8097 Mon Sep 17 00:00:00 2001 From: Jasper Huzen Date: Tue, 29 Sep 2015 14:16:58 +0200 Subject: [PATCH 5/5] Revert "Support for partial document updates" This reverts commit 9adc5dbccdcb758f2ba227cfdb22d20d02372e36. --- .../util/SinkKeyValueStreamListener.java | 2 -- .../jdbc/strategy/standard/StandardSink.java | 23 ++++--------------- 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java b/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java index 55b3cf91..74fa642d 100644 --- a/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java +++ b/src/main/java/org/xbib/elasticsearch/common/util/SinkKeyValueStreamListener.java @@ -53,8 +53,6 @@ public SinkKeyValueStreamListener end(IndexableObject object) throws IOExc output.index(object, false); } else if ("index".equals(object.optype())) { output.index(object, false); - } else if ("update".equals(object.optype())) { - output.index(object, false); } else if ("create".equals(object.optype())) { output.index(object, true); } else if ("delete".equals(object.optype())) { diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java index 0f666fa0..6a34384a 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java @@ -17,21 +17,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.format.DateTimeFormat; -import org.elasticsearch.common.lang3.StringUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.VersionType; +import org.xbib.elasticsearch.jdbc.strategy.Sink; import org.xbib.elasticsearch.common.util.ControlKeys; import org.xbib.elasticsearch.common.util.IndexableObject; -import org.xbib.elasticsearch.jdbc.strategy.Sink; import org.xbib.elasticsearch.support.client.Ingest; import org.xbib.elasticsearch.support.client.Metric; @@ -199,7 +196,6 @@ public void index(IndexableObject object, boolean create) throws IOException { if (Strings.hasLength(object.id())) { setId(object.id()); } - IndexRequest request = Requests.indexRequest(this.index) .type(this.type) .id(getId()) @@ -220,19 +216,10 @@ public void index(IndexableObject object, boolean create) throws IOException { if (object.meta(ControlKeys._ttl.name()) != null) { request.ttl(Long.parseLong(object.meta(ControlKeys._ttl.name()))); } - - ActionRequest req = request; - if(StringUtils.equals("update",object.optype())) { - req = new UpdateRequest(this.index,this.type,getId()) - .doc(object.build()) - .upsert(request); - - if (logger.isTraceEnabled()) { - logger.trace("adding bulk action {}", req.toString()); - } - - ingest.action(req); + if (logger.isTraceEnabled()) { + logger.trace("adding bulk index action {}", request.source().toUtf8()); } + ingest.bulkIndex(request); } @Override @@ -268,7 +255,7 @@ public void delete(IndexableObject object) { if (logger.isTraceEnabled()) { logger.trace("adding bulk delete action {}/{}/{}", request.index(), request.type(), request.id()); } - ingest.action(request); + ingest.bulkDelete(request); } }