目录
- 概述
- 架构
- 业务
- 总体
- work相关
- 通信
- 实践
- 数据流分流和合并
- 分流
- 数据流合并
- 事务
- 例子
- 说明
- Trident
- 例子
- Ack 机制
- 使用ACK
- ACK例子
- JStorm 任务的动态伸缩
- 命令行方式
- API接口
- 应用场景
- Jstorm 支持动态更新配置文件
- 命令行方式
- API接口
- 开发经验总结
- 运维经验总结
- 限流控制
- 限流和解除限流方式
- 使用方式
- Grouping 方式
- fieldsGrouping
- globalGrouping
- shuffleGrouping
- allGrouping
- IBasicBolt vs IRichBolt
- 性能优化
- 选型
- 增加并发
- 让task分配更均匀
- 使用MetaQ或Kafka时
- 调度定制化接口(0.9.5 及高版本)
- 自定义worker分配
- 默认调度算法
- jstorm状态管理
- 线上配置
- DRPC和Netty
- 数据流分流和合并
- 实际使用一些抽象的工具类
- 滑动窗口
- 实例
概述
- JStorm是一种实时流处理框架,基于Storm开源项目而来。它具有高性能、高伸缩性和高容错性的特点,能够处理海量的实时流数据,并在集群间进行高效的分布式处理。JStorm支持灵活的拓扑结构和丰富的数据处理功能,可以进行数据过滤、转换、聚合和计算等操作,并提供可靠的消息传递机制和分布式的数据持久化能力。JStorm还提供了可视化的管理界面,方便用户进行拓扑的配置、监控和调优等操作。总而言之,JStorm是一种功能强大的实时流处理框架,适用于各种大规模、高速的实时数据处理场景
- 应用场景
- 实时数据处理:JStorm可以实时处理和分析大规模的数据流,用于实时数据分析、实时数据仪表盘、实时监控等场景
- 实时风控:JStorm能够实时分析大量的数据,可以用于实时风控系统,例如实时检测异常行为、实时预测欺诈等
- 在线推荐系统:JStorm可以实时计算用户行为和兴趣,用于在线推荐系统,例如实时推荐商品、实时推荐内容等
- 实时监控和报警:JStorm可以实时收集和分析系统指标、日志等数据,用于实时监控和实时报警
- 实时大数据分析:JStorm可以快速处理大规模的数据集,用于实时数据分析和机器学习
- spark stream, flink同样也是实时流处理框架,如果需要分布式事务一致性,和更丰富的api支持,更活跃的社区还是使用Flink更合适,spark stream也可以,Jstorm近几年不咋活跃了
架构
业务
- Spout: Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源不间断地读取数据并发送给Topology消息(tuple元组)
- Bolt: Storm中的消息处理者,用于为Topology进行消息的处理,Bolt可以执行过滤,聚合, 查询数据库等操作,而且可以一级一级的进行处理
总体
-
依赖zookeeper集群
Nimbus: Storm集群的Master节点,负责资源分配和任务调度,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task
Supervisor: Storm集群的从节点,负责接受Nimbus分配的任务,启动和停止属于自己管理的worker进程。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程
Worker: 负责运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务
Task: worker中每一个spout/bolt的线程称为一个task?同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor
ZooKeeper: 用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行
work相关
- 1个worker进程执行的是1个topology的子集(不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。
- executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(task可以是1个或多个,storm默认是1个component只生成1个task,executor线程里会在每次循环里顺序调用所有task实例)
- 一个 task 执行实际的数据处理 - 在业务代码中实现的每个 spout 或 bolt 在整个集群上都执行了许多的 task, 组件的 task数量在 topology(拓扑)的整个生命周期中总是相同的, 但组件的 executors(线程) 数量可能会随时间而变化。 这意味着以下条件成立: #threads ≤ #tasks. 默认情况下,tasks(任务)数量与 executors(执行器)设置成一样,即1个executor线程只运行1个task
通信
- 因为一个topology有可能分布在不同机器,那不同bolt直接通信比如bolt1处理完数据要丢到bolt2,之前通信可以用mq,netty等,公司线上用的是netty
storm.messaging.transport = com.alibaba.jstorm.message.netty.NettyContext
调度不同拓扑的调度器用的默认
topology.scheduler.strategy = backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
实践
数据流分流和合并
分流
- 分流有2钟情况,第一种是,相同的tuple发往下一级不同的bolt, 第二种,分别发送不同的tuple到不同的下级bolt上
- 相同的tuple发往下一级不同的bolt, 本质上就是不同的bolt消费相同的一个spout
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
new SequenceSpout(), spoutParal);
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
.shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
- 分别发送不同的tuple到不同的下级bolt上, 这个时候,就需要引入stream概念,发送方发送a 消息到接收方A'时使用stream A, 发送b 消息到接收方B'时,使用stream B
# 在topology提交时:
builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字
SequenceTopologyDef.TRADE_STREAM_ID); // --- 接收发送方该stream 的tuple
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
.shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字
SequenceTopologyDef.CUSTOMER_STREAM_ID); // --- 接收发送方该stream 的tuple
# 在发送消息时
public void execute(Tuple tuple, BasicOutputCollector collector) {
tpsCounter.count();
Long tupleId = tuple.getLong(0);
Object obj = tuple.getValue(1);
if (obj instanceof TradeCustomer) {
TradeCustomer tradeCustomer = (TradeCustomer)obj;
Pair trade = tradeCustomer.getTrade();
Pair customer = tradeCustomer.getCustomer();
collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
new Values(tupleId, trade));
collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
new Values(tupleId, customer));
} else if (obj != null) {
LOG.info("Unknow type " + obj.getClass().getName());
} else {
LOG.info("Nullpointer " );
}
}
# 定义输出流格式
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
}
# 接受消息时,需要判断数据流
if (input.getSourceStreamId().equals(SequenceTopologyDef.TRADE_STREAM_ID) ) {
customer = pair;
customerTuple = input;
tradeTuple = tradeMap.get(tupleId);
if (tradeTuple == null) {
customerMap.put(tupleId, input);
return;
}
trade = (Pair) tradeTuple.getValue(1);
}
数据流合并
- 本质上是一个bolt监听多个spout
# 生成topology时
builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1)
.shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME)
.shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);
# 接收方是,区分一下来源component即可识别出数据的来源
if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME) ) {
customer = pair;
customerTuple = input;
tradeTuple = tradeMap.get(tupleId);
if (tradeTuple == null) {
customerMap.put(tupleId, input);
return;
}
trade = (Pair) tradeTuple.getValue(1);
} else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) {
trade = pair;
tradeTuple = input;
customerTuple = customerMap.get(tupleId);
if (customerTuple == null) {
tradeMap.put(tupleId, input);
return;
}
customer = (Pair) customerTuple.getValue(1);
}
事务
例子
- Jstorm由于近几年都没怎么维护,导致官方代码下载下来运行单侧跑不通,所以事务这块不建议使用,没咋维护了。如果真的有exactly-one和分布式一直的事务要求,请移步Flink的checkpoint和事务的二阶段提交
- 需要exactly-once 保证和分布式事务一致性(比如写到两个不同地址的数据库)的场景会用到
说明
- storm的事务主要用于对数据准确性要求非常高的环境中,尤其是在计算交易金额或笔数,数据库同步的场景中
- JStorm事务设计:
- 一次性从Kafka 中取出一批数据,然后一条一条将数据发送出去,当所有数据均被正确处理后, 触发一个commit 流,这个commit流是严格排序,通常在commit流中进行flush动作或刷数据库动作,如果commit流最后返回也成功,spout 就更新Meta或kafka的偏移量,否则,任何一个环节出错,都不会更新偏移量,也就最终重复消费这批数据。
- 其实,相当于把一个batch当做一个原子tuple来处理,只是中间计算的过程,可以并发
- 事务核心设计
- 提供一个strong order,也就是,如果一个tuple没有被完整的处理完,就不会处理下一个tuple,说简单一些,就是,采用同步方式。并对每一个tuple赋予一个transaction ID,这个transaction ID是递增属性(强顺序性),如果每个bolt在处理tuple时,记录了上次的tupleID,这样即使在failure replay时能保证处理仅且处理一次
- 如果一次处理一个tuple,性能不够好,可以考虑,一次处理一批(batch tuples) 这个时候,一个batch为一个transaction处理单元,当一个batch处理完毕,才能处理下一个batch,每个batch赋予一个transaction ID
- 如果在计算任务中,并不是所有步骤需要强顺序性,因此将一个计算任务拆分为2个阶段:
3.1 processing 阶段:这个阶段可以并发
3.2 commit阶段:这个阶段必须强顺序性,因此,一个时刻,只要一个batch在被处理 任何一个阶段发生错误,都会完整重发batch
Trident
- Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理
例子
- 需求:从一个流式输入中读取语句病计算每个单词的个数,提供查询给定单词列表中每个单词当前总数的功能
- 因为这只是一个例子,我们会从如下这样一个无限的输入流中读取语句作为输入
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"),
spout.setCycle(true);
- 这个spout会循环输出列出的那些语句到sentence stream当中,下面的代码会以这个stream作为输入并计算每个单词的个数:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
- 更多用法可以参考-Storm Trident 教程
Ack 机制
- 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作。比如在Meta中,成功被处理,即可更新偏移量,当失败时,重复发送数据。ACK能保障tuple一定被消费到。当spout触发fail动作时,不会自动重发失败的tuple,需要spout自己重新获取数据,手动重新再发送一次
- ACK机制只保证至少一次的原语,如果要保障只有一次,那自己得实现幂等保障,ACK机制用来保障消息在Spout和Bolt之间的可靠性传输的机制
- ack机制:
- spout发送的每一条消息,在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
- 在规定的时间内,没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,
- 或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
- 另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数,当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple, 从而限制spout发送数据, 公司线上是设置成null,不限制
topology.max.spout.pending
使用ACK
spout 在发送数据的时候带上msgid
设置acker数至少大于0,Config.setNumAckers(conf, ackerParal);
在bolt中完成处理tuple时,执行OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple); 推荐使用IBasicBolt, 因为IBasicBolt 自动封装了OutputCollector.ack(tuple), 处理失败时,请抛出FailedException,则自动执行OutputCollector.fail(tuple)
需要注意的是,spout 发送 tuple 时,会将该 tuple 保存在内存中,直到收到 ack 消息后才会将其移除。这就意味着,如果内存不够大,且 tuple 处理时间过长,会导致内存不足,从而引发 OOM 异常。因此,在实际使用中,需要根据实际情况来进行调整。另外,JStorm 还提供了 fail 消息,用于表示 tuple 的处理失败。当 bolt 处理 tuple 失败时,可以向 spout 发送 fail 消息,以便 spout 进行相应的处理,比如重发该 tuple
ACK例子
- 基于JStorm的2.2.1版本,最终会发现在Spout fail方法被激活了,这边可以重新处理重新发送,可以自己维护一个内存数据结构来获取msgId对应数据,然后重新发送
public class WordCountTopology {
public static void main(String[] args) throws Exception {
// 创建一个TopologyBuilder
TopologyBuilder builder = new TopologyBuilder();
// 设置Spout
builder.setSpout("spout", new WordSpout(), 2);
// 设置Bolt
builder.setBolt("split", new WordSplitBolt(), 2)
.shuffleGrouping("spout");
builder.setBolt("count", new WordCountBolt(), 2)
.fieldsGrouping("split", new Fields("word"));
// 创建一个本地集群
LocalCluster cluster = new LocalCluster();
// 提交Topology到本地集群运行
Config config = new Config();
config.setDebug(true);
// 设置reliable mode
config.setNumAckers(1);
// config.setMaxSpoutPending(10000);
cluster.submitTopology("word-count", config, builder.createTopology());
}
}
public class WordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
// 随机暂停1秒-5秒
Utils.sleep((int) (Math.random() * 4000 + 1000));
// 发送Tuple到Bolt进行处理
String msgId = UUID.randomUUID().toString();
System.out.println("msgId: " + msgId);
collector.emit(new Values("hello world" + UUID.randomUUID()), msgId);
}
@Override
public void ack(Object msgId) {
// 成功处理消息,不需要进行重发
System.out.println("success: " + msgId);
}
@Override
public void fail(Object msgId) {
// 处理失败的消息,重新发送, 这边可以自己维护一个内存数据,当失败时重发
String messageId = (String) msgId;
System.out.println("fail msg " + msgId);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public class WordSplitBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
String[] words = word.split(" ");
// 发送每个单词到WordCountBolt进行计数, collector.emit(tuple, new Values(w))这样使用才能在下一个bolt使用时处理
for (String w : words) {
collector.emit(tuple, new Values(w));
}
System.out.println("sout res ----------------:" + word + "msgId: " + tuple.getMessageId().toString());
// 发送ack消息确认Tuple已经被成功处理
collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private final Map<String, Integer> counts = new HashMap<>();
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
// 计算单词的频率
if (counts.containsKey(word)) {
counts.put(word, counts.get(word) + 1);
} else {
counts.put(word, 1);
}
System.out.println("sout res ++++++++++++++:" + word + ": " + counts.get(word));
// 发送ack消息确认Tuple已经被成功处理, 这里模拟发送失败
collector.fail(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// do nothing
}
@Override
public void cleanup() {
}
}
JStorm 任务的动态伸缩
- 动态调整的过程中 topology任务不会被重启或者下线,保证服务的持续性。在系统高负载和低负载时都能够有效的利用机器资源。 动态调整包括Task和Worker两个维度。动态伸缩当前仅对shuffer grouping有效(Shuffer, Localfirst, LocalOrShuffer)
- Task维度: Spout, Bolt,Acker并发数的动态调整
- Worker维度: Worker数的动态调整
命令行方式
- JStorm rebalance
USAGE: jstorm rebalance [-r] TopologyName [DelayTime] [NewConfig]
e.g. jstorm rebalance SequenceTest conf.yaml
参数说明:
-r: 对所有task做重新调度
TopologyName: Topology任务的名字
DelayTime: 开始执行动态调整的延迟时间
NewConfig: Spout, Bolt, Acker, Worker的新配置(当前仅支持yaml格式的配置文件)
- 配置文件例子
topology.workers: 4
topology.acker.executors: 4
topology.spout.parallelism:
SequenceSpout : 2
topology.bolt.parallelism:
Total : 8
API接口
backtype.storm.command.rebalance:
- public static void submitRebalance(String topologyName, RebalanceOptions options)
- public static void submitRebalance(String topologyName, RebalanceOptions options, Map conf)
Note: conf里带上zk信息,可以用来做对一个集群的远程提交
应用场景
- 根据系统的负载情况,进行任务的动态调整。JStorm提供了task和worker维度一些系统监控负载情况查询的API。 用户可以通过这些API去判断当前任务的负载情况,由此去判断是否要对任务相关的component进行扩容或者缩容
- 集群扩容时,对任务的扩容
Jstorm 支持动态更新配置文件
- 更新后的配置文件能直接被应用响应,在响应过程中 topology任务不会被重启或者下线,保证服务的持续性。拓扑配置文件的更新不包括对worker和component的并发度调整
- 拓扑中所有的component中只有继承了接口
public interface IDynamicComponent extends Serializable {
public void update(Map conf); //conf 配置文件
}
才能实现配置文件的动态更新,至于配置文件怎么去更新相应component的配置,是由用户事先实现好的接口update()决定的。当用户提交动态更新配置文件的命令后,该函数会被回调。更新配置文件是以component为级别的,每个component都有自己的update,如果哪个component不需要实现配置文件动态更新,那它就无需继续该接口
- 需要实现动态更新配置文件的bolt,一定不要去继承接口IBasicBolt
命令行方式
USAGE: jstorm update_topology TopologyName -conf configPath
e.g. jstorm update_topology SequenceTest –conf conf.yaml
参数说明:
TopologyName: Topology任务的名字
configPath: 配置文件名称(暂时只支持yaml格式)
- 配置文件例子
topology.max.spout.pending: 10000
send.sleep.second: 100
kryo.enable: false
fall.back.on.java.serialization: true
API接口
backtype.storm.command.update_topology:
- private static void updateTopology(String topologyName, String pathJar,
String pathConf)
Note: 参数pathJar是用设置来实现jar的更新,没有的话可以设置为null
开发经验总结
- 在jstorm中, spout中nextTuple和ack/fail运行在不同的线程中, 从而鼓励用户在nextTuple里面执行block的操作(比如从队列中take消息),从而节省cpu
- 非事务环境中,尽量使用IBasicBolt
- 计算并发度时,
- spout 按单task每秒500的QPS计算并发,全内存操作的task,按单task 每秒2000个QPS计算并发
- 有向外部输出结果的task,按外部系统承受能力进行计算并发
- 对于MetaQ 和 Kafka,
- 拉取的频率不要太小,低于100ms时,容易造成MetaQ/Kafka 空转次数偏多
- 一次获取数据Block大小推荐是2M或1M,太大内存GC压力比较大,太小效率比较低
- 条件允许时,尽量让程序可以报警,比如某种特殊情况出现时,比如截止到凌晨2点,数据还没有同步到hadoop,发送报警出来
- 从jstorm 0.9.5.1 开始, 底层netty同时支持同步模式和异步模式
- 异步模式, 性能更好, 但容易造成spout 出现fail, 适合在无acker模式下,storm.messaging.netty.sync.mode: false
- 同步模式, 底层是接收端收一条消息,才能发送一条消息, 适合在有acker模式下,storm.messaging.netty.sync.mode: true
运维经验总结
- 启动supervisor或nimbus最好是以后台方式启动, 避免终端退出时向jstorm发送信号,导致jstorm莫名其妙的退出
nohup jstorm supervisor >/dev/null 2>&1 &
- 推荐使用admin用户启动所有的程序, 尤其是不要用root用户启动web ui
- 在安装目录下,建议使用jstorm-current链接, 比如当前使用版本是jstorm 0.9.4, 则创建链接指向jstorm-0.9.4, 当以后升级时, 只需要将jstorm-current链接指向新的jstorm版本
ln -s jstorm-0.9.4 jstorm-current
- 将JStorm的本地目录和日志配置到一个公共目录下, 比如/home/admin/jstorm_data 和/home/admin/logs, 不要配置到JSTORM_HOME/logs,当升级时,替换整个目录时, 容易丢失所有的本地数据和日志
- JStorm支持环境变量JSTORM_CONF_DIR, 当设置了该变量时, 会从该目录里读取storm.yaml文件, 因此建议设置该变量,该变量指定的目录存放配置文件storm.yaml, 以后每次升级时,就可以简单的替换目录就可以了
- 建议不超过1个月,强制重启一下supervisor, 因为supervisor是一个daemon进程, 不停的创建子进程,当使用时间过长时, 文件打开的句柄会非常多,导致启动worker的时间会变慢,因此,建议每隔一周,强制重启一次supervisor
- 对于应用使用ZK较频繁的,需要将JStorm的ZK 和应用的ZK 隔离起来,不混在一起使用
- nimbus节点上建议不运行supervisor, 并建议把nimbus放置到ZK 所在的机器上运行
- 推荐slot数为 ”CPU 核 - 1“, 假设24核CPU, 则slot为23
- 配置cronjob,定时检查nimbus和supervisor,一旦进程死去,自动重启
- ZK 的maxClientCnxns=500
- Linux对外连接端口数限制,TCP client对外发起连接数达到28000左右时,就开始大量抛异常,需要
限流控制
- 限流控制,又称反压backpressure,jstorm的限流机制, 当下游bolt发生阻塞时, 并且阻塞task的比例超过某个比例时(现在默认设置为0.1), 即假设一个component有100个并发,当这个component 超过10个task 发生阻塞时,才会触发启动反压限流
- 在jstorm 连续4次采样周期中采样,队列情况,当队列超过80%(可以设置)时,即可认为该task处在阻塞状态
- 根据阻塞component,进行DAG 向上推算,直到推算到他的源头spout, 并将topology的一个状态位,设置为 “限流状态”
限流和解除限流方式
- 当task出现阻塞时,他会将自己的执行线程的执行时间, 传给topology master, 当触发阻塞后, topology master会把这个执行时间传给spout, 于是, spout每发送一个tuple,就会等待这个执行时间
- 当spout降速后, 发送过阻塞命令的task 检查队列水位连续4次低于0.05时, 发送解除反应命令到topology master, topology master 发送提速命令给所有的spout, 于是spout 每发送一个tuple的等待时间, 当spout的等待时间降为0时, spout会不断发送“解除限速”命令给 topology master, 而topology master确定所有的降速的spout都发了解除限速命令时, 将topology状态设置为正常,标志真正解除限速
使用方式
## 反压总开关
topology.backpressure.enable: true
## 高水位 -- 当队列使用量超过这个值时,认为阻塞
topology.backpressure.water.mark.high: 0.8
## 低水位 -- 当队列使用量低于这个量时, 认为可以解除阻塞
topology.backpressure.water.mark.low: 0.05
## 阻塞比例 -- 当阻塞task数/这个component并发 的比例高于这值时,触发反压
topology.backpressure.coordinator.trigger.ratio: 0.1
## 反压采样周期, 单位ms
topology.backpressure.check.interval: 1000
## 采样次数和采样比例, 即在连续4次采样中, 超过(不包含)(4 *0.75)次阻塞才能认为真正阻塞, 超过(不包含)(4 * 0.75)次解除阻塞才能认为是真正解除阻塞
topology.backpressure.trigger.sample.rate: 0.75
topology.backpressure.trigger.sample.number: 4
Grouping 方式
fieldsGrouping
- 类似SQL中的group by, 保证相同的Key的数据会发送到相同的task, 原理是 对某个或几个字段做hash,然后用hash结果求模得出目标taskId, 这样可以确保具有相同字段值的消息按顺序处理,但也会导致消息的负载不均衡。目前公司这个用的最多
globalGrouping
- target component第一个task
shuffleGrouping
- 轮询方式,平均分配tuple到下一级component上,使用随机方式将消息发送至下一个bolt的任务,即随机分配给下一个任务处理。这样可以平衡消息的负载,同时也会导致消息的不按顺序处理
localOrShuffleGrouping
- 本worker优先,如果本worker内有目标component的task,则随机从本worker内部的目标component的task中进行选择,否则就和普通的shuffleGrouping一样
localFirstGrouping
- 本worker优先级最高,如果本worker内有目标component的task,则随机从本worker内部的目标component的task中进行选择,
- 本节点优先级其次, 当本worker不能满足条件时,如果本supervisor下其他worker有目标component的task,则随机从中选择一个task进行发送
- 当上叙2种情况都不能满足时, 则从其他supervisor节点的目标task中随机选择一个task进行发送。
noneGrouping
- 随机发送tuple到目标component上,但无法保证平均
allGrouping
- 发送给target component所有task
directGrouping
- 发送指定目标task
customGrouping
- 使用用户接口CustomStreamGrouping选择出目标task
IBasicBolt vs IRichBolt
- 尽可能的使用IBasicBolt,在IBasicBolt中,emit(String streamId, List<Object> tuple)是用于处理元组的可靠方法。但是,在IRichBolt中,它不是一个可靠的方法。
在使用IRichBolt是,如果你想可靠的处理元组,你应该显式地调用
emit(String streamId, Tuple anchor, List<Object> tuple)
性能优化
选型
- 按照性能来说, trident < transaction < 使用ack机制普通接口 < 关掉ack机制的普通接口, 因此,首先要权衡一下应该选用什么方式来完成任务
- 如果“使用ack机制普通接口”时, 可以尝试关掉ack机制,查看性能如何,如果性能有大幅提升,则预示着瓶颈不在spout, 有可能是Acker的并发少了,或者业务处理逻辑慢了
增加并发
- 可以简单增加并发,查看是否能够增加处理能力
让task分配更均匀
- 当使用fieldGrouping方式时,有可能造成有的task任务重,有的task任务轻,因此让整个数据流变慢, 尽量让task之间压力均匀。
使用MetaQ或Kafka时
- 对于MetaQ和Kafka, 一个分区只能一个线程消费,因此有可能简单的增加并发无法解决问题, 可以尝试增加MetaQ和Kafka的分区数。
调度定制化接口(0.9.5 及高版本)
设置每个worker的默认内存大小
ConfigExtension.setMemSizePerWorker(Map conf, long memSize)
设置每个worker的cgroup,cpu权重
ConfigExtension.setCpuSlotNumPerWorker(Map conf, int slotNum)
设置是否使用旧的分配方式
ConfigExtension.setUseOldAssignment(Map conf, boolean useOld)
设置强制某个component的task 运行在不同的节点上
ConfigExtension.setTaskOnDifferentNode(Map componentConf, boolean isIsolate)
注意,这个配置componentConf是component的配置, 需要执行addConfigurations 加入到spout或bolt的configuration当中
自定义worker分配
WorkerAssignment worker = new WorkerAssignment();
worker.addComponent(String compenentName, Integer num);//在这个worker上增加一个task
worker.setHostName(String hostName);//强制这个worker在某台机器上
worker.setJvm(String jvm);//设置这个worker的jvm参数
worker.setMem(long mem); //设置这个worker的内存大小
worker.setCpu(int slotNum); //设置cpu的权重大小
ConfigExtension.setUserDefineAssignment(Map conf, List<WorkerAssignment> userDefines)
注:每一个worker的参数并不需要被全部设置,worker属性在合法的前提下即使只设置了部分参数也仍会生效
强制topology运行在一些supervisor上
在实际应用中, 常常一些机器部署了本地服务(比如本地DB), 为了提高性能, 让这个topology的所有task强制运行在这些机器上
conf.put(Config.ISOLATION_SCHEDULER_MACHINES, List<String> isolationHosts);
conf 是topology的configuration
调度细则
- 任务调度算法以worker为维度
- 调度过程中正在进行的调度动作不会对已发生的调度动作产生影响
- 调度过程中用户可以自定义useDefined Assignment,和使用已有的old Assignment,这两者的优先级是:useDefined Assignment>old Assignment
- 用户可以设置task.on.differ.node参数,强制要求同组件的task分布到不同supervisor上
默认调度算法
- 以worker为维度,尽量将worker平均分配到各个supervisor上
- 以worker为单位,确认worker与task数目大致的对应关系(注意在这之前已经其他拓扑占用利用的worker不再参与本次动作)
- 建立task-worker关系的优先级依次为:尽量避免同类task在同一work和supervisor下的情况,尽量保证task在worker和supervisor基准上平均分配,尽量保证有直接信息流传输的task在同一worker下
jstorm状态管理
- 防止ACK重试的影响,比如数据库加事务id字段,有就不重复处理,乐观锁处理版本号
线上配置
- 公司线上使用netty作为底层通讯
storm.messaging.transport: com.alibaba.jstorm.message.netty.NettyContext
storm.messaging.netty.authentication: false
- 也是用了压背, 都是默认的不一一列举其他参数
topology.backpressure.water.mark.high: 0.8
- drpc也用了默认的
-
我们看下默认配置下,设置一个dataSpout,一个scmSpout,三个analysis,三个dataInsert的Task分布,三个analysis和三个dataInsert都是分别在两个台机器上,有一个机器是在同一work上面分配了两个task,这里设置了work数2台
DRPC和Netty
- DRPC(Distributed Remote Procedure Call)是jstorm中的一种通信协议,用于实现分布式计算任务的调度和通信。它通过客户端将计算任务提交给DRPC服务端,并通过网络传输将计算结果返回给客户端。DRPC在jstorm中起到了分布式任务调度和通信的重要作用
- Jstorm使用netty作为其底层通信框架。Netty是一个开源的高性能网络编程框架,它可以帮助开发者快速构建高性能、高可靠性的网络应用程序。Jstorm利用netty提供的高性能网络传输能力,实现了分布式计算任务的快速、可靠的通信
- 在jstorm中,drpc和netty的结合使得分布式实时计算任务可以快速、可靠地进行调度和通信,保证了数据的准确性和一致性。同时,它们还能够提供高性能的网络传输能力,使jstorm可以处理大规模的实时数据,并实现快速的数据处理和计算
- 在DRPC中,Netty被用作底层网络通信框架,用于处理DRPC请求和响应的传输
- 要设置JStorm使用DRPC,首先需要在JStorm集群上部署DRPC服务端。在DRPC服务端上,需要配置DRPC拓扑和函数。在JStorm集群上的拓扑中调用DRPC函数
实际使用一些抽象的工具类
滑动窗口
- 自己实现的滑动窗口,比如存储最近五分钟数据,滑动取三分钟窗口,这种需求还是比较常见的,flink有自己的实现,Jstorm这块支持的不是太好,故自己实现
- 第一种写法,main方法可以测试
public class WindowContainer {
private static final Map<String, String> markMap = Maps.newConcurrentMap();
public static <T> TimeWindow<T> getTimeWindow(Map map, String key, boolean isTime, int interval, TimeUnit unit) {
TimeWindow<T> window;
if (map.containsKey(key)) {
window = (TimeWindow<T>) map.get(key);
} else {
window = makeWindow(key, isTime, interval, unit);
map.put(key, window);
}
return window;
}
public static <T> TimeWindow<T> getResilienceTimeWindow(Map map, String key, boolean isTime, int interval, TimeUnit unit) {
TimeWindow<T> window;
if (map.containsKey(key)) {
window = (TimeWindow<T>) map.get(key);
String preKey = markMap.get(key);
// 窗口变化,重新平衡
if (!markKey(unit, interval).equals(preKey)) {
window.rebalance(unit, interval);
}
} else {
window = makeWindow(key, isTime, interval, unit);
map.put(key, window);
markMap.put(key, markKey(unit, interval));
}
return window;
}
private static <T> TimeWindow<T> makeWindow(String key, boolean isTime, int interval, TimeUnit unit) {
return isTime new TimeWindow<T>(key, interval, unit) : new TimeWindow<T>(key, interval);
}
private static String markKey(TimeUnit unit, int interval) {
return unit == null String.valueOf(interval) : unit + "-" + interval;
}
private WindowContainer() {
throw new UnsupportedOperationException();
}
public static Map<String, String> getMarkMap() {
return markMap;
}
public static void main(String[] args) {
Map<String, Object> map = new HashMap<>();
String key = "123";
TimeWindow<String> timeWindow = WindowContainer.getTimeWindow(map, key, true, 1000, TimeUnit.SECONDS);
int time = (int) (new DateTime().minusSeconds(1000).getMillis() / 1000);
timeWindow.addCount(time + 5, "test");
List<String> res = timeWindow.getDatas(2, time + 10, true);
System.out.println("Sfdsdff");
}
}
public class TimeWindow<T> {
private final String key;
private int capacity;
private int duration;
private boolean hasTime;
private boolean hasCapacity;
private final Queue<Point<T>> queue = new PriorityBlockingQueue<>();
private volatile int earliestTime = Integer.MAX_VALUE;
private String component;
public TimeWindow(String key, int capacity) {
this.key = key;
this.capacity = capacity;
this.hasCapacity = true;
}
public TimeWindow(String key, int duration, TimeUnit unit) {
this.key = key;
this.duration = (int) unit.toSeconds(duration);
this.hasTime = true;
}
public TimeWindow(String key, int capacity, int duration, TimeUnit unit) {
this.key = key;
this.duration = (int) unit.toSeconds(duration);
this.capacity = capacity;
this.hasCapacity = true;
this.hasTime = true;
}
/**
* 添加元素
* 过期数据丢弃:
* 1. 数据驱动:若队列满且数据时间过期直接丢弃不入队。
* 2. 时间驱动:若数据时间过期直接丢弃不入队。
* 出队过期元素:
* 1. 数据驱动:队列满尝试出队时间小于当前数据的数据
* 2. 时间驱动:每次添加都尝试出队过期数据(时间窗口外的数据)
*
*/
public void addCount(int time, T value) {
// 按队列大小过滤
if (hasCapacity) {
// 队列满 且新进来的数据时间小于最小时间则丢弃
if (queue.size() >= capacity && expired(time)) {
return;
}
while (true) {
// 队列满出队小于当前时间数据
if (queue.size() < capacity || !pollPeekIfExpired(time)) {
break;
}
}
}
// 按时间维度过滤
if (hasTime) {
// 直接丢弃过期数据
int expiredTime = (int) (new DateTime().minusSeconds(duration).getMillis() / 1000);
if (expired(time)) {
return;
}
while (true) {
// 队列满出队不在窗口中的数据
if (queue.isEmpty() || !pollPeekIfExpired(expiredTime)) {
break;
}
}
}
queue.offer(new Point<>(time, value));
if (time < earliestTime) {
earliestTime = time;
}
}
/**
* 数据时间是否过期
* 1. 数据驱动: 查看时间是否比最早的时间还小
* 2. 时间驱动: 查看时间是否早于窗口
*/
public boolean expired(int time) {
int expired = hasCapacity earliestTime : (int) (new DateTime().minusSeconds(duration).getMillis() / 1000);
return expired > time;
}
public List<Point<T>> all() {
List<Point<T>> points = Lists.newLinkedList();
for (Point<T> aQueue : queue) {
points.add(aQueue);
}
Collections.sort(points);
return points;
}
public Point<T> head() {
return queue.peek();
}
public int size() {
return queue.size();
}
public int count(int from, int to, boolean edge) {
return find(from, to, edge).size();
}
public List<T> getDatas(int from, int to, boolean edge) {
List<Point<T>> points = find(from, to, edge);
List<T> results = new ArrayList<>(points.size());
for (Point<T> point : points) {
results.add(point.getValue());
}
return results;
}
public List<T> getDatas(int from, int to, boolean leftEdge, boolean rightEdge) {
List<Point<T>> points = find(from, to, true);
List<T> results = Lists.newLinkedList();
for (Point<T> point : points) {
results.add(point.getValue());
}
if (CollectionUtils.isNotEmpty(points)) {
if (!leftEdge) {
while (!results.isEmpty() && points.get(0).getTime() == from) {
results.remove(0);
points.remove(0);
}
}
if (!rightEdge) {
while (!results.isEmpty() && points.get(points.size() - 1).getTime() == to) {
results.remove(points.size() - 1);
points.remove(points.size() - 1);
}
}
}
return results;
}
public List<T> getDatas() {
List<T> results = Lists.newArrayList();
for (Point<T> p : all()) {
results.add(p.getValue());
}
return results;
}
/**
* 淘汰过期的队列头结点
* CAS 操作防止高并发问题
*/
public synchronized boolean pollPeekIfExpired(int expiredTime) {
if (queue.isEmpty()) {
return false;
}
Point<T> point = queue.peek();
if (point == null) {
return false;
}
if (point.getTime() <= expiredTime) {
queue.poll();
return true;
}
return false;
}
public List<Point<T>> find(int from, int to, boolean edge) {
List<Point<T>> points = all();
boolean leftFlag = false;
boolean rightFlag = false;
int left = 0;
int right = points.size() - 1;
while (left <= right) {
if (!leftFlag) {
int time = points.get(left).getTime();
if ((time > from) || (time == from && edge)) {
leftFlag = true;
} else {
left++;
}
}
if (!rightFlag) {
int time = points.get(right).getTime();
if ((time < to) || (time == to && edge)) {
rightFlag = true;
} else {
right--;
}
}
if (leftFlag && rightFlag) {
return points.subList(left, right + 1);
}
}
return Lists.newArrayList();
}
/**
* 动态修改窗口大小,无需重新平衡数据,当有新的数据进来时会自动做平衡
*/
public void rebalance(TimeUnit unit, int interval) {
if (unit == null) {
this.capacity = interval;
} else {
this.duration = (int) unit.toSeconds(interval);
}
}
public static class Point<T> implements Comparable<Point> {
private final int time;
private final T value;
public Point(int time, T value) {
this.time = time;
this.value = value;
}
public int getTime() {
return time;
}
public T getValue() {
return value;
}
@Override
public int compareTo(Point o) {
return Integer.compare(this.time, o.getTime());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Point<?> point = (Point<?>) o;
return getTime() == point.getTime();
}
@Override
public int hashCode() {
return getTime();
}
@Override
public String toString() {
return "Point{" +
"time=" + time +
", value=" + value +
'}';
}
}
public void setComponent(String component) {
this.component = component;
}
@Override
public String toString() {
StringBuilder msg = new StringBuilder("[");
for (Point<T> p : queue) {
msg.append(p.toString()).append(",");
}
return msg.append("]").toString();
}
}
- 第二种写法更简洁,但是还要自己加上锁,防止并发,数据结构用的TreeMap
public class TimestampData {
private final TreeMap<Long, Object> data;
public TimestampData() {
data = new TreeMap<>();
}
public void put(long timestamp, Object value) {
data.put(timestamp, value);
// TODO: 2023/7/12 put时判断key是否过期
}
public Object getRange(long fromTimestamp, long toTimestamp) {
return data.subMap(fromTimestamp, true, toTimestamp, true);
}
public void removeOldKey(long to) {
Map<Long, Object> expiredData = data.headMap(to);
List<Long> res = new ArrayList<>(expiredData.keySet());
for (Long key : res) {
data.remove(key);
}
}
public static void main(String[] args) {
TimestampData timestampData = new TimestampData();
timestampData.put(222222, "Value 2");
timestampData.put(333333, "Value 3");
timestampData.put(111111, "Value 1");
timestampData.put(444444, "Value 4");
timestampData.put(555555, "Value 5");
Object rangeData = timestampData.getRange(222221, 444445);
// 取数据时可以把 treemap的value给出去
System.out.println(rangeData);
timestampData.removeOldKey(333334);
Object rangeData1 = timestampData.getRange(11, 11111111111L);
System.out.println(rangeData1);
// TODO: 2023/7/12 并发情况下都要枷锁,也会有可能是不同线程处理,同一个work时
// 然后加聚合key, key不存在创造TimestampData, 存在则获取TimestampData,然后对TimestampData进行操作,put时判断key是否过期
// 一个是时间过期,一个是数据量过期
Map<String, TimestampData> res = new ConcurrentHashMap<>();
}
}
实例
- 需求: 需要统计Table表中以A,B字段聚合之后,再以A字段聚合之后的总数,当时真实需求不是这么简单的写法,这边只是抽出来,具体mysql写法
select A, count(1) as cnt
from (select A, B, count(1) as cnt
from Table
where 五分钟内数据
group by A, B) as C
where 三分钟内数据
group by A;
- Jstorm写时,监听Kafka数据源,可以实时获取Table表中数据,真实应用实际上是CK表Table,原始数据即写Kafka又写入CK
-
图示,Spout首先从Kafka读取数据,然后以fieldGroup(A,B)的方式输出到,Bolt1,Bolt1设置一个滑动窗口接收数据五分钟内数据,聚合Key就是接收Spout的(A,B),滑动窗口统计(A,B)五分钟聚合之后,下一个需要聚合的单个A的key信息,然后以fieldGroup(A)输送到Bolt2,Bolt2再以A为Key设置三分钟滑动窗口,到时间再聚合计算输出到外部
- 实战时,设置了滑动窗口,所以启动时避免空数据,会空跑一分钟等,主要看设置滑动窗口的步长,避免启动时数据不全判断不准
- 输出结果时,会加个一分钟缓存,也是看设置滑动窗口的步长,不让数据重复输出,因为JStorm本质上是实时判断的,来一条判断一条,滑动窗口一直动,一分钟内可能就会重复输出了,毕竟往前动一秒数据,结果可能还是一样,具体看业务
参考文章
- Storm架构原理详解!
- Storm-集群安装案例
- jstorm 数据流分流和合并
- Storm Trident 教程
- Acker工作机制
- Jstorm ACK 机制
- JStorm 任务的动态伸缩
- Jstorm 支持动态更新配置文件
- 开发经验总结
- 运维经验总结
- 和Storm编程方式区别
- 限流控制
- 调度定制化接口(0.9.5 及高版本)
- Grouping 方式
- IBasicBolt vs IRichBolt
- 常见问题
- 性能优化
- JStorm - ACK机制实战
- storm中的可靠性机制
- jstorm的恰好一次事务处理