author     ale <ale@incal.net>  2014-12-19 13:55:05 +0000
committer  ale <ale@incal.net>  2014-12-19 13:55:05 +0000
commit     b09f05f8137e5bbc27a0a306de0529c59d3f2c28 (patch)
tree       37e66968c2eb0e361a1f284804a86fe339e68b78 /crawler.go
download   crawl-b09f05f8137e5bbc27a0a306de0529c59d3f2c28.tar.gz
           crawl-b09f05f8137e5bbc27a0a306de0529c59d3f2c28.zip
initial commit
Diffstat (limited to 'crawler.go')
-rw-r--r--  crawler.go  327
1 file changed, 327 insertions(+), 0 deletions(-)
diff --git a/crawler.go b/crawler.go
new file mode 100644
index 0000000..ed43b1f
--- /dev/null
+++ b/crawler.go
@@ -0,0 +1,327 @@
+package crawl
+
+import (
+ "bytes"
+ "encoding/gob"
+ "errors"
+ "fmt"
+ "log"
+ "net/http"
+ "net/url"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/PuerkitoBio/purell"
+ "github.com/jmhodges/levigo"
+)
+
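+// gobDB is a thin wrapper around a levigo (LevelDB) handle that
+// stores gob-encoded values.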
+type gobDB struct {
+ *levigo.DB
+}
+
+func newGobDB(path string) (*gobDB, error) {
+ opts := levigo.NewOptions()
+ opts.SetCreateIfMissing(true)
+ opts.SetCache(levigo.NewLRUCache(2 << 20))
+ opts.SetFilterPolicy(levigo.NewBloomFilter(10))
+ db, err := levigo.Open(path, opts)
+ if err != nil {
+ return nil, err
+ }
+ return &gobDB{db}, nil
+}
+
+func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
+ var b bytes.Buffer
+ if err := gob.NewEncoder(&b).Encode(obj); err != nil {
+ return err
+ }
+ return db.Put(wo, key, b.Bytes())
+}
+
+func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
+ data, err := db.Get(ro, key)
+ if err != nil {
+ return err
+ }
+ if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil {
+ return err
+ }
+ return nil
+}
+
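+// URLInfo records the crawl state of a single URL: when it was
+// fetched, the HTTP status code, and any resulting error.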
+type URLInfo struct {
+ URL string
+ StatusCode int
+ CrawledAt time.Time
+ Error error
+}
+
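+// Scope decides whether a URL, found at the given link depth, should
+// be crawled at all.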
+type Scope interface {
+ Check(*url.URL, int) bool
+}
+
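+// Fetcher retrieves the contents of a URL.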
+type Fetcher interface {
+ Fetch(string) (*http.Response, error)
+}
+
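+// FetcherFunc adapts a plain function to the Fetcher interface.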
+type FetcherFunc func(string) (*http.Response, error)
+
+func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
+ return f(u)
+}
+
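+// Handler processes the result of fetching a single URL.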
+type Handler interface {
+ Handle(*Crawler, string, int, *http.Response, error) error
+}
+
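+// HandlerFunc adapts a plain function to the Handler interface.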
+type HandlerFunc func(*Crawler, string, int, *http.Response, error) error
+
+func (f HandlerFunc) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+ return f(c, u, depth, resp, err)
+}
+
+// Crawler holds the crawl state: the database of URLs (either
+// pending or done), the scope, the fetcher and the handler.
+type Crawler struct {
+ db *gobDB
+ seeds []*url.URL
+ scope Scope
+ fetcher Fetcher
+ handler Handler
+
+ enqueueMx sync.Mutex
+}
+
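+// QueuePair is a queued crawl request: its database key, the URL to
+// fetch, and its link depth.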
+type QueuePair struct {
+ Key []byte
+ URL string
+ Depth int
+}
+
+// UpdateURL updates this URLInfo entry in the crawl database.
+func (c *Crawler) UpdateURL(info *URLInfo) error {
+ wo := levigo.NewWriteOptions()
+ defer wo.Close()
+ return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info)
+}
+
+// Enqueue adds a (possibly new) URL to the processing queue.
+func (c *Crawler) Enqueue(u *url.URL, depth int) {
+ // Normalize the URL.
+ urlStr := purell.NormalizeURL(u, purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes|purell.FlagRemoveFragment|purell.FlagRemoveDirectoryIndex|purell.FlagSortQuery)
+
+ // See if it's in scope.
+ if !c.scope.Check(u, depth) {
+ return
+ }
+
+ c.enqueueMx.Lock()
+ defer c.enqueueMx.Unlock()
+
+ // Check if we've already seen it.
+ var info URLInfo
+ ro := levigo.NewReadOptions()
+ defer ro.Close()
+ ukey := []byte(fmt.Sprintf("url/%s", urlStr))
+ if err := c.db.GetObj(ro, ukey, &info); err == nil {
+ return
+ }
+
+ // Create a unique key using the URL and the current timestamp.
+ qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr))
+
+ // Store the URL in the queue, and store an empty URLInfo to
+ // make sure that subsequent calls to Enqueue with the same
+ // URL will be skipped.
+ wo := levigo.NewWriteOptions()
+ defer wo.Close()
+ c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth})
+ c.db.PutObj(wo, ukey, &info)
+}
+
+// process periodically scans the queue, sending the queued URLs on
+// the returned channel until there are no more.
+func (c *Crawler) process() <-chan QueuePair {
+ ch := make(chan QueuePair)
+ go func() {
+ queuePrefix := []byte("queue/")
+ for range time.Tick(2 * time.Second) {
+ n := 0
+
+ // Scan the queue using a snapshot, to ignore
+ // new URLs that might be added after this.
+ s := c.db.NewSnapshot()
+ ro := levigo.NewReadOptions()
+ ro.SetSnapshot(s)
+
+ iter := c.db.NewIterator(ro)
+ for iter.Seek(queuePrefix); iter.Valid() && bytes.HasPrefix(iter.Key(), queuePrefix); iter.Next() {
+ var p QueuePair
+ if err := gob.NewDecoder(bytes.NewBuffer(iter.Value())).Decode(&p); err != nil {
+ continue
+ }
+ ch <- p
+ n++
+ }
+ iter.Close()
+
+ ro.Close()
+ c.db.ReleaseSnapshot(s)
+
+ if n == 0 {
+ break
+ }
+ }
+ close(ch)
+ }()
+ return ch
+}
+
+// Main worker loop.
+func (c *Crawler) urlHandler(queue <-chan QueuePair) {
+ for p := range queue {
+ // Fetch the URL and handle it. Keep a reference to the
+ // original response body so that it is closed even if the
+ // handler replaces it in the Response object; only touch the
+ // body if the fetch itself succeeded.
+ fmt.Printf("%s\n", p.URL)
+ httpResp, httpErr := c.fetcher.Fetch(p.URL)
+ var respBody io.ReadCloser
+ if httpErr == nil {
+ respBody = httpResp.Body
+ }
+ err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+ if httpErr == nil {
+ respBody.Close()
+ }
+
+ // Remove the URL from the queue if the handler was successful.
+ if err == nil {
+ wo := levigo.NewWriteOptions()
+ c.db.Delete(wo, p.Key)
+ wo.Close()
+ } else {
+ log.Printf("error handling %s: %v", p.URL, err)
+ }
+ }
+}
+
+type seedScope struct {
+ seeds []*url.URL
+ schemes map[string]struct{}
+ maxDepth int
+}
+
+func (s *seedScope) Check(u *url.URL, depth int) bool {
+ // Ignore non-allowed schemes.
+ if _, ok := s.schemes[u.Scheme]; !ok {
+ return false
+ }
+
+ // Do not crawl beyond maxDepth.
+ if depth > s.maxDepth {
+ return false
+ }
+
+ // Check each seed prefix.
+ for _, seed := range s.seeds {
+ if u.Host == seed.Host && strings.HasPrefix(u.Path, seed.Path) {
+ return true
+ }
+ }
+ return false
+}
+
+// NewSeedScope returns a Scope that only allows crawling URLs under
+// the seed prefixes (same host, same path prefix), over the allowed
+// schemes, and not beyond the specified maximum link depth.
+func NewSeedScope(seeds []*url.URL, maxDepth int, allowedSchemes []string) Scope {
+ scope := &seedScope{
+ seeds: seeds,
+ maxDepth: maxDepth,
+ schemes: make(map[string]struct{}),
+ }
+ for _, s := range allowedSchemes {
+ scope.schemes[s] = struct{}{}
+ }
+ return scope
+}
+
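+// MustParseURLs parses a list of URLs and terminates the program if
+// any of them fails to parse.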
+func MustParseURLs(urls []string) []*url.URL {
+ // Parse the seed URLs.
+ var parsed []*url.URL
+ for _, s := range urls {
+ u, err := url.Parse(s)
+ if err != nil {
+ log.Fatalf("error parsing seed \"%s\": %v", s, err)
+ }
+ parsed = append(parsed, u)
+ }
+ return parsed
+}
+
+// NewCrawler creates a new Crawler object with the specified behavior.
+func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler) (*Crawler, error) {
+ // Open the crawl database.
+ db, err := newGobDB(path)
+ if err != nil {
+ return nil, err
+ }
+ c := &Crawler{
+ db: db,
+ fetcher: f,
+ handler: &standardPageHandler{h},
+ seeds: seeds,
+ scope: scope,
+ }
+ return c, nil
+}
+
+// Run runs the crawl; it does not return until the crawl is done.
+func (c *Crawler) Run() {
+ // Load initial seeds into the queue.
+ for _, u := range c.seeds {
+ c.Enqueue(u, 0)
+ }
+
+ // Start some runners and wait until they're done.
+ var wg sync.WaitGroup
+ ch := c.process()
+ for i := 0; i < 3; i++ {
+ wg.Add(1)
+ go func() {
+ c.urlHandler(ch)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+}
+
+// standardPageHandler is the default page handler: it follows
+// redirects and invokes the wrapped child handler when the status
+// code is 200.
+type standardPageHandler struct {
+ h Handler
+}
+
+func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+ info := &URLInfo{URL: u, CrawledAt: time.Now()}
+ if err == nil {
+ info.StatusCode = resp.StatusCode
+ if resp.StatusCode == 200 {
+ err = wrap.h.Handle(c, u, depth, resp, err)
+ } else if resp.StatusCode > 300 && resp.StatusCode < 400 {
+ location := resp.Header.Get("Location")
+ if location != "" {
+ locationUrl, err := resp.Request.URL.Parse(location)
+ if err != nil {
+ log.Printf("error parsing Location header: %v", err)
+ } else {
+ c.Enqueue(locationUrl, depth+1)
+ }
+ }
+ } else {
+ err = errors.New(resp.Status)
+ }
+ }
+ info.Error = err
+
+ //log.Printf("[CRAWL] %+v", info)
+
+ c.UpdateURL(info)
+ return nil
+}
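
For context, the exported pieces introduced by this commit (MustParseURLs, NewSeedScope, FetcherFunc, HandlerFunc, NewCrawler and Run) fit together roughly as in the sketch below. This is a hypothetical usage example, not part of the commit: the import path for the crawl package is not shown here and is an assumption, and the seed URL, database path and handler body are placeholders for illustration.

package main

import (
    "io"
    "io/ioutil"
    "log"
    "net/http"

    "example.org/crawl" // assumed import path; not shown in this commit
)

func main() {
    // Illustrative seed list, scope and database path.
    seeds := crawl.MustParseURLs([]string{"http://example.com/"})
    scope := crawl.NewSeedScope(seeds, 10, []string{"http", "https"})

    // http.Get already has the FetcherFunc signature.
    fetcher := crawl.FetcherFunc(http.Get)

    // Minimal handler: drain the body so the connection can be reused.
    // The standard wrapper only calls this for 200 responses.
    handler := crawl.HandlerFunc(func(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
        if err != nil {
            return nil
        }
        _, cerr := io.Copy(ioutil.Discard, resp.Body)
        return cerr
    })

    c, err := crawl.NewCrawler("crawl.db", seeds, scope, fetcher, handler)
    if err != nil {
        log.Fatal(err)
    }
    c.Run()
}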