From b09f05f8137e5bbc27a0a306de0529c59d3f2c28 Mon Sep 17 00:00:00 2001
From: ale
Date: Fri, 19 Dec 2014 13:55:05 +0000
Subject: initial commit

---
 crawler.go | 327 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 327 insertions(+)
 create mode 100644 crawler.go

(limited to 'crawler.go')

diff --git a/crawler.go b/crawler.go
new file mode 100644
index 0000000..ed43b1f
--- /dev/null
+++ b/crawler.go
@@ -0,0 +1,327 @@
+package crawl
+
+import (
+	"bytes"
+	"encoding/gob"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/PuerkitoBio/purell"
+	"github.com/jmhodges/levigo"
+)
+
+// gobDB is a thin wrapper around LevelDB that stores gob-encoded values.
+type gobDB struct {
+	*levigo.DB
+}
+
+func newGobDB(path string) (*gobDB, error) {
+	opts := levigo.NewOptions()
+	opts.SetCreateIfMissing(true)
+	opts.SetCache(levigo.NewLRUCache(2 << 20))
+	opts.SetFilterPolicy(levigo.NewBloomFilter(10))
+	db, err := levigo.Open(path, opts)
+	if err != nil {
+		return nil, err
+	}
+	return &gobDB{db}, nil
+}
+
+func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
+	var b bytes.Buffer
+	if err := gob.NewEncoder(&b).Encode(obj); err != nil {
+		return err
+	}
+	return db.Put(wo, key, b.Bytes())
+}
+
+func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
+	data, err := db.Get(ro, key)
+	if err != nil {
+		return err
+	}
+	if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil {
+		return err
+	}
+	return nil
+}
+
+// URLInfo stores the crawl state of a single URL.
+type URLInfo struct {
+	URL        string
+	StatusCode int
+	CrawledAt  time.Time
+	Error      error
+}
+
+// Scope decides whether a URL should be crawled at a given depth.
+type Scope interface {
+	Check(*url.URL, int) bool
+}
+
+// Fetcher retrieves the contents of a URL.
+type Fetcher interface {
+	Fetch(string) (*http.Response, error)
+}
+
+// FetcherFunc adapts a plain function to the Fetcher interface.
+type FetcherFunc func(string) (*http.Response, error)
+
+func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
+	return f(u)
+}
+
+// Handler processes the response (or error) obtained for a crawled URL.
+type Handler interface {
+	Handle(*Crawler, string, int, *http.Response, error) error
+}
+
+// HandlerFunc adapts a plain function to the Handler interface.
+type HandlerFunc func(*Crawler, string, int, *http.Response, error) error
+
+func (f HandlerFunc) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+	return f(c, u, depth, resp, err)
+}
+
+// Crawler is the database of crawls (either pending or done), along
+// with the logic to process the queue.
+type Crawler struct {
+	db      *gobDB
+	seeds   []*url.URL
+	scope   Scope
+	fetcher Fetcher
+	handler Handler
+
+	enqueueMx sync.Mutex
+}
+
+// QueuePair is a queued crawl request stored in the database.
+type QueuePair struct {
+	Key   []byte
+	URL   string
+	Depth int
+}
+
+// UpdateURL updates this URLInfo entry in the crawl database.
+func (c *Crawler) UpdateURL(info *URLInfo) error {
+	wo := levigo.NewWriteOptions()
+	defer wo.Close()
+	return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info)
+}
+
+// Enqueue a (possibly new) URL for processing.
+func (c *Crawler) Enqueue(u *url.URL, depth int) {
+	// Normalize the URL.
+	urlStr := purell.NormalizeURL(u, purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes|purell.FlagRemoveFragment|purell.FlagRemoveDirectoryIndex|purell.FlagSortQuery)
+
+	// See if it's in scope.
+	if !c.scope.Check(u, depth) {
+		return
+	}
+
+	c.enqueueMx.Lock()
+	defer c.enqueueMx.Unlock()
+
+	// Check if we've already seen it.
+	var info URLInfo
+	ro := levigo.NewReadOptions()
+	defer ro.Close()
+	ukey := []byte(fmt.Sprintf("url/%s", urlStr))
+	if err := c.db.GetObj(ro, ukey, &info); err == nil {
+		return
+	}
+
+	// Create a unique key using the URL and the current timestamp.
+	qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr))
+
+	// Store the URL in the queue, and store an empty URLInfo to
+	// make sure that subsequent calls to Enqueue with the same
+	// URL will be ignored.
+	wo := levigo.NewWriteOptions()
+	defer wo.Close()
+	c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth})
+	c.db.PutObj(wo, ukey, &info)
+}
+
+// process scans the queue for URLs until there are no more.
+func (c *Crawler) process() <-chan QueuePair {
+	ch := make(chan QueuePair)
+	go func() {
+		queuePrefix := []byte("queue/")
+		for range time.Tick(2 * time.Second) {
+			n := 0
+
+			// Scan the queue using a snapshot, to ignore
+			// new URLs that might be added after this.
+			s := c.db.NewSnapshot()
+			ro := levigo.NewReadOptions()
+			ro.SetSnapshot(s)
+
+			iter := c.db.NewIterator(ro)
+			for iter.Seek(queuePrefix); iter.Valid() && bytes.HasPrefix(iter.Key(), queuePrefix); iter.Next() {
+				var p QueuePair
+				if err := gob.NewDecoder(bytes.NewBuffer(iter.Value())).Decode(&p); err != nil {
+					continue
+				}
+				ch <- p
+				n++
+			}
+			iter.Close()
+
+			ro.Close()
+			c.db.ReleaseSnapshot(s)
+
+			if n == 0 {
+				break
+			}
+		}
+		close(ch)
+	}()
+	return ch
+}
+
+// urlHandler is the main worker loop.
+func (c *Crawler) urlHandler(queue <-chan QueuePair) {
+	for p := range queue {
+		// Fetch the URL and handle it. Make sure to Close the
+		// original response body (even if it gets replaced in
+		// the Response object), but only if the fetch succeeded.
+		fmt.Printf("%s\n", p.URL)
+		httpResp, httpErr := c.fetcher.Fetch(p.URL)
+		var respBody io.ReadCloser
+		if httpErr == nil {
+			respBody = httpResp.Body
+		}
+		err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+		if httpErr == nil {
+			respBody.Close()
+		}
+
+		// Remove the URL from the queue if the handler was successful.
+		if err == nil {
+			wo := levigo.NewWriteOptions()
+			c.db.Delete(wo, p.Key)
+			wo.Close()
+		} else {
+			log.Printf("error handling %s: %v", p.URL, err)
+		}
+	}
+}
+
+// seedScope restricts the crawl to the seed prefixes, up to maxDepth.
+type seedScope struct {
+	seeds    []*url.URL
+	schemes  map[string]struct{}
+	maxDepth int
+}
+
+func (s *seedScope) Check(u *url.URL, depth int) bool {
+	// Ignore non-allowed schemes.
+	if _, ok := s.schemes[u.Scheme]; !ok {
+		return false
+	}
+
+	// Do not crawl beyond maxDepth.
+	if depth > s.maxDepth {
+		return false
+	}
+
+	// Check each seed prefix.
+	for _, seed := range s.seeds {
+		if u.Host == seed.Host && strings.HasPrefix(u.Path, seed.Path) {
+			return true
+		}
+	}
+	return false
+}
+
+// NewSeedScope returns a Scope that will only allow crawling the seed
+// domains, and not beyond the specified maximum link depth.
+func NewSeedScope(seeds []*url.URL, maxDepth int, allowedSchemes []string) Scope {
+	scope := &seedScope{
+		seeds:    seeds,
+		maxDepth: maxDepth,
+		schemes:  make(map[string]struct{}),
+	}
+	for _, s := range allowedSchemes {
+		scope.schemes[s] = struct{}{}
+	}
+	return scope
+}
+
+// MustParseURLs parses a list of URLs and aborts on error.
+func MustParseURLs(urls []string) []*url.URL {
+	// Parse the seed URLs.
+	var parsed []*url.URL
+	for _, s := range urls {
+		u, err := url.Parse(s)
+		if err != nil {
+			log.Fatalf("error parsing seed \"%s\": %v", s, err)
+		}
+		parsed = append(parsed, u)
+	}
+	return parsed
+}
+
+// NewCrawler creates a new Crawler object with the specified behavior.
+func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler) (*Crawler, error) {
+	// Open the crawl database.
+	db, err := newGobDB(path)
+	if err != nil {
+		return nil, err
+	}
+	c := &Crawler{
+		db:      db,
+		fetcher: f,
+		handler: &standardPageHandler{h},
+		seeds:   seeds,
+		scope:   scope,
+	}
+	return c, nil
+}
+
+// Run starts the crawl; it does not return until the crawl is done.
+func (c *Crawler) Run() {
+	// Load initial seeds into the queue.
+	for _, u := range c.seeds {
+		c.Enqueue(u, 0)
+	}
+
+	// Start some runners and wait until they're done.
+	var wg sync.WaitGroup
+	ch := c.process()
+	for i := 0; i < 3; i++ {
+		wg.Add(1)
+		go func() {
+			c.urlHandler(ch)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+// standardPageHandler is the default response handler: it follows
+// redirects and invokes a child handler when status == 200.
+type standardPageHandler struct {
+	h Handler
+}
+
+func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+	info := &URLInfo{URL: u, CrawledAt: time.Now()}
+	if err == nil {
+		info.StatusCode = resp.StatusCode
+		if resp.StatusCode == 200 {
+			err = wrap.h.Handle(c, u, depth, resp, err)
+		} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
+			location := resp.Header.Get("Location")
+			if location != "" {
+				locationURL, err := resp.Request.URL.Parse(location)
+				if err != nil {
+					log.Printf("error parsing Location header: %v", err)
+				} else {
+					c.Enqueue(locationURL, depth+1)
+				}
+			}
+		} else {
+			err = errors.New(resp.Status)
+		}
+	}
+	info.Error = err
+
+	//log.Printf("[CRAWL] %+v", info)
+
+	c.UpdateURL(info)
+	return nil
+}
--
cgit v1.2.3-54-g00ecf
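For reference, a minimal program wiring the pieces of this commit together might look like the sketch below. It is not part of the commit: the import path ("example.com/crawl") and the database path ("crawl.db") are placeholders, and the handler only logs successfully fetched pages instead of extracting links.

package main

import (
	"log"
	"net/http"

	crawl "example.com/crawl" // placeholder import path for the package above
)

func main() {
	// Seed URLs and a scope limited to those prefixes, two links deep.
	seeds := crawl.MustParseURLs([]string{"http://example.com/"})
	scope := crawl.NewSeedScope(seeds, 2, []string{"http", "https"})

	// Fetcher: a plain HTTP GET (http.Get matches the FetcherFunc signature).
	fetcher := crawl.FetcherFunc(http.Get)

	// Handler: called by standardPageHandler for status 200 responses;
	// link extraction and further Enqueue calls would go here.
	handler := crawl.HandlerFunc(func(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
		log.Printf("fetched %s (depth %d)", u, depth)
		return nil
	})

	c, err := crawl.NewCrawler("crawl.db", seeds, scope, fetcher, handler)
	if err != nil {
		log.Fatal(err)
	}
	c.Run()
}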