-rw-r--r--   README.md            44
-rw-r--r--   analysis/links.go     1
-rw-r--r--   client.go            31
-rw-r--r--   cmd/crawl/crawl.go   43
-rw-r--r--   crawler.go           68
-rw-r--r--   queue.go              2
6 files changed, 160 insertions, 29 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..34360fa
--- /dev/null
+++ b/README.md
@@ -0,0 +1,44 @@
+A very simple crawler
+=====================
+
+This tool can crawl a bunch of URLs for HTML content, and save the
+results in a nice WARC file. It has little control over its traffic,
+save for a limit on concurrent outbound requests. Its main purpose is
+to quickly and efficiently save websites for archival purposes.
+
+The *crawl* tool saves its state in a database, so it can be safely
+interrupted and restarted without issues.
+
+# Installation
+
+From this source directory (checked out in the correct place in your
+GOPATH), run:
+
+    $ go install cmd/crawl
+
+# Usage
+
+Just run *crawl* by passing the URLs of the websites you want to crawl
+as arguments on the command line:
+
+    $ crawl http://example.com/
+
+By default, the tool will store the output WARC file and its own
+database in the current directory. This can be controlled with the
+*--output* and *--state* command-line options.
+
+The crawling scope is controlled with a set of overlapping checks:
+
+* URL scheme must be one of *http* or *https*
+* URL must have one of the seeds as a prefix (an eventual *www.*
+  prefix is implicitly ignored)
+* maximum crawling depth can be controlled with the *--depth* option
+* resources related to a page (CSS, JS, etc) will always be fetched,
+  even if on external domains, if the *--include-related* option is
+  specified
+
+If the program is interrupted, running it again with the same command
+line from the same directory will cause it to resume crawling from
+where it stopped. At the end of a successful crawl, the database will
+be removed (unless you specify the *--keep* option, for debugging
+purposes).
diff --git a/analysis/links.go b/analysis/links.go
index 3f5a795..97957ad 100644
--- a/analysis/links.go
+++ b/analysis/links.go
@@ -26,6 +26,7 @@ var (
 		{"link", "href", crawl.TagRelated},
 		{"img", "src", crawl.TagRelated},
 		{"script", "src", crawl.TagRelated},
+		{"iframe", "src", crawl.TagRelated},
 	}
 )
 
diff --git a/client.go b/client.go
new file mode 100644
index 0000000..c0c2626
--- /dev/null
+++ b/client.go
@@ -0,0 +1,31 @@
+package crawl
+
+import (
+	"crypto/tls"
+	"net/http"
+	"net/http/cookiejar"
+	"time"
+)
+
+var defaultClientTimeout = 60 * time.Second
+
+var DefaultClient *http.Client
+
+// DefaultClient returns a http.Client suitable for crawling: does not
+// follow redirects, accepts invalid TLS certificates, sets a
+// reasonable timeout for requests.
+func init() {
+	jar, _ := cookiejar.New(nil)
+	DefaultClient = &http.Client{
+		Timeout: defaultClientTimeout,
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{
+				InsecureSkipVerify: true,
+			},
+		},
+		CheckRedirect: func(req *http.Request, via []*http.Request) error {
+			return http.ErrUseLastResponse
+		},
+		Jar: jar,
+	}
+}
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
index e31f63e..e7e8582 100644
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -11,10 +11,13 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"os/signal"
+	"runtime/pprof"
 	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
+	"syscall"
 	"time"
 
 	"git.autistici.org/ale/crawl"
@@ -30,6 +33,8 @@ var (
 	validSchemes         = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
 	alwaysIncludeRelated = flag.Bool("include-related", false, "always include related resources (css, images, etc)")
 	outputFile           = flag.String("output", "crawl.warc.gz", "output WARC file")
+
+	cpuprofile = flag.String("cpuprofile", "", "create cpu profile")
 )
 
 func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
@@ -147,14 +152,10 @@ func (c *crawlStats) Dump() {
 	fmt.Fprintf(os.Stderr, "stats: downloaded %d bytes (%.4g KB/s), status: %v\n", c.bytes, rate, c.states)
 }
 
-var (
-	stats *crawlStats
-
-	client *http.Client
-)
+var stats *crawlStats
 
 func fetch(urlstr string) (*http.Response, error) {
-	resp, err := client.Get(urlstr)
+	resp, err := crawl.DefaultClient.Get(urlstr)
 	if err == nil {
 		stats.Update(resp)
 	}
@@ -162,8 +163,6 @@ func fetch(urlstr string) (*http.Response, error) {
 }
 
 func init() {
-	client = &http.Client{}
-
 	stats = &crawlStats{
 		states: make(map[int]int),
 		start:  time.Now(),
@@ -191,6 +190,17 @@ func (b *byteCounter) Read(buf []byte) (int, error) {
 func main() {
 	flag.Parse()
 
+	if *cpuprofile != "" {
+		f, err := os.Create(*cpuprofile)
+		if err != nil {
+			log.Fatal("could not create CPU profile: ", err)
+		}
+		if err := pprof.StartCPUProfile(f); err != nil {
+			log.Fatal("could not start CPU profile: ", err)
+		}
+		defer pprof.StopCPUProfile()
+	}
+
 	outf, err := os.Create(*outputFile)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -216,9 +226,26 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
+
+	// Set up signal handlers so we can terminate gently if possible.
+	var signaled atomic.Value
+	signaled.Store(false)
+	sigCh := make(chan os.Signal, 1)
+	go func() {
+		<-sigCh
+		log.Printf("exiting due to signal")
+		signaled.Store(true)
+		crawler.Stop()
+	}()
+	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+
 	crawler.Run(*concurrency)
 	crawler.Close()
+
+	if signaled.Load().(bool) {
+		os.Exit(1)
+	}
 	if !*keepDb {
 		os.RemoveAll(*dbPath)
 	}
diff --git a/crawler.go b/crawler.go
--- a/crawler.go
+++ b/crawler.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/url"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/PuerkitoBio/purell"
@@ -136,6 +137,9 @@ type Crawler struct {
 	fetcher Fetcher
 	handler Handler
 
+	stopCh   chan bool
+	stopping atomic.Value
+
 	enqueueMx sync.Mutex
 }
@@ -169,16 +173,25 @@ func (c *Crawler) Enqueue(link Outlink, depth int) {
 	c.db.Write(wb, nil)
 }
 
+var scanInterval = 1 * time.Second
+
 // Scan the queue for URLs until there are no more.
 func (c *Crawler) process() <-chan queuePair {
-	ch := make(chan queuePair)
+	ch := make(chan queuePair, 100)
 	go func() {
-		for range time.Tick(2 * time.Second) {
-			if err := c.queue.Scan(ch); err != nil {
-				break
+		t := time.NewTicker(scanInterval)
+		defer t.Stop()
+		defer close(ch)
+		for {
+			select {
+			case <-t.C:
+				if err := c.queue.Scan(ch); err != nil {
+					return
+				}
+			case <-c.stopCh:
+				return
 			}
 		}
-		close(ch)
 	}()
 	return ch
 }
@@ -186,6 +199,13 @@ func (c *Crawler) process() <-chan queuePair {
 // Main worker loop.
 func (c *Crawler) urlHandler(queue <-chan queuePair) {
 	for p := range queue {
+		// Stop flag needs to short-circuit the queue (which
+		// is buffered), or it's going to take a while before
+		// we actually stop.
+		if c.stopping.Load().(bool) {
+			return
+		}
+
 		// Retrieve the URLInfo object from the crawl db.
 		// Ignore errors, we can work with an empty object.
 		urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
@@ -254,7 +274,9 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler
 		handler: h,
 		seeds:   seeds,
 		scope:   scope,
+		stopCh:  make(chan bool),
 	}
+	c.stopping.Store(false)
 
 	// Recover active tasks.
 	c.queue.Recover()
@@ -283,6 +305,12 @@ func (c *Crawler) Run(concurrency int) {
 	wg.Wait()
 }
 
+// Stop a running crawl. This will cause a running Run function to return.
+func (c *Crawler) Stop() {
+	c.stopping.Store(true)
+	close(c.stopCh)
+}
+
 // Close the database and release resources associated with the crawler state.
 func (c *Crawler) Close() {
 	c.db.Close()
 }
@@ -293,22 +321,24 @@ type redirectHandler struct {
 }
 
 func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
-	if err == nil {
-		if resp.StatusCode == 200 {
-			err = wrap.h.Handle(c, u, depth, resp, err)
-		} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
-			location := resp.Header.Get("Location")
-			if location != "" {
-				locationURL, err := resp.Request.URL.Parse(location)
-				if err != nil {
-					log.Printf("error parsing Location header: %v", err)
-				} else {
-					c.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1)
-				}
+	if err != nil {
+		return err
+	}
+
+	if resp.StatusCode == 200 {
+		err = wrap.h.Handle(c, u, depth, resp, err)
+	} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
+		location := resp.Header.Get("Location")
+		if location != "" {
+			locationURL, err := resp.Request.URL.Parse(location)
+			if err != nil {
+				log.Printf("error parsing Location header: %v", err)
+			} else {
+				c.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1)
 			}
-		} else {
-			err = errors.New(resp.Status)
 		}
+	} else {
+		err = errors.New(resp.Status)
 	}
 	return err
 }
diff --git a/queue.go b/queue.go
--- a/queue.go
+++ b/queue.go
@@ -65,10 +65,8 @@ func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time)
 
 func (q *queue) acquire(qp queuePair) {
 	wb := new(leveldb.Batch)
-
 	q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
 	wb.Delete(qp.key)
-
 	q.db.Write(wb, nil)
 	atomic.AddInt32(&q.numActive, 1)
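The core of this change is the stop-channel pattern in crawler.go: process() acts as a producer that selects between a queue-scan ticker and stopCh, while urlHandler() re-checks an atomic "stopping" flag because the channel between them is now buffered, so a full buffer cannot delay shutdown. The following standalone sketch shows the same producer/consumer shape using only the standard library; it is not part of this repository, and the worker type and its method names are illustrative only.

// Standalone sketch (assumed, not from the crawl package) of the
// ticker + stop-channel + buffered-queue pattern used in this commit.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type worker struct {
	stopCh   chan struct{}
	stopping atomic.Value
}

func newWorker() *worker {
	w := &worker{stopCh: make(chan struct{})}
	w.stopping.Store(false)
	return w
}

// produce emits work items on a ticker until Stop is called,
// mirroring Crawler.process().
func (w *worker) produce() <-chan int {
	ch := make(chan int, 100) // buffered, like the queuePair channel
	go func() {
		defer close(ch)
		t := time.NewTicker(100 * time.Millisecond)
		defer t.Stop()
		n := 0
		for {
			select {
			case <-t.C:
				ch <- n
				n++
			case <-w.stopCh:
				return
			}
		}
	}()
	return ch
}

// consume drains the channel, bailing out early once the stopping
// flag is set, mirroring Crawler.urlHandler().
func (w *worker) consume(ch <-chan int) {
	for item := range ch {
		if w.stopping.Load().(bool) {
			return
		}
		fmt.Println("processing", item)
	}
}

// Stop sets the flag and closes the stop channel, unblocking the producer.
func (w *worker) Stop() {
	w.stopping.Store(true)
	close(w.stopCh)
}

func main() {
	w := newWorker()
	ch := w.produce()
	go func() {
		time.Sleep(500 * time.Millisecond)
		w.Stop()
	}()
	w.consume(ch)
}

In the crawler itself the same roles are played by process() (producer), urlHandler() (consumer) and Stop(), with the new signal handler in cmd/crawl/crawl.go calling crawler.Stop() on SIGINT or SIGTERM.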