diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD index d30028651af..df8c94d5111 100644 --- a/heron/api/src/java/BUILD +++ b/heron/api/src/java/BUILD @@ -1,5 +1,4 @@ licenses(["notice"]) - package(default_visibility = ["//visibility:public"]) load("//tools/rules:build_defs.bzl", "DOCLINT_HTML_AND_SYNTAX") @@ -16,6 +15,7 @@ api_deps_files = \ heron_java_api_proto_files() + [ ":classification", "//heron/common/src/java:basics-java", + "//third_party/java:guava" ] # Low Level Api diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java index 24eea584f6a..c060ad23306 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java @@ -42,7 +42,7 @@ public FilterOperator(SerializablePredicate filterFn) { public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); if (filterFn.test(obj)) { - collector.emit(new Values(obj)); + collector.emit(tuple, new Values(obj)); } collector.ack(tuple); } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java index dce70e7426b..5bc0b6c9c04 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java @@ -44,7 +44,7 @@ public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); Iterable result = flatMapFn.apply(obj); for (T o : result) { - collector.emit(new Values(o)); + collector.emit(tuple, new Values(o)); } collector.ack(tuple); } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java index b3a2cc2fdf6..372f6da9019 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java @@ -77,7 +77,8 @@ public void execute(TupleWindow inputWindow) { for (K key : reduceMap.keySet()) { Window window = new Window(startWindow, endWindow, windowCountMap.get(key)); KeyedWindow keyedWindow = new KeyedWindow<>(key, window); - collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key)))); + collector.emit(inputWindow.get(), new Values(new KeyValue<>(keyedWindow, + reduceMap.get(key)))); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyOperator.java index 462b0a2e9fa..8736a78ee7d 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyOperator.java @@ -62,7 +62,7 @@ public void execute(Tuple tuple) { T newValue = reduceFn.apply(oldValue, obj); reduceMap.put(key, newValue); - collector.emit(new Values(new KeyValue(key, newValue))); + collector.emit(tuple, new Values(new KeyValue(key, newValue))); collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java index 157b487088d..a0dfe84fa02 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java @@ -171,7 +171,7 @@ private void innerJoinAndEmit(K key, TupleWindow tupleWindow, Pair, Lis KeyedWindow keyedWindow = getKeyedWindow(key, tupleWindow); for (V1 val1 : val.getFirst()) { for (V2 val2 : val.getSecond()) { - collector.emit(new Values(new KeyValue<>(keyedWindow, + collector.emit(tupleWindow.get(), new Values(new KeyValue<>(keyedWindow, joinFn.apply(val1, val2)))); } } @@ -180,7 +180,7 @@ private void innerJoinAndEmit(K key, TupleWindow tupleWindow, Pair, Lis private void outerLeftJoinAndEmit(K key, TupleWindow tupleWindow, Pair, List> val) { KeyedWindow keyedWindow = getKeyedWindow(key, tupleWindow); for (V1 val1 : val.getFirst()) { - collector.emit(new Values(new KeyValue<>(keyedWindow, + collector.emit(tupleWindow.get(), new Values(new KeyValue<>(keyedWindow, joinFn.apply(val1, null)))); } } @@ -188,7 +188,7 @@ private void outerLeftJoinAndEmit(K key, TupleWindow tupleWindow, Pair, private void outerRightJoinAndEmit(K key, TupleWindow tupleWindow, Pair, List> val) { KeyedWindow keyedWindow = getKeyedWindow(key, tupleWindow); for (V2 val2 : val.getSecond()) { - collector.emit(new Values(new KeyValue<>(keyedWindow, + collector.emit(tupleWindow.get(), new Values(new KeyValue<>(keyedWindow, joinFn.apply(null, val2)))); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java index 911c0812c67..ac072f4dfd6 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java @@ -47,7 +47,7 @@ public void execute(Tuple tuple) { K key = keyExtractor.apply(obj); V value = valueExtractor.apply(obj); - collector.emit(new Values(new KeyValue<>(key, value))); + collector.emit(tuple, new Values(new KeyValue<>(key, value))); collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java index 73bfbf585b6..6f3f6a9289c 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java @@ -41,7 +41,7 @@ public MapOperator(SerializableFunction mapFn) { public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); T result = mapFn.apply(obj); - collector.emit(new Values(result)); + collector.emit(tuple, new Values(result)); collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java index 6649848868f..4617190ad4f 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java @@ -77,7 +77,8 @@ public void execute(TupleWindow inputWindow) { for (K key : reduceMap.keySet()) { Window window = new Window(startWindow, endWindow, windowCountMap.get(key)); KeyedWindow keyedWindow = new KeyedWindow<>(key, window); - collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key)))); + collector.emit(inputWindow.get(), + new Values(new KeyValue<>(keyedWindow, reduceMap.get(key)))); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyOperator.java index 5830a7c75f8..4cb3d3f9066 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyOperator.java @@ -67,7 +67,7 @@ public void execute(Tuple tuple) { } reduceMap.put(key, newValue); - collector.emit(new Values(new KeyValue(key, newValue))); + collector.emit(tuple, new Values(new KeyValue(key, newValue))); collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java index c07893ec668..78f1d58a966 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java @@ -46,7 +46,7 @@ public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); for (Map.Entry> entry: splitFns.entrySet()) { if (entry.getValue().test(obj)) { - collector.emit(entry.getKey(), new Values(obj)); + collector.emit(entry.getKey(), tuple, new Values(obj)); } } collector.ack(tuple); diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java index 2379a3b0e5e..e017cbe6c29 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java @@ -79,7 +79,7 @@ public void prepare(Map map, @Override public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); - serializableTransformer.transform(obj, x -> collector.emit(new Values(x))); + serializableTransformer.transform(obj, x -> collector.emit(tuple, new Values(x))); collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java index 4d56bf14c77..fc68e0561bd 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java @@ -36,7 +36,7 @@ public UnionOperator() { @Override public void execute(Tuple tuple) { I obj = (I) tuple.getValue(0); - collector.emit(new Values(obj)); + collector.emit(tuple, new Values(obj)); collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sources/ComplexSource.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sources/ComplexSource.java index 237eb814363..d05746a7913 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/sources/ComplexSource.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sources/ComplexSource.java @@ -20,7 +20,12 @@ import java.io.Serializable; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.cache.Cache; import org.apache.heron.api.spout.SpoutOutputCollector; import org.apache.heron.api.state.State; @@ -38,9 +43,17 @@ public class ComplexSource extends StreamletSource { private static final long serialVersionUID = -5086763670301450007L; + private static final Logger LOG = Logger.getLogger(ComplexSource.class.getName()); private Source generator; private State state; + // protected used to allow unit test access + protected Cache msgIdCache; + protected String msgId; + // taskIds are collected to facilitate units tests + protected List taskIds; + private Level logLevel = Level.INFO; + public ComplexSource(Source generator) { this.generator = generator; } @@ -57,13 +70,40 @@ public void open(Map map, TopologyContext topologyContext, super.open(map, topologyContext, outputCollector); Context context = new ContextImpl(topologyContext, map, state); generator.setup(context); + ackingEnabled = isAckingEnabled(map, topologyContext); + msgIdCache = createCache(); } @Override public void nextTuple() { Collection tuples = generator.get(); + msgId = null; if (tuples != null) { - tuples.forEach(tuple -> collector.emit(new Values(tuple))); + for (R tuple : tuples) { + if (ackingEnabled) { + msgId = getUniqueMessageId(); + msgIdCache.put(msgId, tuple); + taskIds = collector.emit(new Values(tuple), msgId); + } else { + taskIds = collector.emit(new Values(tuple)); + } + LOG.log(logLevel, "emitting: [" + msgId + "]"); + } + } + } + + @Override public void ack(Object mid) { + if (ackingEnabled) { + msgIdCache.invalidate(mid); + LOG.log(logLevel, "acked: [" + mid + "]"); + } + } + + @Override public void fail(Object mid) { + if (ackingEnabled) { + Values values = new Values(msgIdCache.getIfPresent(mid)); + taskIds = collector.emit(values, mid); + LOG.log(logLevel, "re-emit: [" + mid + "]"); } } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sources/StreamletSource.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sources/StreamletSource.java index 19c8ceb7c14..6f91f2abf95 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/sources/StreamletSource.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sources/StreamletSource.java @@ -20,6 +20,10 @@ import java.io.Serializable; import java.util.Map; +import java.util.UUID; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.heron.api.spout.BaseRichSpout; import org.apache.heron.api.spout.SpoutOutputCollector; @@ -28,6 +32,10 @@ import org.apache.heron.api.topology.OutputFieldsDeclarer; import org.apache.heron.api.topology.TopologyContext; import org.apache.heron.api.tuple.Fields; +import org.apache.heron.streamlet.impl.ContextImpl; + +import static org.apache.heron.api.Config.TOPOLOGY_RELIABILITY_MODE; +import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE; /** * StreamletSource is the base class for all streamlet sources. @@ -39,6 +47,7 @@ public abstract class StreamletSource extends BaseRichSpout private static final long serialVersionUID = 8583965332619565343L; private static final String OUTPUT_FIELD_NAME = "output"; + protected boolean ackingEnabled = false; protected SpoutOutputCollector collector; @Override @@ -54,6 +63,12 @@ public void open(Map map, TopologyContext topologyContext, collector = outputCollector; } + // a convenience method for creating cache + // TODO set appropriate properties in builder + Cache createCache() { + return CacheBuilder.newBuilder().build(); + } + /** * The sources implementing streamlet functionality have some properties. * 1. They all output only one stream @@ -64,4 +79,23 @@ public void open(Map map, TopologyContext topologyContext, public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(OUTPUT_FIELD_NAME)); } + + /** + * Return a unique message ID for use with ATLEAST_ONCE topologies. + * + * @return a unique message id string. + */ + public String getUniqueMessageId() { + return UUID.randomUUID().toString(); + } + + /** + * Determine if streamlet acknowledgments (i.e., ATLEAST_ONCE) are set. + * + * @return true if acking is enabled; false otherwise. + */ + public boolean isAckingEnabled(Map map, TopologyContext topologyContext) { + ContextImpl context = new ContextImpl(topologyContext, map, null); + return context.getConfig().get(TOPOLOGY_RELIABILITY_MODE).equals(ATLEAST_ONCE.toString()); + } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sources/SupplierSource.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sources/SupplierSource.java index d60a34f3ede..58aa6ae3bfb 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/sources/SupplierSource.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sources/SupplierSource.java @@ -18,6 +18,15 @@ */ package org.apache.heron.streamlet.impl.sources; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.cache.Cache; + +import org.apache.heron.api.spout.SpoutOutputCollector; +import org.apache.heron.api.topology.TopologyContext; import org.apache.heron.api.tuple.Values; import org.apache.heron.streamlet.SerializableSupplier; @@ -29,14 +38,56 @@ public class SupplierSource extends StreamletSource { private static final long serialVersionUID = 6476611751545430216L; + private static final Logger LOG = Logger.getLogger(SupplierSource.class.getName()); + private SerializableSupplier supplier; + protected SpoutOutputCollector collector; + + // protected used to allow unit test access + protected Cache msgIdCache; + protected String msgId; + private Level logLevel = Level.INFO; public SupplierSource(SerializableSupplier supplier) { this.supplier = supplier; } + // The emit methods return a list of taskIds. They are collected to facilitate unit testing. + protected List taskIds; + + @SuppressWarnings("rawtypes") @Override - public void nextTuple() { - collector.emit(new Values(supplier.get())); + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector outputCollector) { + collector = outputCollector; + ackingEnabled = isAckingEnabled(map, topologyContext); + msgIdCache = createCache(); + } + + @Override public void nextTuple() { + msgId = null; + R data = supplier.get(); + if (ackingEnabled) { + msgId = getUniqueMessageId(); + msgIdCache.put(msgId, data); + taskIds = collector.emit(new Values(data), msgId); + LOG.log(logLevel, "emitted: [" + data + ": " + msgId + "]"); + } else { + taskIds = collector.emit(new Values(data)); + } + } + + @Override public void ack(Object mid) { + if (ackingEnabled) { + msgIdCache.invalidate(mid); + LOG.log(logLevel, "acked: [" + mid + "]"); + } + } + + @Override public void fail(Object mid) { + if (ackingEnabled) { + Values values = new Values(msgIdCache.getIfPresent(mid)); + taskIds = collector.emit(values, mid); + LOG.log(logLevel, "re-emit: [" + mid + "]"); + } } } diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD index e38069e7aa9..c893d6ab4ce 100644 --- a/heron/api/tests/java/BUILD +++ b/heron/api/tests/java/BUILD @@ -37,7 +37,9 @@ java_tests( "org.apache.heron.streamlet.impl.utils.StreamletUtilsTest", "org.apache.heron.api.ConfigTest", "org.apache.heron.api.HeronSubmitterTest", - "org.apache.heron.api.utils.UtilsTest" + "org.apache.heron.api.utils.UtilsTest", + "org.apache.heron.streamlet.impl.sources.SupplierSourceTest", + "org.apache.heron.streamlet.impl.sources.ComplexSourceTest" ], runtime_deps = [ ":api-tests" ], size = "small", diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/ComplexIntegerSource.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/ComplexIntegerSource.java new file mode 100644 index 00000000000..a67bdfb4f55 --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/ComplexIntegerSource.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.streamlet.impl.sources; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.heron.streamlet.Context; +import org.apache.heron.streamlet.Source; + +public class ComplexIntegerSource implements Source { + + private List intList; + + ComplexIntegerSource() { + intList = new ArrayList<>(); + } + + @Override + public void setup(Context context) { + } + + @Override public Collection get() { + intList.clear(); + int i = ThreadLocalRandom.current().nextInt(25); + intList.add(i + 1); + intList.add(i + 2); + intList.add(i + 3); + return intList; + + } + + @Override + public void cleanup() { + } + +} diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/ComplexSourceTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/ComplexSourceTest.java new file mode 100644 index 00000000000..750009498bb --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/ComplexSourceTest.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.streamlet.impl.sources; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.heron.api.Config; +import org.apache.heron.api.spout.SpoutOutputCollector; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.streamlet.Source; + +import static org.powermock.api.mockito.PowerMockito.mock; + +public class ComplexSourceTest { + + private ComplexSource source; + private Map confMap = new HashMap<>(); + private TopologyContext mockContext = mock(TopologyContext.class); + + private Source generator = + new org.apache.heron.streamlet.impl.sources.ComplexIntegerSource(); + + private String msgId; + private int limit = 10000; + + public ComplexSourceTest() { + confMap.put("topology.reliability.mode", Config.TopologyReliabilityMode.ATMOST_ONCE); + SpoutOutputCollector mySpout = + new SpoutOutputCollector(new org.apache.heron.streamlet.impl.sources.TestCollector()); + source = new ComplexSource(generator); + source.open(confMap, mockContext, mySpout); + } + + @Before + public void preTestSetup() { + source.msgIdCache.invalidateAll(); + source.taskIds = null; + } + + /** + * Verify that acking removes entry from cache. + */ + @Test + public void testAckWithAckingEnabled() { + source.ackingEnabled = true; + // verify cache is empty + Assert.assertEquals(0, source.msgIdCache.size()); + // Add a 'message id' entry to cache + source.msgIdCache.put("msgId", "1"); + Assert.assertEquals(1, source.msgIdCache.size()); + Assert.assertEquals("1", source.msgIdCache.getIfPresent("msgId")); + // ack this entry + source.ack("msgId"); + // verify the message id entry is no longer in the cache + Assert.assertEquals(0, source.msgIdCache.size()); + Assert.assertNull(source.msgIdCache.getIfPresent("msgId")); + } + + /** + * Ack many ids and verify cache is emptied out. + */ + @Test + public void testMultipleAcksWithAckingEnabled() { + source.ackingEnabled = true; + // verify cache is empty + Assert.assertEquals(0, source.msgIdCache.size()); + // fill cache with many entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.msgIdCache.put(msgId, String.valueOf(i)); + Assert.assertEquals(String.valueOf(i), source.msgIdCache.getIfPresent(msgId)); + } + Assert.assertEquals(limit, source.msgIdCache.size()); + // ack all of the entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.ack(msgId); + } + // verify cache is now empty, i.e., all ids were 'acked' successfully + Assert.assertEquals(0, source.msgIdCache.size()); + } + + /** + * With acking disabled the cache is not involved. Use this fact to + * verify acking with ackingEnabled set to false. + */ + @Test + public void testAckWithAckingDisabled1() { + source.ackingEnabled = false; + // clear all cache entries + Assert.assertEquals(0, source.msgIdCache.size()); + // Add an 'msgId' entry to cache. This entry is being placed into the + // cache solely to be used in verifying that the ack call has no effect + // on the cache. + source.msgIdCache.put("msgId", "1"); + Assert.assertEquals(1, source.msgIdCache.size()); + Assert.assertEquals("1", source.msgIdCache.getIfPresent("msgId")); + + source.ack("msgId"); + // with ackingEnabled set to false, the ack call is basically a noop so the + // cache is not involved so size should still be 1. + Assert.assertEquals(1, source.msgIdCache.size()); + Assert.assertNotNull(source.msgIdCache.getIfPresent("msgId")); + } + + /** + * As above, the cache is not involved when acking is disabled, so verify + * that nothing was added to cache by sending a ack. + */ + @Test + public void testAckWithAckingDisabled2() { + source.ackingEnabled = false; + Assert.assertEquals(0, source.msgIdCache.size()); + source.ack("id1"); + // with no acking, the msgIdCache is not involved so size should still be 0 + Assert.assertEquals(0, source.msgIdCache.size()); + Assert.assertNull(source.msgIdCache.getIfPresent("id1")); + } + + /** + * If a message if failed when ackingEnabled is set, the cache will + * not remove the entry. Use this fact to test the fail method. + * Use the value of taskId's to verify the emit method was called + * as expected. + */ + @Test + public void testFailWithAckingEnabled() { + source.ackingEnabled = true; + // Add an entry to the cache to be used in failure test + source.msgIdCache.put("msgId", 1234); + Assert.assertEquals(1, source.msgIdCache.size()); + source.fail("msgId"); + Assert.assertNotNull(source.taskIds); + Assert.assertNotEquals(0, source.taskIds.size()); + // cache should still retain value + Assert.assertEquals(1, source.msgIdCache.size()); + } + + /** + * Fail many ids and verify cache is not emptied out. + */ + @Test + public void testMultipleFailsWithAckingEnabled() { + source.ackingEnabled = true; + // verify cache is empty + Assert.assertEquals(0, source.msgIdCache.size()); + // fill cache with many entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.msgIdCache.put(msgId, String.valueOf(i)); + Assert.assertEquals(String.valueOf(i), source.msgIdCache.getIfPresent(msgId)); + } + Assert.assertEquals(limit, source.msgIdCache.size()); + // fail all of the entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.fail(msgId); + } + // verify cache is not empty, i.e., all ids should still be in cache + Assert.assertEquals(limit, source.msgIdCache.size()); + } + + /** + * Failing with acking disabled should do nothing. Verify that no + * taskIds are returned via an emit call. + */ + @Test + public void testFailWithAckingDisabled() { + source.ackingEnabled = false; + String mid = "msgId"; + source.fail(mid); + Assert.assertNull(source.taskIds); + } + + /** + * Verify that nextTuple adds an entry to the cache and that + * a taskId value is returned. + */ + @Test + public void testNextTupleWithAckingEnabled() { + source.ackingEnabled = true; + Assert.assertEquals(0, source.msgIdCache.size()); + source.nextTuple(); + // This complexSource returns three entries per nextTuple call. + Assert.assertEquals(3, source.msgIdCache.size()); + Assert.assertNotNull(source.taskIds); + } + + /** + * Verify that values are emitted and that no cache is used. + */ + @Test + public void testNextTupleWithAckingDisabled() { + source.ackingEnabled = false; + int expectedTaskId = 1234; + // cache should not be utilized + Assert.assertEquals(0, source.msgIdCache.size()); + Assert.assertNull(source.taskIds); + for (int i = 0; i < limit; i++) { + source.nextTuple(); + Assert.assertEquals(expectedTaskId, source.taskIds.get(0)); + } + Assert.assertEquals(0, source.msgIdCache.size()); + } + +} diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/SupplierSourceTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/SupplierSourceTest.java new file mode 100644 index 00000000000..8c1ae61b84f --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/SupplierSourceTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.streamlet.impl.sources; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.heron.api.Config.TopologyReliabilityMode; +import org.apache.heron.api.spout.SpoutOutputCollector; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.streamlet.SerializableSupplier; + +import static org.powermock.api.mockito.PowerMockito.mock; + + +public class SupplierSourceTest { + + private AtomicInteger atomicInteger = new AtomicInteger(0); + private SupplierSource source; + + private SerializableSupplier supplier = + (SerializableSupplier) () -> atomicInteger.getAndIncrement(); + + private Map confMap = new HashMap<>(); + private TopologyContext mockContext = mock(TopologyContext.class); + + private String msgId; + private int limit = 10000; + + + public SupplierSourceTest() { + confMap.put("topology.reliability.mode", TopologyReliabilityMode.ATMOST_ONCE); + SpoutOutputCollector mySpout = + new SpoutOutputCollector(new org.apache.heron.streamlet.impl.sources.TestCollector()); + source = new SupplierSource<>(supplier); + source.open(confMap, mockContext, mySpout); + } + + + @Before + public void preTestSetup() { + source.msgIdCache.invalidateAll(); + source.taskIds = null; + } + + /** + * Verify that acking removes entry from cache. + */ + @Test + public void testAckWithAckingEnabled() { + source.ackingEnabled = true; + // verify cache is empty + Assert.assertEquals(0, source.msgIdCache.size()); + // Add a 'message id' entry to cache + source.msgIdCache.put("msgId", "1"); + Assert.assertEquals(1, source.msgIdCache.size()); + Assert.assertEquals("1", source.msgIdCache.getIfPresent("msgId")); + // ack this entry + source.ack("msgId"); + // verify the message id entry is no longer in the cache + Assert.assertEquals(0, source.msgIdCache.size()); + Assert.assertNull(source.msgIdCache.getIfPresent("msgId")); + } + + /** + * Ack many ids and verify cache is emptied out. + */ + @Test + public void testMultipleAcksWithAckingEnabled() { + source.ackingEnabled = true; + // verify cache is empty + Assert.assertEquals(0, source.msgIdCache.size()); + // fill cache with many entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.msgIdCache.put(msgId, String.valueOf(i)); + Assert.assertEquals(String.valueOf(i), source.msgIdCache.getIfPresent(msgId)); + } + Assert.assertEquals(limit, source.msgIdCache.size()); + // ack all of the entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.ack(msgId); + } + // verify cache is now empty, i.e., all ids were 'acked' successfully + Assert.assertEquals(0, source.msgIdCache.size()); + } + + /** + * With acking disabled the cache is not involved. Use this fact to + * verify proper behavior with ackingEnabled set to false. + */ + @Test + public void testAckWithAckingDisabled1() { + source.ackingEnabled = false; + Assert.assertEquals(0, source.msgIdCache.size()); + // Add a 'msgId' entry to cache. This entry is being placed into the + // cache solely to be used in verifying that the ack call has no effect + // on the cache. + source.msgIdCache.put("msgId", "1"); + Assert.assertEquals(1, source.msgIdCache.size()); + Assert.assertEquals("1", source.msgIdCache.getIfPresent("msgId")); + + source.ack("msgId"); + // with ackingEnabled set to false, the ack call is basically a noop so the + // cache is not involved so size should still be 1. + Assert.assertEquals(1, source.msgIdCache.size()); + Assert.assertNotNull(source.msgIdCache.getIfPresent("msgId")); + } + + /** + * As above, the cache is not involved when acking is disabled, so verify + * that nothing was added to cache by sending a ack. + */ + @Test + public void testAckWithAckingDisabled2() { + source.ackingEnabled = false; + Assert.assertEquals(0, source.msgIdCache.size()); + source.ack("id1"); + // with no acking, the msgIdCache is not involved so size should still be 0 + Assert.assertEquals(0, source.msgIdCache.size()); + Assert.assertNull(source.msgIdCache.getIfPresent("id1")); + } + + /** + * If a message if failed when ackingEnabled is set, the cache will + * not remove the entry. Use this fact to test the fail method. + * Use the value of taskId's to verify the emit method was called + * as expected. + */ + @Test + public void testFailWithAckingEnabled() { + source.ackingEnabled = true; + // Add an entry to the cache to be used in failure test + source.msgIdCache.put("msgId", 1234); + Assert.assertEquals(1, source.msgIdCache.size()); + source.fail("msgId"); + Assert.assertNotNull(source.taskIds); + Assert.assertNotEquals(0, source.taskIds.size()); + // cache should still retain value + Assert.assertEquals(1, source.msgIdCache.size()); + } + + /** + * Fail many ids and verify cache is not emptied out. + */ + @Test + public void testMultipleFailsWithAckingEnabled() { + source.ackingEnabled = true; + // verify cache is empty + Assert.assertEquals(0, source.msgIdCache.size()); + // fill cache with many entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.msgIdCache.put(msgId, String.valueOf(i)); + Assert.assertEquals(String.valueOf(i), source.msgIdCache.getIfPresent(msgId)); + } + Assert.assertEquals(limit, source.msgIdCache.size()); + // fail all of the entries + for (int i = 0; i < limit; i++) { + msgId = "mid-" + String.valueOf(i); + source.fail(msgId); + } + // verify cache is not empty, i.e., all ids should still be in cache + Assert.assertEquals(limit, source.msgIdCache.size()); + } + + /** + * Failing with acking disabled should do nothing. Verify that no + * taskIds are returned via an emit call. + */ + @Test + public void testFailWithAckingDisabled() { + source.ackingEnabled = false; + String mid = "msgId"; + source.fail(mid); + Assert.assertNull(source.taskIds); + } + + /** + * Verify that nextTuple adds an entry to the cache and that + * a taskId value is returned. + */ + @Test + public void testNextTupleWithAckingEnabled() { + source.ackingEnabled = true; + Assert.assertEquals(0, source.msgIdCache.size()); + source.nextTuple(); + Assert.assertEquals(1, source.msgIdCache.size()); + Assert.assertNotNull(source.taskIds); + } + + /** + * Verify that values are emitted and that no cache is used. + */ + @Test + public void testNextTupleWithAckingDisabled() { + source.ackingEnabled = false; + int expectedTaskId = 1234; + // cache should not be utilized + Assert.assertEquals(0, source.msgIdCache.size()); + Assert.assertNull(source.taskIds); + for (int i = 0; i < limit; i++) { + source.nextTuple(); + Assert.assertEquals(expectedTaskId, source.taskIds.get(0)); + } + Assert.assertEquals(0, source.msgIdCache.size()); + } + +} diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/TestCollector.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/TestCollector.java new file mode 100644 index 00000000000..0125dcca85f --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/sources/TestCollector.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.streamlet.impl.sources; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.heron.api.spout.ISpoutOutputCollector; + + +public class TestCollector implements ISpoutOutputCollector { + + @Override public List emit(String streamId, List tuple, Object messageId) { + int taskId = 1234; + List tskIds = new ArrayList<>(); + if (tuple != null) { + tskIds.add(taskId++); + } + return tskIds; + } + + @Override + public void emitDirect(int taskId, String streamId, List tuple, Object messageId) { + } + + @Override public void reportError(Throwable error) { + } +}