sujit-kr / crawler4j

Automatically exported from code.google.com/p/crawler4j

efficiency suggestion #14

Closed GoogleCodeExporter closed 9 years ago

GoogleCodeExporter commented 9 years ago
PageFetcher.fetch(Page page) is currently used by all crawler threads as a
utility class, and it has become a bottleneck. Instead, why don't you make
PageFetcher an instance variable of the WebCrawler class? This way the crawlers
won't have to wait for one another to have their pages processed.

The changes I made:

WebCrawler gets a new PageFetcher instance variable:
public class WebCrawler implements Runnable {

    private PageFetcher pagefetcher;
...
    private int preProcessPage(WebURL curURL) {
        Page page = new Page(curURL);
//      int statusCode = PageFetcher.fetch(page);
        int statusCode = pagefetcher.fetch(page);

and in PageFetcher:

//  public static int fetch(Page page) {
    public int fetch(Page page) {
...
//          synchronized (mutex) {
//          }

It worked very well for my case!
I was able to go 5x faster: I was downloading about 5 pages per second, and now
I am at my bandwidth limit. (No, I was not being polite: 10 threads,
fetcher.default_politeness_delay=0; my application requires a "snapshot".)

Please contact me with your conclusions, even if it is to tell me that my
changes break the code; I really need to know if they do!

Original issue reported on code.google.com by tiago.lb...@gmail.com on 14 Sep 2010 at 7:21
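
For reference, the kind of run described above (10 crawler threads, zero politeness delay, a fixed "snapshot") would look roughly like the sketch below. It is only an illustration: the seed URL and storage path are placeholders, the CrawlController calls assume the crawler4j API of that release, and the delay could equally be set through the fetcher.default_politeness_delay property that PageFetcher reads (see the source posted in the next comment).

import edu.uci.ics.crawler4j.crawler.CrawlController;
import edu.uci.ics.crawler4j.crawler.Page;
import edu.uci.ics.crawler4j.crawler.PageFetcher;
import edu.uci.ics.crawler4j.crawler.WebCrawler;

public class SnapshotCrawl {

    // trivial crawler: visits everything reachable from the seed
    public static class MyCrawler extends WebCrawler {
        public void visit(Page page) {
            System.out.println("Visited: " + page.getWebURL().getURL());
        }
    }

    public static void main(String[] args) throws Exception {
        // folder for the crawl's intermediate data (placeholder path)
        CrawlController controller = new CrawlController("/tmp/crawl-root");
        controller.addSeed("http://example.com/");

        // zero politeness delay, as in the "snapshot" run described above
        PageFetcher.setPolitenessDelay(0);

        // 10 crawler threads
        controller.start(MyCrawler.class, 10);
    }
}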

GoogleCodeExporter commented 9 years ago
Here is my final version of PageFetcher and WebCrawler
(the only remaining problem is with using a proxy):

package edu.uci.ics.crawler4j.crawler;

import java.util.*;

import org.apache.log4j.Logger;

import edu.uci.ics.crawler4j.frontier.DocIDServer;
import edu.uci.ics.crawler4j.frontier.Frontier;
import edu.uci.ics.crawler4j.url.WebURL;

/**
 * Copyright (C) 2010.
 * 
 * @author Yasser Ganjisaffar <yganjisa at uci dot edu>
 */

public class WebCrawler implements Runnable {

    private static final Logger logger = Logger.getLogger(WebCrawler.class
            .getName());

    private Thread myThread;

    private final static int PROCESS_OK = -12;

    private HTMLParser htmlParser;

    int myid;

    private CrawlController myController;

    private PageFetcher pageFetcher;

    public CrawlController getMyController() {
        return myController;
    }

    public void setMyController(CrawlController myController) {
        this.myController = myController;
    }

    public WebCrawler() {
        htmlParser = new HTMLParser();
        // per-crawler PageFetcher instance -- the change proposed in this issue
        pageFetcher = new PageFetcher();
    }

    public WebCrawler(int myid) {
        this.myid = myid;
    }

    public void setMyId(int myid) {
        this.myid = myid;
    }

    public int getMyId() {
        return myid;
    }

    public void onStart() {

    }

    public void onBeforeExit() {

    }

    public Object getMyLocalData() {
        return null;
    }

    public void run() {
        onStart();
        while (true) {
            ArrayList<WebURL> assignedURLs = new ArrayList<WebURL>(50);
            Frontier.getNextURLs(50, assignedURLs);
            if (assignedURLs.size() == 0) {
                if (Frontier.isFinished()) {
                    return;
                }
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                for (WebURL curURL : assignedURLs) {
                    if (curURL != null) {
                        preProcessPage(curURL);
                    }
                }
            }
        }
    }

    public boolean shouldVisit(WebURL url) {
        return true;
    }

    public void visit(Page page) {

    }

    private int preProcessPage(WebURL curURL) {
        if (curURL == null) {
            return -1;
        }
        Page page = new Page(curURL);
        int statusCode = pageFetcher.fetch(page);
        // The page might have been redirected. So we have to refresh curURL
        curURL = page.getWebURL();
        int docid = curURL.getDocid();
        if (statusCode != PageFetchStatus.OK) {
            if (statusCode == PageFetchStatus.PageTooBig) {
                logger.error("Page was bigger than max allowed size: "
                        + curURL.getURL());
            }
            return statusCode;
        }

        try {
            if (!page.isBinary()) {
                htmlParser.parse(page.getHTML(), curURL.getURL());
                page.setText(htmlParser.getText());
                page.setTitle(htmlParser.getTitle());

                Iterator<String> it = htmlParser.getLinks().iterator();
                ArrayList<WebURL> toSchedule = new ArrayList<WebURL>();
                ArrayList<WebURL> toList = new ArrayList<WebURL>();
                while (it.hasNext()) {
                    String url = it.next();
                    if (url != null) {
                        int newdocid = DocIDServer.getDocID(url);
                        if (newdocid > 0) {
                            if (newdocid != docid) {
                                toList.add(new WebURL(url, newdocid));
                            }
                        } else {
                            toList.add(new WebURL(url, -newdocid));
                            WebURL cur = new WebURL(url, -newdocid);
                            if (shouldVisit(cur)) {
                                cur.setParentDocid(docid);
                                toSchedule.add(cur);
                            }
                        }
                    }
                }
                Frontier.scheduleAll(toSchedule);
                page.setURLs(toList);
            }
            visit(page);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage() + ", while processing: "
                    + curURL.getURL());
        }
        return PROCESS_OK;
    }

    public Thread getThread() {
        return myThread;
    }

    public void setThread(Thread myThread) {
        this.myThread = myThread;
    }
}

package edu.uci.ics.crawler4j.crawler;

import java.io.IOException;
import java.util.Date;

import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParamBean;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import edu.uci.ics.crawler4j.frontier.DocIDServer;
import edu.uci.ics.crawler4j.frontier.Frontier;
import edu.uci.ics.crawler4j.frontier.WorkQueues;
import edu.uci.ics.crawler4j.url.URLCanonicalizer;
import edu.uci.ics.crawler4j.url.WebURL;

/**
 * Copyright (C) 2010.
 * 
 * @author Yasser Ganjisaffar <yganjisa at uci dot edu>
 */

public final class PageFetcher {

    private static final Logger logger = Logger.getLogger(PageFetcher.class);

    private ThreadSafeClientConnManager connectionManager;

    private DefaultHttpClient httpclient;

    private static Object mutex = PageFetcher.class.toString() + "_MUTEX";

    private static int processedCount = 0;

    private static long startOfPeriod = 0;

//  private static long lastFetchTime = 0;

    private static long politenessDelay = Configurations.getIntProperty(
            "fetcher.default_politeness_delay", 200);

    public static final int MAX_DOWNLOAD_SIZE = Configurations.getIntProperty(
            "fetcher.max_download_size", 1048576);

    private static final boolean show404Pages = Configurations
            .getBooleanProperty("logging.show_404_pages", true);

    public static long getPolitenessDelay() {
        return politenessDelay;
    }

    public static void setPolitenessDelay(long politenessDelay) {
        PageFetcher.politenessDelay = politenessDelay;
    }

    public PageFetcher() {
        HttpParams params = new BasicHttpParams();
        HttpProtocolParamBean paramsBean = new HttpProtocolParamBean(params);
        paramsBean.setVersion(HttpVersion.HTTP_1_1);
        paramsBean.setContentCharset("UTF-8");
        paramsBean.setUseExpectContinue(false);

        params.setParameter("http.useragent", Configurations.getStringProperty(
                "fetcher.user_agent",
                "crawler4j (http://code.google.com/p/crawler4j/)"));

        params.setIntParameter("http.socket.timeout", Configurations
                .getIntProperty("fetcher.socket_timeout", 20000));

        params.setIntParameter("http.connection.timeout", Configurations
                .getIntProperty("fetcher.connection_timeout", 30000));

        params.setBooleanParameter("http.protocol.handle-redirects",
                Configurations.getBooleanProperty("fetcher.follow_redirects",
                        true));

        ConnPerRouteBean connPerRouteBean = new ConnPerRouteBean();
        connPerRouteBean.setDefaultMaxPerRoute(Configurations.getIntProperty(
                "fetcher.max_connections_per_host", 100));
        ConnManagerParams.setMaxConnectionsPerRoute(params, connPerRouteBean);
        ConnManagerParams.setMaxTotalConnections(params, Configurations
                .getIntProperty("fetcher.max_total_connections", 100));

        SchemeRegistry schemeRegistry = new SchemeRegistry();
        schemeRegistry.register(new Scheme("http", PlainSocketFactory
                .getSocketFactory(), 80));

        if (Configurations.getBooleanProperty("fetcher.crawl_https", false)) {
            schemeRegistry.register(new Scheme("https", SSLSocketFactory
                    .getSocketFactory(), 443));
        }

        connectionManager = new ThreadSafeClientConnManager(params,
                schemeRegistry);
        logger.setLevel(Level.INFO);
        httpclient = new DefaultHttpClient(connectionManager, params);
        new Thread(new IdleConnectionMonitorThread(connectionManager)).start();
    }

    public static void startConnectionMonitorThread() {
        //new Thread(new IdleConnectionMonitorThread(connectionManager)).start();
    }

    // fetch() is now an instance method; in the original it was static and used
    // a synchronized block on a class-wide mutex (see the snippet in the first comment)
    public int fetch(Page page) {
        String toFetchURL = page.getWebURL().getURL();
        HttpGet get = null;
        HttpEntity entity = null;
        try {
            get = new HttpGet(toFetchURL);
            updateFetchCount();
            HttpResponse response = httpclient.execute(get);
            entity = response.getEntity();

            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_NOT_FOUND) {
                    logger.info("Failed: "
                            + response.getStatusLine().toString()
                            + ", while fetching " + toFetchURL);
                } else if (show404Pages) {
                    logger.info("Not Found: " + toFetchURL
                            + " (Link found in doc#: "
                            + page.getWebURL().getParentDocid() + ")");
                }
                return response.getStatusLine().getStatusCode();
            }

            String uri = get.getURI().toString();
            if (!uri.equals(toFetchURL)) {
                if (!URLCanonicalizer.getCanonicalURL(uri).equals(toFetchURL)) {
                    int newdocid = DocIDServer.getDocID(uri);
                    if (newdocid != -1) {
                        if (newdocid > 0) {
                            return PageFetchStatus.RedirectedPageIsSeen;
                        }
                        page.setWebURL(new WebURL(uri, -newdocid));
                    }
                }
            }

            if (entity != null) {
                long size = entity.getContentLength();
                if (size == -1) {
                    Header length = response.getLastHeader("Content-Length");
                    if (length == null) {
                        length = response.getLastHeader("Content-length");
                    }
                    if (length != null) {
                        size = Integer.parseInt(length.getValue());
                    } else {
                        size = -1;
                    }
                }
                if (size > MAX_DOWNLOAD_SIZE) {
                    entity.consumeContent();
                    return PageFetchStatus.PageTooBig;
                }

                boolean isBinary = false;

                Header type = entity.getContentType();
                if (type != null
                        && type.getValue().toLowerCase().contains("image")) {
                    isBinary = true;
                }

                if (page.load(entity.getContent(), (int) size, isBinary)) {
                    return PageFetchStatus.OK;
                } else {
                    return PageFetchStatus.PageLoadError;
                }
            } else {
                get.abort();
            }
        } catch (IOException e) {
            logger.error("Fatal transport error: " + e.getMessage()
                    + " while fetching " + toFetchURL + " (link found in doc #"
                    + page.getWebURL().getParentDocid() + ")");
            return PageFetchStatus.FatalTransportError;
        } catch (IllegalStateException e) {
            // ignoring exceptions that occur because of not registering https
            // and other schemes
        } catch (Exception e) {
            if (e.getMessage() == null) {
                logger.error("Error while fetching "
                        + page.getWebURL().getURL());
            } else {
                logger.error(e.getMessage() + " while fetching "
                        + page.getWebURL().getURL());
            }
        } finally {
            try {
                if (entity != null) {
                    entity.consumeContent();
                } else if (get != null) {
                    get.abort();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return PageFetchStatus.UnknownError;
    }

    private static synchronized void updateFetchCount() {
        long now = (new Date()).getTime();
        if (now - startOfPeriod > 10000) {
            logger.info("Number of pages fetched per second: "
                    + processedCount / ((now - startOfPeriod) / 1000));
            logger.info("Queue length: " + Frontier.getQueueLength());
            logger.info("URLs registered: " + DocIDServer.getDocCount());
            processedCount = 0;
            startOfPeriod = now;
        }
        processedCount++;
    }

    public static void setProxy(String proxyHost, int proxyPort) {
//      HttpHost proxy = new HttpHost(proxyHost, proxyPort);
//      httpclient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY,
//              proxy);
    }

    public static void setProxy(String proxyHost, int proxyPort,
            String username, String password) {
//      httpclient.getCredentialsProvider().setCredentials(
//              new AuthScope(proxyHost, proxyPort),
//              new UsernamePasswordCredentials(username, password));
//      setProxy(proxyHost, proxyPort);
    }

}

Original comment by tiago.lb...@gmail.com on 28 Sep 2010 at 11:35


GoogleCodeExporter commented 9 years ago
Thanks for your suggestions, and sorry for the delay in responding; I was busy
with some other projects.

As you might have noticed, the PageFetcher class uses
ThreadSafeClientConnManager, which maintains a pool of client connections and is
able to serve multiple threads. So having a PageFetcher per thread shouldn't
give you any speedup. You should get the same speedup by modifying the
parameters (increasing threads, max connections, ...).

Original comment by ganjisaffar@gmail.com on 4 Oct 2010 at 7:59
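
(For comparison, the tuning mentioned here maps onto parameters the posted PageFetcher already reads via Configurations.getIntProperty. The property keys below are taken verbatim from that source; the values, and the idea of setting them in whatever properties file Configurations reads, are only an illustration of raising the shared connection pool's limits rather than multiplying fetcher instances.)

# keys read by the posted PageFetcher through Configurations
fetcher.max_total_connections=500
fetcher.max_connections_per_host=200
# politeness delay in ms between fetches (0 = none, as in the reporter's test)
fetcher.default_politeness_delay=0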

GoogleCodeExporter commented 9 years ago
I hope you reconsider.

The way your code currently functions, it won't matter how many threads you try
to open; they will all always go through the *ONE* static PageFetcher class that
exists in the memory of the program. ThreadSafeClientConnManager is serving only
this *one* static class, not the wealth of threads one might try to create.

I suggest you at least test the version that I created: try increasing the
number of threads in both versions (yours and mine). You will find that your
version soon reaches a limit where it won't matter how many more threads you try
to start.

PageFetcher was the greatest bottleneck that existed in your code. I found the
second biggest, but I do not believe there is much to be done about that.

I would very much like to discuss this further with you; if you are interested,
please send me an e-mail at
tiagooa at dsc dot ufcg dot edu dot br
or
tiagolbqrq at gmail dot com

Original comment by tiago.lb...@gmail.com on 5 Oct 2010 at 9:37

GoogleCodeExporter commented 9 years ago
But the static PageFetcher class doesn't have any synchronized function to make
it a bottleneck. The reason you see a speedup there is that you have different
instances of the connection manager, so you reach their maximum limits later. My
point is that if you increase the limits of the connection manager, you should
get similar results.

What do you think?

Original comment by ganjisaffar@gmail.com on 5 Oct 2010 at 9:52
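
(To put the "reach their maximum limits later" point in numbers, using the defaults in the posted source: a single shared PageFetcher has one ThreadSafeClientConnManager capped at fetcher.max_total_connections = 100, while ten per-crawler PageFetcher instances carry ten independent pools, an implicit aggregate ceiling of 10 × 100 = 1,000 connections. Raising the limit on the single shared pool should give the same headroom without per-thread instances.)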

GoogleCodeExporter commented 9 years ago
You are absolutely correct: as I increase the number of PageFetcher instances, I
implicitly increase the maximum number of simultaneous connections without
noticing. Unfortunately I am not in my lab where I can test it, but you are
correct.

So, so sorry for the trouble I caused and the time I stole from you!!

Original comment by tiago.lb...@gmail.com on 5 Oct 2010 at 10:26