并发机制
- Worker 进程
- 一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)
- 这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成
- Executor – 线程
- Executor是由Worker进程中生成的一个线程
- 每个Worker进程中会运行拓扑当中的一个或多个Executor线程
- 一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些Task任务都是对应着同一个组件(Spout、Bolt)。
- Task
- 实际执行数据处理的最小单元
- 每个task即为一个Spout或者一个Bolt
- Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整
-
(默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务)
-
设置Worker进程数
Config.setNumWorkers(int workers)
- 设置Executor线程数
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
:其中, parallelism_hint即为executor线程数
-
设置Task数量
ComponentConfigurationDeclarer.setNumTasks(Number val)
- 例: ``` Config conf = new Config() ; conf.setNumWorkers(2);
TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout(“spout”, new MySpout(), 1); topologyBuilder.setBolt(“green-bolt”, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(“blue-spout);
- Rebalance – 再平衡
即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量
- 支持两种调整方式:
1. 通过Storm UI
2. 通过Storm CLI
- 通过Storm CLI动态调整:
- 例:`storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10` 将mytopology拓扑worker进程数量调整为5个
“ blue-spout ” 所使用的线程数量调整为3个 “ yellow-bolt ”所使用的线程数量调整为10个
## DRPC (分布式远程过程调用)
- DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
- DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
- (其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)
- DRPC设计目的:
- 为了充分利用Storm的计算能力实现高密度的并行实时计算。
- (Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
- 客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。
## 部署
http://node1:50070/dfshealth.html#tab-overview namenode information
http://node3:50090/status.html secondary namenode information
### 一、环境要求
ZooKeeper3.4.5+
storm 0.9.4+
--------------------------------------------------------------------
### 二、单机模式
上传解压
$ tar xf apache-storm-0.9.4.tar.gz $ cd apache-storm-0.9.4
$ storm安装目录下创建log: mkdir logs $ ./bin/storm –help
下面分别启动ZooKeeper、Nimbus、UI、supervisor、logviewer
$ ./bin/storm dev-zookeeper » ./logs/zk.out 2>&1 & $ ./bin/storm nimbus » ./logs/nimbus.out 2>&1 & $ ./bin/storm ui » ./logs/ui.out 2>&1 & $ ./bin/storm supervisor » ./logs/supervisor.out 2>&1 & $ ./bin/storm logviewer » ./logs/logviewer.out 2>&1 &
需要等一会儿
$ jps 6966 Jps 6684 logviewer 6680 dev_zookeeper 6681 nimbus 6682 core 6683 supervisor
http://node01:8080
提交任务到Storm集群当中运行:
$ ./bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology wordcount $ ./bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology test
-------------------------------------------------------------------------------------------
### 三、完全分布式安装部署
各节点分配:
| \ | Nimbus | Supervisor | Zookeeper |
| ----- | ------ | ---------- | --------- |
| node1 | 1 | | 1 |
| node2 | | 1 | 1 |
| node3 | | 1 | 1 |
node1作为nimbus,开始配置
$ vim conf/storm.yaml storm.zookeeper.servers:
- “node1”
- “node2”
- “node3”
storm.local.dir: “/tmp/storm”
nimbus.host: “node1”
supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
在storm目录中创建logs目录
$ mkdir logs
集群当中所有服务器,同步所有配置!(分发)
启动ZooKeeper集群
node1上启动Nimbus
$ ./bin/storm nimbus » ./logs/nimbus.out 2>&1 & $ tail -f logs/nimbus.log $ ./bin/storm ui » ./logs/ui.out 2>&1 & $ tail -f logs/ui.log
节点node2和node3启动supervisor,按照配置,每启动一个supervisor就有了4个slots
$ ./bin/storm supervisor » ./logs/supervisor.out 2>&1 & $ tail -f logs/supervisor.log
(当然node1也可以启动supervisor)
http://node1:8080/
提交任务到Storm集群当中运行:
$ ./bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology test
环境变量可以配置也可以不配置
export STORM_HOME=/opt/sxt/storm export PATH=$PATH:$STORM_HOME/bin
观察关闭一个supervisor后,nimbus的重新调度
再次启动一个新的supervisor后,观察,并rebalance
### 集群drpc
---------------------------------------------------
修改
$ vi conf/storm.yaml drpc.servers: - “node06” ```
分发配置storm.yaml文件给其他节点,启动zk ,主节点启动 nimbus,supervisor,drpc ,从启动supervisor。