CrawlScript / WebCollector

WebCollector is an open source web crawler framework based on Java.It provides some simple interfaces for crawling the Web,you can setup a multi-threaded web crawler in less than 5 minutes.
https://github.com/CrawlScript/WebCollector
GNU General Public License v3.0
3.07k stars 1.45k forks source link

爬取大数据量时,DB报错 #17

Open ostarsier opened 8 years ago

ostarsier commented 8 years ago

这是我分析日志发现的,我的业务逻辑是在第一层里返回了所有页的Link java.lang.IllegalStateException: Can't call Database.put Database was closed. at com.sleepycat.je.Database.checkOpen(Database.java:1863) at com.sleepycat.je.Database.put(Database.java:1168) at com.itsecu.crawler.collector.webcollector.fetcher.SegmentWriter.wrtieLinks(SegmentWriter.java:89) at com.itsecu.crawler.collector.webcollector.fetcher.Fetcher$FetcherThread.run(Fetcher.java:345)

hujunxianligong commented 8 years ago

麻烦发一下版本号和代码,谢谢

ostarsier commented 8 years ago

怎么查看版本号?我用的应该是2.X public class FetchNetease extends DeepCrawler {

private ProducerServer producerServer = new ProducerServer();
private static Logger logger = Logger.getLogger(FetchNetease.class);
public Map<String, NewsBean> newsmap = new HashMap<String, NewsBean>();
private List<SysSiteAccount> salist = new ArrayList<SysSiteAccount>();
public List<DicHarmfulKeyword> wordlist = new ArrayList<DicHarmfulKeyword>();
private TextClassifier tc = new TextClassifier();
private GetKeywords gk = null;
/**
 * 网易每页评论数
 */
private static final int numPerPage = 30;

private Map<String, String> appraiseMap = new HashMap<String, String>() {
    private static final long serialVersionUID = -2165917227030394220L;
    {
        put("positive", "1");
        put("negative", "-1");
        put("unsure", "0");
    }
};

private Map<String, String> commentFieldsMap = new HashMap<String, String>() {
    private static final long serialVersionUID = -2165917227030394220L;
    {
        put("f", "area");
        put("b", "content");
        put("n", "nick");
        put("t", "time");
        put("ip", "ip");
        put("d", "mid");
        put("userId", "uid");
    }
};

public FetchNetease(String crawlPath) {
    super(crawlPath);
    try {
        initAppraise();
        this.salist = CrawUtils.fetchSysSiteAccountList();
        this.wordlist = CrawUtils.fetchDicHarmfulKeywordList();
    } catch (Exception e) {
        logger.error(e);
    }
}

int pageNum = 1;

/**
 * <pre>
 * 处理一页评论的逻辑(通用)
 * 1、
 */
@Override
public Links visitAndGetNextLinks(Page page) {
    pageNum = fetchPageNum(page.getUrl());
    String commentId = fetchCommentId(page.getUrl());
    if (!commentHasBeenUpdated(page.getHtml(), commentId)) {
        logger.info("评论没有更新" + page.getUrl());
        return null;
    }
    NewsBean news = buildNewsBean(commentId, page);

    JSONArray cmtList = news.getCmntlist();
    if (cmtList != null && cmtList.length() <= 0) {
        logger.info(MessageFormat.format("这页没有评论{0}", page.getUrl()));
        return null;
    }
    Map<String, String> analyzedCmt = analyzeCmts(commentId, news, cmtList);
    if (analyzedCmt != null && analyzedCmt.size() > 0) {
        ESHandlerUtil.batchCreateIndex(analyzedCmt,
                Site.Common.getNewsCommentTable());
        logger.info(MessageFormat.format("评论处理完毕,共{0}条,{1}",
                analyzedCmt.size(), page.getUrl()));
        updateNews(news);
        updateCmt(news);
    }
    return fetchLink(news, page);
}

/**
 * 返回下一页,或多页,或结束
 * 
 * @param news
 * @param page
 * @return
 */
private Links fetchLink(NewsBean news, Page page) {
    // @see touchLastCmn
    if (!news.isTag()) {
        return null;
    }
    String nextPage = fetchUrlParam(page.getUrl());
    // 分页爬的时候,不用再下一页了
    if ("no".equals(nextPage)) {
        return null;
    }
    String commentId = news.getCommentid();
    String[] commentids = commentId.split(":");
    Links links = new Links();
    if (pageNum == 1) {
        int total = Integer.valueOf(news.getTotal());
        int ctnum = fetchCmtNum(commentId);
        if ((total - ctnum) > 2000) {
            multiPageCrawler(commentids, links, total, ctnum);
        }
    }
    // 分页爬取
    if (links != null && links.size() > 0) {
        return links;
    }
    // 下一页
    String commentUrl = MessageFormat.format(Site.NETEASE.getCommentUrl(),
            commentids[0], commentids[1], commentids[2], news.getN() + 1);
    links.add(commentUrl);
    return links;
}

/**
 * <pre>
 * 大数据,分页爬取
 * 网易每页30条
 * @param news
 * @param links
 */
private void multiPageCrawler(String[] commentids, Links links, int total,
        int ctnum) {
    int pagesum = (total - ctnum) % numPerPage == 0 ? (total - ctnum)
            / numPerPage : (total - ctnum) / numPerPage + 1;
    logger.info("共" + pagesum + "页");
    for (int i = 2; i <= pagesum; i++) {
        String commentUrl = MessageFormat.format(
                Site.NETEASE.getCommentUrl() + "?nextPage=no",
                commentids[0], commentids[1], commentids[2], i);
        links.add(commentUrl);
    }
}

/**
 * <pre>
 * url末尾添加的一个自定义参数?nextPage=no
 * 表示处理完后,不用自动爬取下一页
 * 用于大数据量时,添加了多页种子的情况
 * 没有就返回yes
 * @param url
 * @return
 */
private String fetchUrlParam(String url) {
    int index = url.indexOf("&nextPage");
    if (index != -1) {
        return url.substring(index + 10);
    }
    return "yes";
}

// public static void main(String[] args) {
// String host = "news";
// String boardId = "news3_bbs";
// String threadId = "B8REFQHN00014Q4P";
// String commentId = host + ":" + boardId + ":" + threadId;
// String[] commentids = commentId.split(":");
// String commentUrl = MessageFormat.format(
// Site.NETEASE.getPageCommentUrl(), commentids[0],
// commentids[1], commentids[2], 1);
// System.err.println(commentUrl);

// String commentId =
// fetchCommentId("http://comment.news.163.com/cache/newlist/news3_bbs/B8REFQHN00014Q4P_2.html");
// System.err.println(commentId);

// String flag =
// fetchNextPage("http://comment.news.163.com/cache/newlist/news3_bbs/B8REFQHN00014Q4P_2.html?nextPage=no");
// System.err.println(flag);

// }

/**
 * 分析该新闻所有评论
 * 
 * @param commentId
 * @param newsBean
 * @param cmtList
 * @return 评论分析的结果
 */
private Map<String, String> analyzeCmts(String commentId,
        NewsBean newsBean, JSONArray cmtList) {
    if (cmtList == null) {
        return null;
    }
    Map<String, String> analyzedCmt = new HashMap<String, String>();
    for (int i = 0; i < cmtList.length(); i++) {
        JSONObject cmt = (JSONObject) cmtList.get(i);
        cmt = fetchLandlord(cmt);
        if (touchLastCmn(newsBean, cmt)) {
            newsBean.setTag(false);
            break;
        }
        if (newsBean.isFirstuid()) {
            newsBean.setC_uid(value2Str(cmt, "userId"));
            newsBean.setC_time(value2Str(cmt, "t"));
            newsBean.setFirstuid(false);
        }
        analyzeCmt(cmt, commentId, analyzedCmt);
    }
    newsBean.setSnumber(newsBean.getSnumber() + cmtList.length());
    return analyzedCmt;
}

/**
 * <pre>
 * 一条评论可能会有跟帖
 * 这里只取楼主,因为只有他才有userId
 * http://comment.news.163.com/news_guoji2_bbs/B97VQG5U0001121M.html
 * @param cmt
 * @return
 */
private JSONObject fetchLandlord(JSONObject cmt) {
    List<Integer> floors = new ArrayList<Integer>();
    JSONArray array = cmt.names();
    for (int i = 0; i < array.length(); i++) {
        int floor = Integer.valueOf(array.get(i).toString());
        floors.add(floor);
    }
    String landLord = String.valueOf(Collections.max(floors));
    return cmt.getJSONObject(String.valueOf(landLord));
}

/**
 * 评论以及所有跟帖
 * 
 * @param cmt
 * @return
 */
private static JSONArray fetchFloors(JSONObject cmt) {
    JSONArray ret = new JSONArray();
    JSONArray array = cmt.names();
    for (int i = 0; i < array.length(); i++) {
        String floor = array.get(i).toString();
        ret.put(cmt.getJSONObject(floor));
    }
    return ret;
}

public static void main(String[] args) {
    // String str =
    // "{\"300\": {\"f\": \"f3\",},\"299\": {\"f\": \"f2\",},\"1\": {\"f\": \"f1\", }}";
    // JSONObject cmt = new JSONObject(str);
    // JSONArray arr = fetchFloors(cmt);
    // for (int i = 0; i < arr.length(); i++) {
    // System.err.println(arr.get(i));
    // }
}

private void updateCmt(NewsBean newsBean) {
    Put put = new Put(newsBean.getCommentid().getBytes());
    put.add("cf".getBytes(), "uid".getBytes(), newsBean.getC_uid()
            .getBytes());
    put.add("cf".getBytes(), "time".getBytes(), newsBean.getC_time()
            .getBytes());
    HBaseUtil.save(put, Site.Common.getHbaseTmpTable());
}

/**
 * 分析一条评论,将分析结果发送给kafka,并更新ES的评论total
 * 
 * @param cmt
 * @param commentId
 * @param analyzedCmt
 */
private void analyzeCmt(JSONObject cmt, String commentId,
        Map<String, String> analyzedCmt) {
    String isappraise = "0";
    String keyword = "";
    String isself = "n";
    String badword = "";
    try {
        isappraise = fetchIsappraise(cmt.get("b").toString());
        keyword = fetchKeyWord(cmt.get("b").toString());
        isself = fetchIsSelf(value2Str(cmt, "userId"));
        badword = fetchBadWord(cmt.get("b").toString(), commentId);
    } catch (Exception e) {
        logger.error(e);
    }
    //测试时注释
    // sendMessage(cmt, commentId, isappraise, keyword, isself);
    analyzedCmt.put(
            Base64Util.getUUID(),
            obj2JsonData(cmt, commentId, isappraise, keyword, isself,
                    badword));
}

private String fetchBadWord(String content, String commentId) {
    String ret = "null";
    StringBuilder badWord = new StringBuilder();
    for (DicHarmfulKeyword word : wordlist) {
        if (content.indexOf(word.getKeyword()) > 0) {
            badWord.append(word.getKeyword()).append(",");
        }
    }
    if (badWord.length() > 1) {
        ret = badWord.substring(0, badWord.length() - 1).toString();
        newsmap.get(commentId).setBadword(
                newsmap.get(commentId).getBadword() + 1);
    }
    return ret;
}

/**
 * 评论表总数
 * 
 * @param commentId
 * @return
 */
private int fetchCmtNum(String commentId) {
    Map<String, String> map = new HashMap<String, String>();
    map.put("nid", newsmap.get(commentId).getNid());
    SearchHit[] hits = ESHandlerUtil.search(map,
            Site.Common.getNewsCommentTable());
    return hits == null ? 0 : hits.length;
}

private String fetchIsSelf(String uid) {
    if (StringUtils.isBlank(uid)) {
        return "n";
    }
    for (SysSiteAccount sa : salist) {
        if (uid.trim().equals(sa.getUid())) {
            return "y";
        }
    }
    return "n";
}

private String fetchKeyWord(String content) throws Exception {
    ArrayList<String> getKeyword1 = gk.GetKeyword1(content, 10);
    StringBuilder kw = new StringBuilder();
    for (String word : getKeyword1) {
        kw.append(word).append(",");
    }
    if (kw.length() > 1) {
        return kw.substring(0, kw.length() - 1);
    }
    return "";
}

/**
 * 根据评论内容分析情感:正负面
 * 
 * @param cmn
 * @return
 */
private String fetchIsappraise(String content) {
    String isappraise = tc.classify(content);
    return appraiseMap.get(isappraise);
}

private void updateNews(NewsBean news) {
    String jsonData = null;
    try {
        XContentBuilder jsonBuild = XContentFactory.jsonBuilder();
        jsonBuild.startObject().field("totalcount", news.getSnumber())
                .field("hottag", news.getHottag())
                .field("badword", news.getBadword())
                .field("status", news.getStatus()).endObject();
        jsonData = jsonBuild.string();
    } catch (IOException e) {
        e.printStackTrace();
    }
    ESHandlerUtil.updateIndex(jsonData, Site.Common.getNewsTable(),
            news.getNid());
}

/**
 * <pre>
 * 只处理新的评论。
 * hbase里存放了该新闻最后一条评论的用户和时间
 * @param newsBean
 * @param cmn
 * @return
 */
private boolean touchLastCmn(NewsBean newsBean, JSONObject cmn) {
    return cmn.get("userId").toString().equals(newsBean.getUid())
            && cmn.get("t").toString().equals(newsBean.getTime());
}

/**
 * 有新的评论吗
 * 
 * @param pageContent
 * @param commentId
 * @return
 */
private boolean commentHasBeenUpdated(String pageContent, String commentId) {
    JSONObject data = fetJsonData(pageContent);
    String total = fetchTotal(data);
    int cmtCount = fetchCmtNum(commentId);
    return Integer.parseInt(total) > cmtCount;
}

private JSONObject fetJsonData(String pageContent) {
    if (StringUtils.isBlank(pageContent)) {
        logger.info("页面内容为空,ip被封了吧。");
    }
    return new JSONObject(pageContent.substring(16));
}

private NewsBean buildNewsBean(String commentId, Page page) {
    String pageContent = page.getHtml();
    NewsBean ret = newsmap.get(commentId);
    JSONObject data = fetJsonData(pageContent);
    if (pageNum == 1) {
        ret.setTotal(fetchTotal(data));
         Result result = HBaseUtil.getResultByRowKey(
         Site.Common.getHbaseTmpTable(), commentId);
         ret.setUid(fetchProperty(result, "uid"));
         ret.setTime(fetchProperty(result, "time"));
         ret.setN(fetchPageNum(page.getUrl()));
        int total = Integer.valueOf(fetchTotal(data));
        int ctnum = fetchCmtNum(commentId);
        ret.setSnumber(ctnum);
        if (ctnum > 0 && (total - ctnum) > 200) {// 热帖
            ret.setHottag("true");
        } else {
            ret.setHottag("false");
        }
        // 30天没评论就不爬了
        if (ctnum >= total
                && DateUtil.calcDayOffsetDate(ret.getNewstime()) > 30) {
            ret.setStatus("2");
        } else {
            ret.setStatus("0");
        }
    }
    JSONArray cmntlist = new JSONArray();
    Object posts = fetchPosts(data);
    if (posts != null && !"null".equals(posts.toString())) {
        cmntlist = (JSONArray) posts;
    }
    ret.setCmntlist(cmntlist);
    return ret;
}

private static int fetchPageNum(String url) {
    return Integer.parseInt(fetchMatcher(url, "_([0-9]*).html"));
}

private Object fetchPosts(JSONObject data) {
    return data.get("newPosts");
}

/**
 * 
 * @param data
 * @return 评论总数
 */
private String fetchTotal(JSONObject data) {
    try {
        return String.valueOf(data.get("tcount"));
    } catch (Exception e2) {
        return "0";
    }
}

private void sendMessage(JSONObject cmt, String commentid,
        String isappraise, String keyword, String isself) {
    try {
        String message = newsmap.get(commentid).getTopicid() + "@@"
                + cmt.get("d") + "@@" + isappraise + "@@" + cmt.get("f")
                + "@@" + keyword + "@@" + cmt.get("t") + "@@" + isself
                + "@@1@@end";
        producerServer.send("comment", message);
    } catch (Exception e) {
        logger.error(e);
    }
}

/**
 * <pre>
 * 前面的数字1表示1楼;
 * f是显示的内容,对应分别为“网易XX网友”,用户ID,用户IP,没有用户ID的就显示“网易XX网友”,连“网易XX网友”都没有的就显示IP地址 ,我猜的;
 *  v:支持数量; 
 *  u不知道是什么粗略看一眼值全部都是’u’; 
 *  d:新闻id;
 *   t:发表时间;
 *   b:评论内容; 
 *   a:反对数量;
 * p就是上面那个div标签的id;
 *  n怀疑是网易帐号或者用户名;
 *   l不知道是什么;
 *    userId:用户id;
 *     ip就是ip了。
 * 
 * @param cmt
 * @param commentid
 * @param isappraise
 * @param keyword
 * @param isself
 * @return
 */
private String obj2JsonData(JSONObject cmt, String commentid,
        String isappraise, String keyword, String isself, String badword) {
    String jsonData = null;
    try {
        XContentBuilder jsonBuild = XContentFactory.jsonBuilder();
        jsonBuild.startObject().field("isappraise", isappraise)
                .field("keyword", keyword).field("isself", isself)
                .field("nid", newsmap.get(commentid).getNid())
                .field("newsid", commentid).field("badword", badword)
                .field("channel", Site.NETEASE.getChannel());
        addFields(jsonBuild, cmt, "f", "b", "n", "t", "ip", "d", "userId");
        jsonBuild = jsonBuild.endObject();
        jsonData = jsonBuild.string();
    } catch (IOException e) {
        logger.error(e);
    }
    return jsonData;
}

private XContentBuilder addFields(XContentBuilder jsonBuild,
        JSONObject cmt, String... keys) throws JSONException, IOException {
    if (jsonBuild == null || cmt == null || keys == null) {
        return jsonBuild;
    }
    for (String key : keys) {
        addField(jsonBuild, cmt, key);
    }
    return jsonBuild;
}

private void addField(XContentBuilder jsonBuild, JSONObject cmt, String key)
        throws JSONException, IOException {
    if (jsonBuild == null || cmt == null || key == null) {
        return;
    }
    Object value = cmt.opt(key);
    if (value != null) {
        jsonBuild.field(commentFieldsMap.get(key), cmt.get(key).toString());
    }
}

/**
 * 初始化情感:关键字、正负面
 * 
 * @throws URISyntaxException
 * @throws Exception
 */
private void initAppraise() throws URISyntaxException, Exception {
    String path = this.getClass().getClassLoader().getResource("").toURI()
            .getPath();
    String trainDataPath = path + "text-classification/";
    String modelFile = path + "text-classification/text_model.gz";
    Reader reader = new FileReader(trainDataPath, "UTF-8", ".data");
    tc.train(reader, modelFile);
    gk = new GetKeywords(path);
}

private static String fetchCommentId(String url) {
    String host = fetchMatcher(url, "comment.([a-zA-Z0-9]*).");
    String boardId = "";
    String threadId = "";
    if (url.contains("cache")) {
        boardId = fetchMatcher(url, "newlist/(.*?)/");
    } else {
        boardId = fetchMatcher(url, "data/(.*?)/");
    }
    if (url.contains("cache")) {
        threadId = fetchMatcher(url, boardId + "/(.*?)_");
    } else {
        threadId = fetchMatcher(url, "df/(.*?)_");
    }
    return host + ":" + boardId + ":" + threadId;
}

private static String fetchMatcher(String text, String reg) {
    Pattern p = Pattern.compile(reg);
    Matcher m = p.matcher(text);
    while (m.find()) {
        return m.group(1).trim();
    }
    return null;
}

private String fetchProperty(Result result, String property,
        String defaultValue) {
    byte[] ret = result.getValue("cf".getBytes(), property.getBytes());
    if (ret == null) {
        return null;
    }
    return new String(ret);
}

private String fetchProperty(Result result, String property) {
    return fetchProperty(result, property, "");
}

}

hujunxianligong commented 8 years ago

建议升级到2.24以上版本,中间修复了部分BUG