Storm学习01

分布式实时大数据处理框架Storm学习

Posted by ZBX on February 17, 2020

并发机制

  • Worker 进程
    • 一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)
    • 这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成
  • Executor – 线程
    • Executor是由Worker进程中生成的一个线程
    • 每个Worker进程中会运行拓扑当中的一个或多个Executor线程
    • 一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些Task任务都是对应着同一个组件(Spout、Bolt)。
  • Task
    • 实际执行数据处理的最小单元
    • 每个task即为一个Spout或者一个Bolt
  • Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整
  • (默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务)

  • 设置Worker进程数 Config.setNumWorkers(int workers)

  • 设置Executor线程数
    TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
    TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
    

    :其中, parallelism_hint即为executor线程数

  • 设置Task数量 ComponentConfigurationDeclarer.setNumTasks(Number val)

  • 例: ``` Config conf = new Config() ; conf.setNumWorkers(2);

TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout(“spout”, new MySpout(), 1); topologyBuilder.setBolt(“green-bolt”, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(“blue-spout);


- Rebalance – 再平衡
即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量

- 支持两种调整方式:
  1. 通过Storm UI
  2. 通过Storm CLI

- 通过Storm CLI动态调整:
  - 例:`storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10` 将mytopology拓扑worker进程数量调整为5个 
  “ blue-spout ” 所使用的线程数量调整为3个 “ yellow-bolt ”所使用的线程数量调整为10个


## DRPC  (分布式远程过程调用)
- DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
- DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
  - (其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)


- DRPC设计目的:
  - 为了充分利用Storm的计算能力实现高密度的并行实时计算。
  - (Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

- 客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。



## 部署

http://node1:50070/dfshealth.html#tab-overview    namenode information
http://node3:50090/status.html  secondary namenode information

### 一、环境要求
ZooKeeper3.4.5+
storm 0.9.4+

--------------------------------------------------------------------
### 二、单机模式
上传解压

$ tar xf apache-storm-0.9.4.tar.gz $ cd apache-storm-0.9.4

$ storm安装目录下创建log: mkdir logs $ ./bin/storm –help


下面分别启动ZooKeeper、Nimbus、UI、supervisor、logviewer

$ ./bin/storm dev-zookeeper » ./logs/zk.out 2>&1 & $ ./bin/storm nimbus » ./logs/nimbus.out 2>&1 & $ ./bin/storm ui » ./logs/ui.out 2>&1 & $ ./bin/storm supervisor » ./logs/supervisor.out 2>&1 & $ ./bin/storm logviewer » ./logs/logviewer.out 2>&1 &


需要等一会儿

$ jps 6966 Jps 6684 logviewer 6680 dev_zookeeper 6681 nimbus 6682 core 6683 supervisor


http://node01:8080
提交任务到Storm集群当中运行:

$ ./bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology wordcount $ ./bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology test


-------------------------------------------------------------------------------------------

### 三、完全分布式安装部署
各节点分配:


| \     | Nimbus | Supervisor | Zookeeper |
| ----- | ------ | ---------- | --------- |
| node1 | 1      |            | 1         |
| node2 |        | 1          | 1         |
| node3 |        | 1          | 1         |


node1作为nimbus,开始配置

$ vim conf/storm.yaml storm.zookeeper.servers:

  • “node1”
  • “node2”
  • “node3”

storm.local.dir: “/tmp/storm”

nimbus.host: “node1”

supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703


在storm目录中创建logs目录

$ mkdir logs


集群当中所有服务器,同步所有配置!(分发)

启动ZooKeeper集群

node1上启动Nimbus

$ ./bin/storm nimbus » ./logs/nimbus.out 2>&1 & $ tail -f logs/nimbus.log $ ./bin/storm ui » ./logs/ui.out 2>&1 & $ tail -f logs/ui.log



节点node2和node3启动supervisor,按照配置,每启动一个supervisor就有了4个slots

$ ./bin/storm supervisor » ./logs/supervisor.out 2>&1 & $ tail -f logs/supervisor.log

(当然node1也可以启动supervisor)

http://node1:8080/
提交任务到Storm集群当中运行:

$ ./bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology test


环境变量可以配置也可以不配置

export STORM_HOME=/opt/sxt/storm export PATH=$PATH:$STORM_HOME/bin


观察关闭一个supervisor后,nimbus的重新调度   
再次启动一个新的supervisor后,观察,并rebalance


### 集群drpc
---------------------------------------------------
修改

$ vi conf/storm.yaml drpc.servers: - “node06” ```

分发配置storm.yaml文件给其他节点,启动zk ,主节点启动 nimbus,supervisor,drpc ,从启动supervisor。