heyrogs / BigData

bigdata learn source by self
0 stars 0 forks source link

JStorm初步认识 #5

Open heyrogs opened 6 years ago

heyrogs commented 6 years ago

1.0.基本概念 首先,JStorm有点类似于Hadoop的MR(Map-Reduce),但是区别在于,hadoop的MR,提交到hadoop的MR job,执行完就结束了,进程就退出了,而一个JStorm任务(JStorm中称为topology),是7*24小时永远在运行的,除非用户主动kill。

1.1.组件: image

1.1.1.spout 这个数据源可以是任意的,比如说kafka,DB,HBase,甚至是HDFS等,JStorm从这个数据源中不断地读取数据,然后发送到下游的bolt中进行处理。

1.1.2.bolt bolt代表处理逻辑,bolt收到消息后,对消息做处理(业务逻辑),处理完后,可以将消息发送给下游的bolt,这样可以形成一个流水线,也可以直接结束。

一般流水线的最后一个bolt会做一些存储操作,比如存储在数据库,hbase,hdfs等,以提供给前端显示。

image

hadoop组件作用: JobTracker:当有任务提交到集群的时候,负责job的运行。 TaskTrack : 负责某一个map或reduce任务。 image

1.2. 优点 在Storm和JStorm出现以前,市面上出现很多实时计算引擎,但自Storm和JStorm出现后,基本上可以说一统江湖: 究其优点:

开发非常迅速:接口简单,容易上手,只要遵守Topology、Spout和Bolt的编程规范即可开发出一个扩展性极好的应用,底层RPC、Worker之间冗余,数据分流之类的动作完全不用考虑 扩展性极好:当一级处理单元速度,直接配置一下并发数,即可线性扩展性能 健壮强:当Worker失效或机器出现故障时, 自动分配新的Worker替换失效Worker 数据准确性:可以采用Ack机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。

heyrogs commented 6 years ago

组件的接口: jstorm对spout组件定义了一个接口:nextTuple,也就是获取下一条信息。 也就是说,项目运行的时候,这个接口会不停的向数据库获取数据,并且将数据发送给bolt。

bolt组件定义的接口:execute,这个接口是用来处理用户的业务逻辑地方。

每一个topology,既可以有多个spout,也可以有多个bolt,代表同时从多个数据源接收信息,也可以多个bolt执行不同的业务逻辑。

heyrogs commented 6 years ago

配置环境遇到的坑: Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00007ff55c5ea000, 4096, 0) failed; error='Cannot allocate memory' (errno=12)

原因:虚拟机没法分配内存了

解决办法: free -m 查看内存分配情况,如果内存分配好有很多,则是检测内存剩余过程出现问题,设置个全局变量跳过检测过程,直接分配即可: 编辑 /etc/sysctl.conf,修改参数 vm.overcommit_memory = 1,重启服务器或者用户重新登录

heyrogs commented 6 years ago

基本命令:

a).启动jstorm命令: nohup jstorm nimbus & nohup jstorm supervisor &

b).将项目绑定到jstorm集群上: jstorm jar xxxx-.jar com.main.class executeParam 例如: jstorm jar jstorm-demo-1.0-SNAPSHOT.jar com.jiang.jstorm.TestTopology hi

jstore UI: 将项目解压到ROOT上: ln -s jstorm-ui-xx ROOT 例如: ln -s jstorm-ui-2.0.4 ROOT

heyrogs commented 6 years ago

Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.

原因:jdk1.8问题 jstorm-core-2.1.0.jar 中有default.yaml文件,-XX:MaxTenuringThreshold=20 修改为15 就可以了!

heyrogs commented 6 years ago

记录一次jstorm集成kafka的坑: String [] bootStrapServers = stormConf.get("kafka.zk.broker.list").toString().split("\,"); GlobalPartitionInformation partitions = new GlobalPartitionInformation(); int brokerLen = bootStrapServers.length; for (int i=0; i<brokerLen; i++){ String brokerHosts[] = bootStrapServers[i].split("\:"); partitions.addPartition(i, new Broker(brokerHosts[0], JStormUtils.parseInt(brokerHosts[1]))); } BrokerHosts kafkaZKHosts = new StaticHosts(partitions);

这里使用静态方法创建BrokerHosts,事实上使用--bootstrap-server的方式来链接zk,但是这里没有明确制定zk,所以会抛出找不到topic和partition的一异常。

这里是个坑, 最好不要使用静态方法构建这个BrokerHosts