Storm Message Filtering: Bolt Implementation Guide

In Storm, message filtering and routing functionality can be achieved by defining a Bolt. The specific steps are as follows:

  1. Create a filtering bolt to handle message filtering functionality. This bolt can determine whether to process a message based on its content or specific conditions. If processing is required, the message can be passed along; if not, the message can be ignored.
public class FilterBolt extends BaseRichBolt {
  private OutputCollector collector;

  @Override
  public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    // 进行消息过滤逻辑
    if (/* 判断条件 */) {
      // 继续传递消息
      collector.emit(input, new Values(/* 消息内容 */));
    } else {
      // 忽略该消息
      collector.ack(input);
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("filteredMessage"));
  }
}
  1. Develop a router Bolt to handle message routing functionality. Within this Bolt, messages can be routed to specific destination Bolts for processing based on their content or specific conditions.
public class RouterBolt extends BaseRichBolt {
  private OutputCollector collector;

  @Override
  public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    // 进行消息路由逻辑
    if (/* 判断条件 */) {
      // 路由到目标Bolt中
      collector.emit("targetBolt", input, new Values(/* 消息内容 */));
    } else {
      // 路由到其他Bolt中
      collector.emit("otherBolt", input, new Values(/* 消息内容 */));
    }

    collector.ack(input);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("targetBolt", new Fields("routedMessage"));
    declarer.declareStream("otherBolt", new Fields("routedMessage"));
  }
}
  1. Configure filters and router bolts in the Topology, and specify the path of message flow using TopologyBuilder.
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new MySpout(), 1);
builder.setBolt("filterBolt", new FilterBolt(), 2).shuffleGrouping("spout");
builder.setBolt("routerBolt", new RouterBolt(), 2).shuffleGrouping("filterBolt");

Config conf = new Config();
conf.setDebug(true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("myTopology", conf, builder.createTopology());

By following the steps above, message filtering and routing functionality can be implemented in Storm. Depending on specific requirements, the Bolt can be further customized and extended to achieve more complex message processing logic.

bannerAds