diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java index bc1437759df..b52ea267fe3 100644 --- a/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java +++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java @@ -33,7 +33,7 @@ private OneBoltMultiTasks(String[] args) throws MalformedURLException { @Override protected TestTopologyBuilder buildTopology(TestTopologyBuilder builder) { - builder.setSpout("ab-spout", new ABSpout(), 1); + builder.setSpout("ab-spout", new ABSpout(true), 1); builder.setBolt("identity-bolt", new IdentityBolt(new Fields("word")), 3) .shuffleGrouping("ab-spout"); return builder; diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json index b822614c4ef..43b827b22bb 100644 --- a/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json +++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json @@ -1 +1 @@ -["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"] \ No newline at end of file +["A_0", "B_1", "A_2", "B_3", "A_4", "B_5", "A_6", "B_7", "A_8", "B_9"] \ No newline at end of file diff --git a/integration_test/src/python/integration_test/common/spout/ab_spout.py b/integration_test/src/python/integration_test/common/spout/ab_spout.py index 5408981f7ca..3862a40cc46 100644 --- a/integration_test/src/python/integration_test/common/spout/ab_spout.py +++ b/integration_test/src/python/integration_test/common/spout/ab_spout.py @@ -17,6 +17,7 @@ '''ABSpout for integration test''' from heronpy.api.spout.spout import Spout +from ..core import constants as integ_const #pylint: disable=unused-argument class ABSpout(Spout): @@ -25,8 +26,11 @@ class ABSpout(Spout): def initialize(self, config, context): self.to_send = ["A", "B"] self.emitted = 0 + self.append_stream_id = config[integ_const.USER_APPEND_STREAM_ID] def next_tuple(self): word = self.to_send[self.emitted % len(self.to_send)] + if self.append_stream_id: + word = word + "_" + self.emitted self.emitted += 1 self.emit([word]) diff --git a/integration_test/src/python/integration_test/core/constants.py b/integration_test/src/python/integration_test/core/constants.py index 219b1b8d71a..9ab9ce7118d 100644 --- a/integration_test/src/python/integration_test/core/constants.py +++ b/integration_test/src/python/integration_test/core/constants.py @@ -28,3 +28,5 @@ USER_BOLT_CLASSPATH = "user.bolt.classpath" # user defined max executions USER_MAX_EXECUTIONS = "user.max.exec" +# append stream id or not +USER_APPEND_STREAM_ID = "user.append.stream_id" diff --git a/integration_test/src/python/integration_test/core/test_topology_builder.py b/integration_test/src/python/integration_test/core/test_topology_builder.py index 1fdfa5b50bb..0a56594d6c2 100644 --- a/integration_test/src/python/integration_test/core/test_topology_builder.py +++ b/integration_test/src/python/integration_test/core/test_topology_builder.py @@ -49,7 +49,8 @@ def __init__(self, name, http_server_url): self.prev = {} def add_spout(self, name, spout_cls, par, config=None, - optional_outputs=None, max_executions=None): + optional_outputs=None, max_executions=None, + append_sequence_id=False): """Add an integration_test spout""" user_spec = spout_cls.spec(name) spout_classpath = user_spec.python_class_path @@ -70,6 +71,8 @@ def add_spout(self, name, spout_cls, par, config=None, if max_executions is not None: _config[integ_const.USER_MAX_EXECUTIONS] = max_executions + _config[integ_const.USER_APPEND_STREAM_ID] = append_sequence_id + test_spec = IntegrationTestSpout.spec(name, par, _config, user_spout_classpath=spout_classpath, user_output_fields=user_outputs) diff --git a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py index f7997e8c786..44477a8dbe9 100644 --- a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py +++ b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py @@ -24,7 +24,7 @@ def one_bolt_multi_tasks_builder(topology_name, http_server_url): builder = TestTopologyBuilder(topology_name, http_server_url) - ab_spout = builder.add_spout("ab-spout", ABSpout, 1) + ab_spout = builder.add_spout("ab-spout", ABSpout, 1, append_sequence_id=True) builder.add_bolt("identity-bolt", IdentityBolt, inputs={ab_spout: Grouping.SHUFFLE},