package crawl import ( "bytes" "encoding/gob" "errors" "fmt" "io" "log" "net/http" "net/url" "sync" "sync/atomic" "time" "github.com/PuerkitoBio/purell" "github.com/syndtr/goleveldb/leveldb" lerr "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/iterator" lutil "github.com/syndtr/goleveldb/leveldb/util" ) var errorRetryDelay = 180 * time.Second type gobDB struct { *leveldb.DB } func newGobDB(path string) (*gobDB, error) { db, err := leveldb.OpenFile(path, nil) if lerr.IsCorrupted(err) { log.Printf("corrupted database, recovering...") db, err = leveldb.RecoverFile(path, nil) } if err != nil { return nil, err } return &gobDB{db}, nil } func (db *gobDB) PutObjBatch(wb *leveldb.Batch, key []byte, obj interface{}) error { var b bytes.Buffer if err := gob.NewEncoder(&b).Encode(obj); err != nil { return err } wb.Put(key, b.Bytes()) return nil } func (db *gobDB) GetObj(key []byte, obj interface{}) error { data, err := db.Get(key, nil) if err != nil { return err } if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil { return err } return nil } func (db *gobDB) NewPrefixIterator(prefix []byte) *gobIterator { return newGobIterator(db.NewIterator(lutil.BytesPrefix(prefix), nil)) } func (db *gobDB) NewRangeIterator(startKey, endKey []byte) *gobIterator { return newGobIterator(db.NewIterator(&lutil.Range{Start: startKey, Limit: endKey}, nil)) } type gobIterator struct { iterator.Iterator } func newGobIterator(i iterator.Iterator) *gobIterator { return &gobIterator{i} } func (i *gobIterator) Value(obj interface{}) error { return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) } // Outlink is a tagged outbound link. type Outlink struct { URL *url.URL Tag int } const ( // TagPrimary is a primary reference (another web page). TagPrimary = iota // TagRelated is a secondary resource, related to a page. TagRelated ) // A Fetcher retrieves contents from remote URLs. type Fetcher interface { // Fetch retrieves a URL and returns the response. Fetch(string) (*http.Response, error) } // FetcherFunc wraps a simple function into the Fetcher interface. type FetcherFunc func(string) (*http.Response, error) // Fetch retrieves a URL and returns the response. func (f FetcherFunc) Fetch(u string) (*http.Response, error) { return f(u) } // A Handler processes crawled contents. Any errors returned by public // implementations of this interface are considered fatal and will // cause the crawl to abort. The URL will be removed from the queue // unless the handler returns the special error ErrRetryRequest. type Handler interface { // Handle the response from a URL. Handle(Publisher, string, int, int, *http.Response, error) error } // HandlerFunc wraps a function into the Handler interface. type HandlerFunc func(Publisher, string, int, int, *http.Response, error) error // Handle the response from a URL. func (f HandlerFunc) Handle(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { return f(p, u, tag, depth, resp, err) } // ErrRetryRequest is returned by a Handler when the request should be // retried after some time. var ErrRetryRequest = errors.New("retry_request") // Publisher is an interface to something with an Enqueue() method to // add new potential URLs to crawl. type Publisher interface { Enqueue(Outlink, int) error } // The Crawler object contains the crawler state. type Crawler struct { db *gobDB queue *queue seeds []*url.URL scope Scope fetcher Fetcher handler Handler stopCh chan bool stopping atomic.Value enqueueMx sync.Mutex } func normalizeURL(u *url.URL) *url.URL { urlStr := purell.NormalizeURL(u, purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes| purell.FlagRemoveFragment|purell.FlagSortQuery) u2, err := url.Parse(urlStr) if err != nil { // Ignore errors here. return nil } return u2 } func seenKey(u *url.URL) []byte { return []byte(fmt.Sprintf("_seen/%s", u.String())) } func (c *Crawler) hasSeen(u *url.URL) bool { _, err := c.db.Get(seenKey(u), nil) return err == nil } func (c *Crawler) setSeen(wb *leveldb.Batch, u *url.URL) { wb.Put(seenKey(u), []byte{}) } // Enqueue a (possibly new) URL for processing. func (c *Crawler) Enqueue(link Outlink, depth int) error { // Normalize the URL. We are going to replace link.URL in-place, to // ensure that scope checks are applied to the normalized URL. link.URL = normalizeURL(link.URL) if link.URL == nil { // We couldn't parse a URL that we have extracted // ourselves from the documents. This is an internal // inconsistency, but by ignoring the error we avoid // failing the entire crawl. return nil } // See if it's in scope. if !c.scope.Check(link, depth) { return nil } // Protect the read-modify-update below with a mutex. c.enqueueMx.Lock() defer c.enqueueMx.Unlock() // Check if we've already seen it. if c.hasSeen(link.URL) { return nil } // Store the URL in the queue, and mark it as seen to make // sure that subsequent calls to Enqueue with the same URL // will fail. wb := new(leveldb.Batch) if err := c.queue.Add(wb, link.URL.String(), link.Tag, depth, time.Now()); err != nil { return err } c.setSeen(wb, link.URL) return c.db.Write(wb, nil) } var scanInterval = 1 * time.Second // Scan the queue for URLs until there are no more. func (c *Crawler) process() <-chan queuePair { ch := make(chan queuePair, 100) go func() { t := time.NewTicker(scanInterval) defer t.Stop() defer close(ch) for { select { case <-t.C: if err := c.queue.Scan(ch); err != nil { return } case <-c.stopCh: return } } }() return ch } // Main worker loop. func (c *Crawler) urlHandler(queue <-chan queuePair) { for p := range queue { // Stop flag needs to short-circuit the queue (which // is buffered), or it's going to take a while before // we actually stop. if c.stopping.Load().(bool) { return } // Fetch the URL and handle it. Make sure to Close the // response body (even if it gets replaced in the // Response object). fmt.Printf("%s\n", p.URL) httpResp, httpErr := c.fetcher.Fetch(p.URL) var respBody io.ReadCloser if httpErr == nil { respBody = httpResp.Body } // Invoke the handler (even if the fetcher errored // out). Errors in handling requests are fatal, crawl // will be aborted. err := c.handler.Handle(c, p.URL, p.Tag, p.Depth, httpResp, httpErr) if httpErr == nil { respBody.Close() // nolint } wb := new(leveldb.Batch) switch err { case nil: c.queue.Release(wb, p) case ErrRetryRequest: Must(c.queue.Retry(wb, p, errorRetryDelay)) default: log.Panicf("fatal error in handling %s: %v", p.URL, err) } // Write the result in our database. Must(c.db.Write(wb, nil)) } } // MustParseURLs parses a list of URLs and aborts on failure. func MustParseURLs(urls []string) []*url.URL { // Parse the seed URLs. var parsed []*url.URL for _, s := range urls { u, err := url.Parse(s) if err != nil { log.Fatalf("error parsing URL \"%s\": %v", s, err) } parsed = append(parsed, u) } return parsed } // NewCrawler creates a new Crawler object with the specified behavior. func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler) (*Crawler, error) { // Open the crawl database. db, err := newGobDB(path) if err != nil { return nil, err } c := &Crawler{ db: db, queue: &queue{db: db}, fetcher: f, handler: h, seeds: seeds, scope: scope, stopCh: make(chan bool), } c.stopping.Store(false) // Recover active tasks. if err := c.queue.Recover(); err != nil { return nil, err } return c, nil } // Run the crawl with the specified number of workers. This function // does not exit until all work is done (no URLs left in the queue). func (c *Crawler) Run(concurrency int) { // Load initial seeds into the queue. for _, u := range c.seeds { Must(c.Enqueue(Outlink{URL: u, Tag: TagPrimary}, 0)) } // Start some runners and wait until they're done. var wg sync.WaitGroup ch := c.process() for i := 0; i < concurrency; i++ { wg.Add(1) go func() { c.urlHandler(ch) wg.Done() }() } wg.Wait() } // Stop a running crawl. This will cause a running Run function to return. func (c *Crawler) Stop() { c.stopping.Store(true) close(c.stopCh) } // Close the database and release resources associated with the crawler state. func (c *Crawler) Close() { c.db.Close() // nolint } // FollowRedirects returns a Handler that follows HTTP redirects // and adds them to the queue for crawling. It will call the wrapped // handler on all requests regardless. func FollowRedirects(wrap Handler) Handler { return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { if herr := wrap.Handle(p, u, tag, depth, resp, err); herr != nil { return herr } if err != nil { return nil } location := resp.Header.Get("Location") if resp.StatusCode >= 300 && resp.StatusCode < 400 && location != "" { locationURL, uerr := resp.Request.URL.Parse(location) if uerr != nil { log.Printf("error parsing Location header: %v", uerr) } else { return p.Enqueue(Outlink{URL: locationURL, Tag: tag}, depth+1) } } return nil }) } // FilterErrors returns a Handler that forwards only requests with a // "successful" HTTP status code (anything < 400). When using this // wrapper, subsequent Handle calls will always have err set to nil. func FilterErrors(wrap Handler) Handler { return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { if err != nil { return nil } if resp.StatusCode >= 400 { return nil } return wrap.Handle(p, u, tag, depth, resp, nil) }) } // HandleRetries returns a Handler that will retry requests on // temporary errors (all transport-level errors are considered // temporary, as well as any HTTP status code >= 500). func HandleRetries(wrap Handler) Handler { return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 { return ErrRetryRequest } return wrap.Handle(p, u, tag, depth, resp, nil) }) } // Must will abort the program with a message when we encounter an // error that we can't recover from. func Must(err error) { if err != nil { log.Panicf("fatal error: %v", err) } }