Here is my final version of PageFetcher and WebCrawler
(the only remaining problem is proxy support):
package edu.uci.ics.crawler4j.crawler;
import java.util.*;
import org.apache.log4j.Logger;
import edu.uci.ics.crawler4j.frontier.DocIDServer;
import edu.uci.ics.crawler4j.frontier.Frontier;
import edu.uci.ics.crawler4j.url.WebURL;
/**
* Copyright (C) 2010.
*
* @author Yasser Ganjisaffar <yganjisa at uci dot edu>
*/
public class WebCrawler implements Runnable {
private static final Logger logger = Logger.getLogger(WebCrawler.class
.getName());
private Thread myThread;
private final static int PROCESS_OK = -12;
private HTMLParser htmlParser;
int myid;
private CrawlController myController;
private PageFetcher pageFetcher;
public CrawlController getMyController() {
return myController;
}
public void setMyController(CrawlController myController) {
this.myController = myController;
}
public WebCrawler() {
htmlParser = new HTMLParser();
pageFetcher = new PageFetcher();
}
public WebCrawler(int myid) {
// Delegate to the default constructor so htmlParser and pageFetcher are initialized.
this();
this.myid = myid;
}
public void setMyId(int myid) {
this.myid = myid;
}
public int getMyId() {
return myid;
}
public void onStart() {
}
public void onBeforeExit() {
}
public Object getMyLocalData() {
return null;
}
public void run() {
onStart();
while (true) {
ArrayList<WebURL> assignedURLs = new ArrayList<WebURL>(50);
Frontier.getNextURLs(50, assignedURLs);
if (assignedURLs.size() == 0) {
if (Frontier.isFinished()) {
return;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
for (WebURL curURL : assignedURLs) {
if (curURL != null) {
preProcessPage(curURL);
}
}
}
}
}
public boolean shouldVisit(WebURL url) {
return true;
}
public void visit(Page page) {
}
private int preProcessPage(WebURL curURL) {
if (curURL == null) {
return -1;
}
Page page = new Page(curURL);
int statusCode = pageFetcher.fetch(page);
// The page might have been redirected. So we have to refresh curURL
curURL = page.getWebURL();
int docid = curURL.getDocid();
if (statusCode != PageFetchStatus.OK) {
if (statusCode == PageFetchStatus.PageTooBig) {
logger.error("Page was bigger than max allowed size: "
+ curURL.getURL());
}
return statusCode;
}
try {
if (!page.isBinary()) {
htmlParser.parse(page.getHTML(), curURL.getURL());
page.setText(htmlParser.getText());
page.setTitle(htmlParser.getTitle());
Iterator<String> it = htmlParser.getLinks().iterator();
ArrayList<WebURL> toSchedule = new ArrayList<WebURL>();
ArrayList<WebURL> toList = new ArrayList<WebURL>();
while (it.hasNext()) {
String url = it.next();
if (url != null) {
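// DocIDServer.getDocID appears to return a positive id for URLs that have
// already been seen, and the negative of a freshly assigned id otherwise;
// the branches below rely on that convention.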
int newdocid = DocIDServer.getDocID(url);
if (newdocid > 0) {
if (newdocid != docid) {
toList.add(new WebURL(url, newdocid));
}
} else {
toList.add(new WebURL(url, -newdocid));
WebURL cur = new WebURL(url, -newdocid);
if (shouldVisit(cur)) {
cur.setParentDocid(docid);
toSchedule.add(cur);
}
}
}
}
Frontier.scheduleAll(toSchedule);
page.setURLs(toList);
}
visit(page);
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage() + ", while processing: "
+ curURL.getURL());
}
return PROCESS_OK;
}
public Thread getThread() {
return myThread;
}
public void setThread(Thread myThread) {
this.myThread = myThread;
}
}
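For reference, shouldVisit and visit are the hooks a concrete crawler overrides. A minimal sketch of such a subclass might look like the following (the package, the class name MyCrawler, and the example.com filter are hypothetical, not part of the attached code):

package edu.uci.ics.crawler4j.example;

import edu.uci.ics.crawler4j.crawler.Page;
import edu.uci.ics.crawler4j.crawler.WebCrawler;
import edu.uci.ics.crawler4j.url.WebURL;

public class MyCrawler extends WebCrawler {

    // Only follow links that stay on the hypothetical www.example.com host.
    @Override
    public boolean shouldVisit(WebURL url) {
        return url.getURL().toLowerCase().startsWith("http://www.example.com/");
    }

    // Called once for every page that was fetched and parsed successfully.
    @Override
    public void visit(Page page) {
        System.out.println("Visited: " + page.getWebURL().getURL());
    }
}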
package edu.uci.ics.crawler4j.crawler;
import java.io.IOException;
import java.util.Date;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParamBean;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import edu.uci.ics.crawler4j.frontier.DocIDServer;
import edu.uci.ics.crawler4j.frontier.Frontier;
import edu.uci.ics.crawler4j.frontier.WorkQueues;
import edu.uci.ics.crawler4j.url.URLCanonicalizer;
import edu.uci.ics.crawler4j.url.WebURL;
/**
* Copyright (C) 2010.
*
* @author Yasser Ganjisaffar <yganjisa at uci dot edu>
*/
public final class PageFetcher {
private static final Logger logger = Logger.getLogger(PageFetcher.class);
private ThreadSafeClientConnManager connectionManager;
private DefaultHttpClient httpclient;
private static Object mutex = PageFetcher.class.toString() + "_MUTEX";
private static int processedCount = 0;
private static long startOfPeriod = 0;
// private static long lastFetchTime = 0;
private static long politenessDelay = Configurations.getIntProperty(
"fetcher.default_politeness_delay", 200);
public static final int MAX_DOWNLOAD_SIZE = Configurations.getIntProperty(
"fetcher.max_download_size", 1048576);
private static final boolean show404Pages = Configurations
.getBooleanProperty("logging.show_404_pages", true);
public static long getPolitenessDelay() {
return politenessDelay;
}
public static void setPolitenessDelay(long politenessDelay) {
PageFetcher.politenessDelay = politenessDelay;
}
public PageFetcher() {
HttpParams params = new BasicHttpParams();
HttpProtocolParamBean paramsBean = new HttpProtocolParamBean(params);
paramsBean.setVersion(HttpVersion.HTTP_1_1);
paramsBean.setContentCharset("UTF-8");
paramsBean.setUseExpectContinue(false);
params.setParameter("http.useragent", Configurations.getStringProperty(
"fetcher.user_agent",
"crawler4j (http://code.google.com/p/crawler4j/)"));
params.setIntParameter("http.socket.timeout", Configurations
.getIntProperty("fetcher.socket_timeout", 20000));
params.setIntParameter("http.connection.timeout", Configurations
.getIntProperty("fetcher.connection_timeout", 30000));
params.setBooleanParameter("http.protocol.handle-redirects",
Configurations.getBooleanProperty("fetcher.follow_redirects",
true));
ConnPerRouteBean connPerRouteBean = new ConnPerRouteBean();
connPerRouteBean.setDefaultMaxPerRoute(Configurations.getIntProperty(
"fetcher.max_connections_per_host", 100));
ConnManagerParams.setMaxConnectionsPerRoute(params, connPerRouteBean);
ConnManagerParams.setMaxTotalConnections(params, Configurations
.getIntProperty("fetcher.max_total_connections", 100));
SchemeRegistry schemeRegistry = new SchemeRegistry();
schemeRegistry.register(new Scheme("http", PlainSocketFactory
.getSocketFactory(), 80));
if (Configurations.getBooleanProperty("fetcher.crawl_https", false)) {
schemeRegistry.register(new Scheme("https", SSLSocketFactory
.getSocketFactory(), 443));
}
connectionManager = new ThreadSafeClientConnManager(params,
schemeRegistry);
logger.setLevel(Level.INFO);
httpclient = new DefaultHttpClient(connectionManager, params);
new Thread(new IdleConnectionMonitorThread(connectionManager)).start();
}
public static void startConnectionMonitorThread() {
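// No-op in this instance-based version: connectionManager is no longer static,
// so the idle-connection monitor thread is started in the constructor instead.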
//new Thread(new IdleConnectionMonitorThread(connectionManager)).start();
}
public int fetch(Page page) {
String toFetchURL = page.getWebURL().getURL();
HttpGet get = null;
HttpEntity entity = null;
try {
get = new HttpGet(toFetchURL);
updateFetchCount();
HttpResponse response = httpclient.execute(get);
entity = response.getEntity();
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_NOT_FOUND) {
logger.info("Failed: "
+ response.getStatusLine().toString()
+ ", while fetching " + toFetchURL);
} else if (show404Pages) {
logger.info("Not Found: " + toFetchURL
+ " (Link found in doc#: "
+ page.getWebURL().getParentDocid() + ")");
}
return response.getStatusLine().getStatusCode();
}
String uri = get.getURI().toString();
if (!uri.equals(toFetchURL)) {
if (!URLCanonicalizer.getCanonicalURL(uri).equals(toFetchURL)) {
int newdocid = DocIDServer.getDocID(uri);
if (newdocid != -1) {
if (newdocid > 0) {
return PageFetchStatus.RedirectedPageIsSeen;
}
page.setWebURL(new WebURL(uri, -newdocid));
}
}
}
if (entity != null) {
long size = entity.getContentLength();
if (size == -1) {
Header length = response.getLastHeader("Content-Length");
if (length == null) {
length = response.getLastHeader("Content-length");
}
if (length != null) {
size = Integer.parseInt(length.getValue());
} else {
size = -1;
}
}
if (size > MAX_DOWNLOAD_SIZE) {
entity.consumeContent();
return PageFetchStatus.PageTooBig;
}
boolean isBinary = false;
Header type = entity.getContentType();
if (type != null
&& type.getValue().toLowerCase().contains("image")) {
isBinary = true;
}
if (page.load(entity.getContent(), (int) size, isBinary)) {
return PageFetchStatus.OK;
} else {
return PageFetchStatus.PageLoadError;
}
} else {
get.abort();
}
} catch (IOException e) {
logger.error("Fatal transport error: " + e.getMessage()
+ " while fetching " + toFetchURL + " (link found in doc #"
+ page.getWebURL().getParentDocid() + ")");
return PageFetchStatus.FatalTransportError;
} catch (IllegalStateException e) {
// ignoring exceptions that occur because of not registering https
// and other schemes
} catch (Exception e) {
if (e.getMessage() == null) {
logger.error("Error while fetching "
+ page.getWebURL().getURL());
} else {
logger.error(e.getMessage() + " while fetching "
+ page.getWebURL().getURL());
}
} finally {
try {
if (entity != null) {
entity.consumeContent();
} else if (get != null) {
get.abort();
}
} catch (Exception e) {
e.printStackTrace();
}
}
return PageFetchStatus.UnknownError;
}
private static synchronized void updateFetchCount() {
long now = (new Date()).getTime();
if (now - startOfPeriod > 10000) {
logger.info("Number of pages fetched per second: "
+ processedCount / ((now - startOfPeriod) / 1000));
logger.info("Queue length: " + Frontier.getQueueLength());
logger.info("URLs registered: " + DocIDServer.getDocCount());
processedCount = 0;
startOfPeriod = now;
}
processedCount++;
}
public static void setProxy(String proxyHost, int proxyPort) {
// HttpHost proxy = new HttpHost(proxyHost, proxyPort);
// httpclient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY,
// proxy);
}
public static void setProxy(String proxyHost, int proxyPort,
String username, String password) {
// httpclient.getCredentialsProvider().setCredentials(
// new AuthScope(proxyHost, proxyPort),
// new UsernamePasswordCredentials(username, password));
// setProxy(proxyHost, proxyPort);
}
}
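Since httpclient is now an instance field, the static setProxy methods above had to be commented out. One way to restore proxy support would be to turn them into instance methods of PageFetcher. This is only an untested sketch, reusing the HttpClient 4.0 classes already imported in the file (HttpHost, ConnRoutePNames, AuthScope, UsernamePasswordCredentials):

// Hypothetical instance-based replacements for the static setProxy methods.
public void setProxy(String proxyHost, int proxyPort) {
    // Route all requests from this client through the given proxy.
    HttpHost proxy = new HttpHost(proxyHost, proxyPort);
    httpclient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
}

public void setProxy(String proxyHost, int proxyPort,
        String username, String password) {
    // Register the proxy credentials, then set the proxy itself.
    httpclient.getCredentialsProvider().setCredentials(
            new AuthScope(proxyHost, proxyPort),
            new UsernamePasswordCredentials(username, password));
    setProxy(proxyHost, proxyPort);
}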
Original comment by tiago.lb...@gmail.com
on 28 Sep 2010 at 11:35
Thanks for your suggestions, and sorry for the delay in responding. I was busy
with some other projects.
As you might have noticed, the PageFetcher class uses
ThreadSafeClientConnManager, which maintains a pool of client connections and is
able to serve multiple threads. So having a PageFetcher per thread shouldn't
give you any speedup. You should get the same speedup by adjusting the
parameters (increasing threads, max connections, ...).
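For example, the connection limits that PageFetcher reads through the Configurations class could be raised together with the number of crawler threads; assuming the usual properties-file override mechanism, something like the following (illustrative values only):

fetcher.max_connections_per_host=200
fetcher.max_total_connections=500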
Original comment by ganjisaffar@gmail.com
on 4 Oct 2010 at 7:59
I hope you reconsider.
The way your code currently functions, it won't matter how many threads you
try to open: they will all go through the *one* static PageFetcher class that
exists in the program's memory.
ThreadSafeClientConnManager is serving only this *one* static class, not the
wealth of threads that one might try to create.
I suggest you at least test the version that I created: try increasing the
number of threads in both versions (yours and mine). You will find that your
version soon reaches a limit where it won't matter how many more threads
you try to start.
PageFetcher was the greatest bottleneck in your code. I found the
second biggest, but I do not believe there is much to be done about that one.
I would very much like to discuss this with you further. If you are
interested, please send me an e-mail at
tiagooa at dsc dot ufcg dot edu dot br
or
tiagolbqrq at gmail dot com
Original comment by tiago.lb...@gmail.com
on 5 Oct 2010 at 9:37
But the static PageFetcher class doesn't have any synchronized methods that would
make it a bottleneck. The reason you see a speedup there is that you
have several instances of the connection manager, so you reach their
maximum limits later. My point is that if you increase the limits of the
connection manager, you should get similar results.
What do you think?
Original comment by ganjisaffar@gmail.com
on 5 Oct 2010 at 9:52
You are absolutely correct: as I increase the number of PageFetcher instances, I
implicitly increase the maximum number of simultaneous connections without
noticing. Unfortunately I am not in my lab where I can test it, but you are
correct.
So sorry for the trouble I caused and the time I took from you!
Original comment by tiago.lb...@gmail.com
on 5 Oct 2010 at 10:26
Original issue reported on code.google.com by
tiago.lb...@gmail.com
on 14 Sep 2010 at 7:21