Supervisor结构:
单点结构如图:
1. 初始化时,启动进程Supervisor,根据 Nimbus分配的任务情况触发启动/停用Worker Jvm进程!
2. 每个Worker进程启动一个 或 多个 Task线程,且 task必须 同属 一个topology应用!即Worker必须服务于单个Topology;
3. 整体分析:Supervisor节点运行多个JVM线程(非一个节点),包括一个Supervisor进程和 一个(一个端口)或 多个Worker进程。
4. Task通过hb直接将 时间信息、当前Task统计信息写入zookeeper;
5. Worker定期将包括TopologyId、端口、TaskId集合 以及 当前时间写入本地;
6. supervisor 定期将包括时间 及 节点资源(端口集合) 写入到zookeeper,同时从zk中读取任务调度结果,根据结果启动/停用worker进程。
原理:Supervisor 与zk进行通信,通过zk的watch机制” 轮询,感知是否有新的任务需要认领,哪些任务需要被重新分配。
Worker结构:
1. Worker JVM进程内部,线程相互独立,同时 也会 共享数据收发和节点之间连接管理等Worker进程内的公共资源。
线程连接:
-->VirtualPort:数据接收线程;
-->keyoTupleSerialize : Tuple数据序列化;
-->TransferQueue:数据发送管道;
-->DraineRunnable:数据发送线程;
-->RefreshConnections:节点之间连接管理线程。
Supervisor:初始化:
1、清理本地临时目录下数据strom-local-dir/supervisor/tmp;
2、创建zk操作实例;
3、本地新建状态文件,/local-dir/supervisor/localstate;
4、生成supervisor-id并写入localstate,其中key=”supervisor-id”;如果supervisor重启,先检查supervisor-id是否已经存在,若存在直接读取即可!
5、初始化并启动heartbeat线程;
6、初始化并启动SyncProcessEvent线程;
7. 注册主进程退出数据清理Hook in SupervisorManger;
启动流程:
一:校验集群启动模式;
二: 创建supervisor本地目录;
三:
sync-processes方法:用于管理Workers;
1. 校验Worker状态:从本地的LocalState读取Worker的心跳信息来判断Worker状态!关闭状态无效(非valid)的Worker。重新分配端口,且创建新的Worker;
步骤:
-->读取 当前 所有的worker的状态,从LocalState中读出每个worker的心跳:不被允许、没有心跳、超时的Worker的worker-state均为无效!
-->关闭所有 状态不是有效的Worker!
-->为新的worker创建目录,添加到LocalState的LS-APPROVED-WORKERS中。
-->启动新的worker,等待worker的启动!
mk-synchronize-supervisor方法:
当assignMent发生变化,从Nimb
Executor心跳:
功能:Worker心跳信息保存到本地文件,Executor心跳保存到zookeeper中。
参考资料: ;