`

实时分析数据分析平台——storm

阅读更多

 storm集群和hadoop集群类似,在hadoop上运行mapreduce任务,而在storm上称为topologies任务,两种任务之间有区别,典型的一点是mapreduce任务最后会结束,而topologies不会结束。

 

一个storm集群包含两种节点,master节点和worker节点,其中master节点运行一个daemon进程("Nimbus"),在hadoop中称为JobTracker,Nimbus负责向集群分发代码,分配任务,处理错误。

每个worker节点运行一个daemon进程("Supervisor"),它负责监听分配给自己的任务并开启工作进程处理。nimbus和supervisors之间通过zookeeper协调。

 

stream是Storm里抽象的核心概念,stream是一组没有边界的数组(tuples)

spout是stream的来源(可能有多个spout源头),比如spout从Kestrel队列中读取数据发送成一个流;bolt负责处理输入流,发送新的数据流到别的bolt,Bolts能通过运行函数做很多事情,过滤tuples,聚集和拼装数据流,和操作数据库等。spout和bolt的网络结构也会被打包到"topology"中,数据流会按照网络结构流下去。_一个topology会永远运行知道我们kill它,storm会重分配失败的任务,此外它保证不因为机器挂掉丢失正在处理的数据



下面的代码统计各个数字0~100之间的计数结果

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.FeederSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * This is a basic example of a Storm topology.
 */
public class CounterTopology {
    
    public static class CounterBolt extends BaseRichBolt {
		OutputCollector _collector;
		HashMap<Integer,Integer> hm;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            hm = new HashMap<Integer,Integer>();
        }

        @Override
        public void execute(Tuple tuple) {
        	
        	int id = tuple.getInteger(0);
        	int count = tuple.getInteger(1);
        	
        	Object oldcount = hm.get(id);
        	if(oldcount != null){
        		count += (Integer)oldcount;
        	}
        	hm.put(id, count);
            _collector.emit(tuple, new Values(id,count));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id","count"));
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        
        FeederSpout cs = new FeederSpout(new Fields("id","count"));
        builder.setSpout("countspout", cs, 10);        
        builder.setBolt("countbolt1", new CounterBolt(), 3)
                .shuffleGrouping("countspout");
//        builder.setBolt("countbolt2", new CounterBolt(), 2)
//                .shuffleGrouping("countbolt1");
                
        Config conf = new Config();
        conf.setDebug(true);
        
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);
            
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
        
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("counter", conf, builder.createTopology());
//
//            Utils.sleep(10000);
//            cluster.killTopology("counter");
//            cluster.shutdown();    
        }
        
        
        while(true){
        	Utils.sleep(100);
        	
        	Random rand = new Random();
        	int id = rand.nextInt(100);
        	int count = rand.nextInt(100);
        
        	cs.feed(new Values(id,count));
        }
    }
}

 运行方式如下:

javac -cp storm-0.8.2.jar storm/starter/CounterTopology.java
jar cvf CT.jar storm
bin/storm jar CT.jar storm.starter.CounterTopology

 

生产环境使用storm

1,定义一个topology任务(java语言使用TopologyBuilder)

2,使用StormSubmitter提交任务到集群,三个参数分别是(任务名称,配置项,任务)

Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);

3,创建代码jar文件以及提交到集群

storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

针对topology的常用配置如下:

Config.TOPOLOGY_WORKERS:集群中处理任务的进程数,如果配置为25,并发数设置为150,则每个进程会开启6个线程处理任务
Config.TOPOLOGY_ACKERS:用来检测任务是否正确处理的进程数目
Config.TOPOLOGY_MAX_SPOUT_PENDING:配置spout上最多可以保存为未处理和未失败的任务数,建议设置以免queue爆满
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:默认30s
Config.TOPOLOGY_SERIALIZATIONS:注册自定义serilaizer

终止topology: storm kill {stormname}

更新运行中的topology:暂时只能杀掉重启,计划设计storm swap命令实现此功能

监控topology:使用storm ui

 

bolt的格式如下:

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
	//prepare函数里面也可以通过collector发送数据流
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
	//处理数据,发送数据流,记着ack保证数据不丢失
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
	//需要声明输出数据流的格式
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

参照网页

https://github.com/nathanmarz/storm/wiki/Tutorial

https://github.com/nathanmarz/storm-starter

http://xumingming.sinaapp.com/category/storm/

http://www.oschina.net/p/twitter-storm

Trident用来做实时分析不错

https://github.com/nathanmarz/storm/wiki/Trident-tutorial

http://www.ibm.com/developerworks/cn/opensource/os-twitterstorm/index.html

  • 大小: 22.6 KB
分享到:
评论

相关推荐

    使用Storm实现实时大数据分析!

    简单和明了,Storm让大数据分析变得轻松加愉快。当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据...

    数据实时分析平台Heron.zip

    实时流系统是在大规模数据分析的基础上实现系统性的分析。另外,它还需要:每分钟处理数十亿事件的能力、有秒级延迟,和行为可预见;在故障时保证数据的准确性,在达到流量峰值时是弹性的,并且易于调试和在共享的...

    开源力量——数据挖掘原理与实战

    要点 数据分析流程、方法论(PEST、5W2H、逻辑树)、基础数据分析方法、数据分析师能力层级、数据的度量、探索、抽样、原理及实际操作,结合SPSS工具使用 第2周 数据挖掘基础 要点(数据挖掘概念、流程、重要环节、...

    安全数据科学分享.pdf

    • 如何入门和学习安全数据分析 ..... 如何学习安全数据分析? 1、先学习基本的算法原理,补充数学知识——Coursera 上的机器学习课程 2、学习Python的几个机器学习工具——pandas,numpy, seaborn,sklearn 3、去Kaggle...

    storm流数据处理开发应用实战(linux实验环境,storm搭建完毕后的开发)

    linux实验环境,storm搭建完毕后的开发。eclipse开发环境,大数据界hello world——wordcount详解,bolt、分组机制、storm DRPC详解

    大数据概述——精选推荐.pdf

    ⼤数据产业包括IT基础设施层、数据源层、数据管理层、数据分析层、数据平台层和数据应⽤层。 ⼋,⼤数据与云计算、物联⽹: ⼀)云计算: 1)云计算概念:云计算实现了通过⽹络提供可伸缩的、廉价的分布式计算能⼒...

    大数据期末知识点总结.pdf

    常驻空间,时效性,有序性,数据量,数据速率,是否可重现,移动对象,数据精确度 Storm:任务拓扑=有向⽆环图(Spout、Bolt)Spout读取数据(元组)——》Blot。节点:Nimbus Supervisor。特征:编程模型简单 ...

    大数据、数据挖掘与智慧运营.pptx

    LOGO M.94275.CN 1 大数据、数据挖掘与智慧运营综述 1.7 现有数据挖掘的主要分析软件与系统 1.7.1 Hadoop 01 1.7.2 Storm 02 1.7.5 SAS 05 1.7.4 SPASS(SPSS) 04 1.7.3 Spark 03 大数据、数据挖掘与智慧...

    Hadoop基础培训教程.pdf

    数据分析与报表 预测 数据挖掘与BI 机器学习与Google大 脑 起源与目标 大数据与Hadoop 应用模式 大数据技术IT人员的挑战——DevOps DevOps Development和Operations的 组合,是一组过程、方法与 系统的统称,用于...

    JAVA上百实例源码以及开源项目源代码

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

    JAVA上百实例源码以及开源项目

    此时此景,笔者只专注Android、Iphone等移动平台开发,看着这些源码心中有万分感慨,写此文章纪念那时那景! Java 源码包 Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这...

Global site tag (gtag.js) - Google Analytics