Class TopologyBuilder
```java
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3)
       .fieldsGrouping("1", new Fields("word"))
       .fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount())
       .globalGrouping("1");

Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_WORKERS, 4);

StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
```
Running the exact same topology in local mode (in process), and configuring it to log all tuples emitted, looks like the following. Note that it lets the topology run for 10 seconds before shutting down the local cluster.
```java
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3)
       .fieldsGrouping("1", new Fields("word"))
       .fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount())
       .globalGrouping("1");

Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_WORKERS, 4);
conf.put(Config.TOPOLOGY_DEBUG, true);

// Both resources are closed automatically when the try block exits.
try (LocalCluster cluster = new LocalCluster();
     LocalTopology topo = cluster.submitTopology("mytopology", conf, builder.createTopology())) {
    Utils.sleep(10000);
}
```
The pattern for `TopologyBuilder` is to map component ids to components using the `setSpout` and `setBolt` methods. Those methods return objects that are then used to declare the inputs for that component.
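The id-to-component pattern can be illustrated without Storm on the classpath. The following is a minimal sketch, not Storm's implementation: `MiniTopologyBuilder`, `Component`, `Declarer`, `input` and `inputsOf` are hypothetical names invented for this illustration, and the positive-hint check mirrors the `IllegalArgumentException` documented for the real methods.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, Storm-free model of the builder pattern; not Storm's actual implementation.
class MiniTopologyBuilder {
    // Hypothetical record of one component: its parallelism and the ids it reads from.
    static final class Component {
        final int parallelism;
        final List<String> inputs = new ArrayList<>();

        Component(int parallelism) {
            this.parallelism = parallelism;
        }
    }

    // Hypothetical stand-in for the declarer object that setBolt returns.
    static final class Declarer {
        private final Component component;

        Declarer(Component component) {
            this.component = component;
        }

        // Record that this component consumes the output of sourceId.
        Declarer input(String sourceId) {
            component.inputs.add(sourceId);
            return this; // returning this lets calls be chained
        }
    }

    private final Map<String, Component> components = new LinkedHashMap<>();

    // Register a source component; in this model sources have no inputs to declare.
    void setSpout(String id, int parallelismHint) {
        register(id, parallelismHint);
    }

    // Register a processing component and hand back a declarer for its inputs.
    Declarer setBolt(String id, int parallelismHint) {
        return new Declarer(register(id, parallelismHint));
    }

    private Component register(String id, int parallelismHint) {
        if (parallelismHint <= 0) {
            throw new IllegalArgumentException("parallelism hint must be positive");
        }
        Component component = new Component(parallelismHint);
        components.put(id, component);
        return component;
    }

    List<String> inputsOf(String id) {
        return components.get(id).inputs;
    }

    public static void main(String[] args) {
        MiniTopologyBuilder builder = new MiniTopologyBuilder();
        builder.setSpout("1", 5);
        builder.setSpout("2", 3);
        builder.setBolt("3", 3).input("1").input("2");
        System.out.println(builder.inputsOf("3")); // prints [1, 2]
    }
}
```

Returning the declarer from `setBolt` is what makes the chained style of the templates above possible: the component is registered first, and its inputs are attached to the same record afterwards.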
Method Summary

- `void addWorkerHook(IWorkerHook workerHook)`: Add a new worker lifecycle hook.
- `setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, Number parallelismHint, String... fields)`: Define a new bolt in this topology.
- `setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, String... fields)`: Define a new bolt in this topology.
- `setBolt(String id, SerializableConsumer<Tuple> consumer)`: Define a new bolt in this topology.
- `setBolt(String id, SerializableConsumer<Tuple> consumer, Number parallelismHint)`: Define a new bolt in this topology.
- `setBolt(String id, IBasicBolt bolt)`: Define a new bolt in this topology.
- `setBolt(String id, IBasicBolt bolt, Number parallelismHint)`: Define a new bolt in this topology.
- `setBolt(String id, IRichBolt bolt)`: Define a new bolt in this topology with parallelism of just one thread.
- `setBolt(String id, IRichBolt bolt, Number parallelismHint)`: Define a new bolt in this topology with the specified amount of parallelism.
- `<T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt)`: Define a new bolt in this topology.
- `<T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelismHint)`: Define a new bolt in this topology.
- `<T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt)`: Define a new bolt in this topology.
- `<T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelismHint)`: Define a new bolt in this topology.
- `setBolt(String id, IWindowedBolt bolt)`: Define a new bolt in this topology.
- `setBolt(String id, IWindowedBolt bolt, Number parallelismHint)`: Define a new bolt in this topology.
- `setSpout(String id, SerializableSupplier<?> supplier)`: Define a new spout in this topology.
- `setSpout(String id, SerializableSupplier<?> supplier, Number parallelismHint)`: Define a new spout in this topology with the specified parallelism.
- `setSpout(String id, IRichSpout spout)`: Define a new spout in this topology.
- `setSpout(String id, IRichSpout spout, Number parallelismHint)`: Define a new spout in this topology with the specified parallelism.
Constructor Details

TopologyBuilder

`public TopologyBuilder()`
 
Method Details

createTopology

setBolt

Define a new bolt in this topology with parallelism of just one thread.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the bolt
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) throws IllegalArgumentException`

Define a new bolt in this topology with the specified amount of parallelism.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the bolt
  - `parallelismHint` - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the basic bolt
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelismHint) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the basic bolt
  - `parallelismHint` - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

Define a new bolt in this topology. This defines a windowed bolt, intended for windowing operations. The `IWindowedBolt.execute(TupleWindow)` method is triggered for each window interval with the list of current events in the window.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the windowed bolt
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a windowed bolt, intended for windowing operations. The `IWindowedBolt.execute(TupleWindow)` method is triggered for each window interval with the list of current events in the window.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the windowed bolt
  - `parallelismHint` - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
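To make "triggered for each window interval with the list of current events in the window" concrete, here is a small Storm-free model of a count-based sliding window. It is a sketch under stated assumptions, not Storm's windowing code: `CountWindowSketch`, `windowLength`, `slidingInterval` and `onWindow` are hypothetical names, and the callback merely plays the role that `execute(TupleWindow)` plays for a windowed bolt.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Simplified, Storm-free model of a count-based sliding window; not Storm's implementation.
class CountWindowSketch<T> {
    private final int windowLength;           // maximum number of events kept in the window
    private final int slidingInterval;        // fire the callback every this many events
    private final Consumer<List<T>> onWindow; // stands in for execute(TupleWindow)
    private final Deque<T> events = new ArrayDeque<>();
    private int sinceLastFire = 0;

    CountWindowSketch(int windowLength, int slidingInterval, Consumer<List<T>> onWindow) {
        if (windowLength <= 0 || slidingInterval <= 0) {
            throw new IllegalArgumentException("window length and interval must be positive");
        }
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
        this.onWindow = onWindow;
    }

    // Add one event, evict the oldest if the window is full, and fire on each interval.
    void add(T event) {
        events.addLast(event);
        if (events.size() > windowLength) {
            events.removeFirst();
        }
        sinceLastFire++;
        if (sinceLastFire == slidingInterval) {
            sinceLastFire = 0;
            onWindow.accept(new ArrayList<>(events)); // pass a snapshot of the current window
        }
    }

    // Feed the events 1..6 through a window of length 3 that slides every 2 events.
    static List<List<Integer>> demo() {
        List<List<Integer>> fired = new ArrayList<>();
        CountWindowSketch<Integer> window = new CountWindowSketch<Integer>(3, 2, fired::add);
        for (int i = 1; i <= 6; i++) {
            window.add(i);
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1, 2], [2, 3, 4], [4, 5, 6]]
    }
}
```

The callback receives the whole current window rather than a single event, which is the main difference from a bolt whose `execute` is called once per tuple.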
 
setBolt

`public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a stateful bolt, which requires its state (of computation) to be saved. When this bolt is initialized, the `IStatefulComponent.initState(State)` method is invoked after `IStatefulBolt.prepare(Map, TopologyContext, OutputCollector)` but before `IStatefulBolt.execute(Tuple)` with its previously saved state.

The framework provides an at-least-once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once they are processed.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the stateful bolt
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelismHint) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a stateful bolt, which requires its state (of computation) to be saved. When this bolt is initialized, the `IStatefulComponent.initState(State)` method is invoked after `IStatefulBolt.prepare(Map, TopologyContext, OutputCollector)` but before `IStatefulBolt.execute(Tuple)` with its previously saved state.

The framework provides an at-least-once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once they are processed.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the stateful bolt
  - `parallelismHint` - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
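The lifecycle order for a stateful bolt (prepare, then state initialization with the previously saved state, then execute) can be modelled in plain Java. This is a sketch only: `StatefulLifecycleSketch`, `CountingBolt` and `run` are hypothetical names, and an ordinary `Map` stands in for the saved state, which in Storm is managed by the framework.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, Storm-free model of the stateful bolt lifecycle; not Storm's implementation.
class StatefulLifecycleSketch {
    // Hypothetical stand-in for a stateful bolt that counts words.
    static final class CountingBolt {
        final List<String> calls = new ArrayList<>(); // records the order of lifecycle calls
        private Map<String, Integer> state;

        void prepare() {
            calls.add("prepare");
        }

        // Receives whatever state was saved before this instance started.
        void initState(Map<String, Integer> saved) {
            calls.add("initState");
            state = saved;
        }

        void execute(String word) {
            calls.add("execute");
            state.merge(word, 1, Integer::sum);
        }
    }

    // Run one "lifetime" of the bolt against a store that outlives the bolt instance.
    static CountingBolt run(Map<String, Integer> store, String... words) {
        CountingBolt bolt = new CountingBolt();
        bolt.prepare();
        bolt.initState(store);
        for (String word : words) {
            bolt.execute(word);
        }
        return bolt;
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        run(store, "a", "b"); // first lifetime
        run(store, "a");      // a restarted instance sees the earlier counts
        System.out.println(store.get("a")); // prints 2
    }
}
```

Because the second instance is initialized from the same store, its count for `"a"` continues from the first instance's value instead of starting at zero.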
 
setBolt

`public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful windowing operations. The `IWindowedBolt.execute(TupleWindow)` method is triggered for each window interval with the list of current events in the window. During initialization of this bolt, `IStatefulComponent.initState(State)` is invoked with its previously saved state.

- Type Parameters:
  - `T` - the type of the state (e.g. `KeyValueState`)
- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the stateful windowed bolt
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelismHint) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful windowing operations. The `IWindowedBolt.execute(TupleWindow)` method is triggered for each window interval with the list of current events in the window. During initialization of this bolt, `IStatefulComponent.initState(State)` is invoked with its previously saved state.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `bolt` - the stateful windowed bolt
  - `parallelismHint` - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, String... fields) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `biConsumer` - lambda expression that implements tuple processing for this bolt
  - `fields` - fields of the tuples emitted to downstream bolts
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, Number parallelismHint, String... fields) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `biConsumer` - lambda expression that implements tuple processing for this bolt
  - `parallelismHint` - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
  - `fields` - fields of the tuples emitted to downstream bolts
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `consumer` - lambda expression that implements tuple processing for this bolt
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setBolt

`public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer, Number parallelismHint) throws IllegalArgumentException`

Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
  - `consumer` - lambda expression that implements tuple processing for this bolt
  - `parallelismHint` - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
- Returns:
  - use the returned object to declare the inputs to this component
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
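The lambda overloads take `Serializable...` functional types, which suggests that the lambda, along with any values it captures, has to survive Java serialization. The following Storm-free sketch shows that property in isolation. It rests on assumptions: `SerBiConsumer` is a hypothetical local interface standing in for a serializable `BiConsumer`, `roundTrip` is a helper invented here, and a `List<String>` stands in for an output collector.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Storm-free illustration of a serializable lambda; all names here are hypothetical.
class SerializableLambdaSketch {
    // A BiConsumer that is also Serializable, so lambdas targeting it can be serialized.
    interface SerBiConsumer<T, U> extends BiConsumer<T, U>, Serializable { }

    // Serialize a value to bytes and read it back, as shipping it to another process would.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    static List<String> demo() {
        String suffix = "!"; // captured value; it must itself be serializable
        SerBiConsumer<String, List<String>> logic = (word, out) -> out.add(word + suffix);

        // The deserialized copy behaves like the original lambda.
        SerBiConsumer<String, List<String>> copy = roundTrip(logic);
        List<String> emitted = new ArrayList<>();
        copy.accept("storm", emitted);
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [storm!]
    }
}
```

A practical consequence under this assumption is that a lambda capturing a non-serializable object, such as an open connection, would fail at serialization time rather than at compile time.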
 
setSpout

Define a new spout in this topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this spout's outputs.
  - `spout` - the spout
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setSpout

`public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHint) throws IllegalArgumentException`

Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the `parallelism_hint` will be ignored and only one task will be allocated to this component.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this spout's outputs.
  - `spout` - the spout
  - `parallelismHint` - the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setSpout

`public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException`

Define a new spout in this topology.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this spout's outputs.
  - `supplier` - lambda expression that generates tuples for this spout
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
setSpout

`public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelismHint) throws IllegalArgumentException`

Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the `parallelism_hint` will be ignored and only one task will be allocated to this component.

- Parameters:
  - `id` - the id of this component. This id is referenced by other components that want to consume this spout's outputs.
  - `supplier` - lambda expression that generates tuples for this spout
  - `parallelismHint` - the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
- Throws:
  - `IllegalArgumentException` - if `parallelism_hint` is not positive
 
addWorkerHook

Add a new worker lifecycle hook.

- Parameters:
  - `workerHook` - the lifecycle hook to add