barretlee / Node-Daily-Practice

每天写个小程序。
https://github.com/barretlee/Node-Daily-Practice/issues
MIT License
30 stars 2 forks source link

Day3 大数据处理 #4

Open barretlee opened 9 years ago

barretlee commented 9 years ago

需求:读取 log 日志,分析 category 为自定义类型的数据,产出报表。

场景:log 日志的接口有两个参数,startTime 和 endTime,分别表示收集日志的起始时间和结束时间,接口每次最多吐出 1w 条数据。

问题:数据至少有 1 亿条,不使用数据库,拿到 category 为 A B C 的每日数量,监控每日增量,增量过大则邮件警报。

barretlee commented 9 years ago

后续思考:

  1. 内存是否扛得住
  2. 如何加快时间收集数据(异步并发)
  3. 如何监控
  4. 出现错误 log ,如何在五分钟之内定位并处理问题(快速警报,log 格式)
barretlee commented 9 years ago
var fs = require('fs');
var path = require('path');
var http = require('http');
var moment = require('moment');
var querystring = require('querystring');

var API = "log.php?xx=xx&";
var start = moment().startOf('day');
var end = start.subtract(1, "days");
var delta = 10;  // 10s
var step = 60;   // 60 * 10,每次爬取十分钟的数据,并发太大,服务器502=。 =
var ret = {};

/**
 * 获取一个时间单位内(10s)的log日志
 * 
 * @param  {[type]} st 开始时间
 */
function getLog(st){
    var st = parseInt(st / 1000);
    var et = st + delta;
    var url = API + querystring.stringify({
        st: st,
        et: et
    });

    return new Promise(function(resolve, reject){
        http.get(url, function(res){
            // debug(url);
            var body = [], len = 0;
            res.on("data", function(chunk){
                body.push(chunk);
                len += chunk.length;
            });
            res.on("end", function(){
                body = Buffer.concat(body, len).toString();
                try{
                    body = JSON.parse(body);
                } catch(e){
                    console.log("估计请求太汹涌,服务器502了。");
                    process.exit(0);
                }

                if(body.msg != "ok"){
                    reject("Request Error: " + url);
                    return;
                }
                formatData(body);
                delete body;
                resolve();
            });
        }).on("error", function(err){
            reject(err);
        });
    });
}

function getLogs(st, count){
    var promises = [], stime;
    for(var i = 0; i < count; i++){
        stime = st + i * delta;
        promises.push(getLog(stime));
    }
    return promises;
}

/**
 * 将数据格式化保存,保存形式为
 *
 * {
 *    type: category,
 *    timeline: [time1, time2, time3, ...]
 * }
 * 
 * @param  {[type]} data 需要格式化的数据
 * @return {[type]}      [description]
 */
function formatData(data){
    var i = 0, len;
    var data = data.data.data;
    for(len = data.length; i < len; i++){
        var item = data[i];
        var type = item['category'];
        var date = item['timestamp'];

        if(type == "SYS" || type == "PV") continue;

        if(ret[type]){
            ret[type]['count'] += 1;
            ret[type]["timeline"].push(date);
        } else {
            debug("新鲜出炉:" + type);
            ret[type] = {};
            ret[type]['count'] = 1;
            ret[type]["timeline"] = [date];
        }
    }
}

function debug(msg){
    console.log('> DEBUG: ' + msg);
}

function go(st, end){
    // debug("我顶!");
    Promise.all(getLogs(st, step)).then(function(data){
        var start = st + step * delta;
        if(start < end) {
            go(start, end);
        }
    }).catch(function(err){
        console.log(err);
    }); 
}

go(start, start + step * delta * 20);
barretlee commented 9 years ago

image 先这么着吧,数据量多了,每个细节都要考虑进去,程序一挂就得重新开始,费时费力。

barretlee commented 9 years ago

难点在于,出了错还能继续正常的运行,并且将出错的请求增补回来,因为数据量比较大,有几个错误请求就不考虑了。。

sabrinaluo commented 8 years ago

应该可以用eventproxy来做retry来实现出错请求的增补