Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private OneBoltMultiTasks(String[] args) throws MalformedURLException {
@Override
protected TestTopologyBuilder buildTopology(TestTopologyBuilder builder) {

builder.setSpout("ab-spout", new ABSpout(), 1);
builder.setSpout("ab-spout", new ABSpout(true), 1);
builder.setBolt("identity-bolt", new IdentityBolt(new Fields("word")), 3)
.shuffleGrouping("ab-spout");
return builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
["A_0", "B_1", "A_2", "B_3", "A_4", "B_5", "A_6", "B_7", "A_8", "B_9"]
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
'''ABSpout for integration test'''

from heronpy.api.spout.spout import Spout
from ..core import constants as integ_const

#pylint: disable=unused-argument
class ABSpout(Spout):
Expand All @@ -25,8 +26,11 @@ class ABSpout(Spout):
def initialize(self, config, context):
self.to_send = ["A", "B"]
self.emitted = 0
self.append_stream_id = config[integ_const.USER_APPEND_STREAM_ID]

def next_tuple(self):
word = self.to_send[self.emitted % len(self.to_send)]
if self.append_stream_id:
word = word + "_" + self.emitted
self.emitted += 1
self.emit([word])
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@
USER_BOLT_CLASSPATH = "user.bolt.classpath"
# user defined max executions
USER_MAX_EXECUTIONS = "user.max.exec"
# append stream id or not
USER_APPEND_STREAM_ID = "user.append.stream_id"
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def __init__(self, name, http_server_url):
self.prev = {}

def add_spout(self, name, spout_cls, par, config=None,
optional_outputs=None, max_executions=None):
optional_outputs=None, max_executions=None,
append_sequence_id=False):
"""Add an integration_test spout"""
user_spec = spout_cls.spec(name)
spout_classpath = user_spec.python_class_path
Expand All @@ -70,6 +71,8 @@ def add_spout(self, name, spout_cls, par, config=None,
if max_executions is not None:
_config[integ_const.USER_MAX_EXECUTIONS] = max_executions

_config[integ_const.USER_APPEND_STREAM_ID] = append_sequence_id

test_spec = IntegrationTestSpout.spec(name, par, _config,
user_spout_classpath=spout_classpath,
user_output_fields=user_outputs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

def one_bolt_multi_tasks_builder(topology_name, http_server_url):
builder = TestTopologyBuilder(topology_name, http_server_url)
ab_spout = builder.add_spout("ab-spout", ABSpout, 1)
ab_spout = builder.add_spout("ab-spout", ABSpout, 1, append_sequence_id=True)

builder.add_bolt("identity-bolt", IdentityBolt,
inputs={ab_spout: Grouping.SHUFFLE},
Expand Down