author     ale <ale@incal.net>   2014-12-19 13:55:05 +0000
committer  ale <ale@incal.net>   2014-12-19 13:55:05 +0000
commit     b09f05f8137e5bbc27a0a306de0529c59d3f2c28 (patch)
tree       37e66968c2eb0e361a1f284804a86fe339e68b78
initial commit
-rw-r--r--  cmd/crawl/crawl.go  178
-rw-r--r--  cmd/links/links.go   75
-rw-r--r--  crawler.go          327
-rw-r--r--  warc.go              96
4 files changed, 676 insertions(+), 0 deletions(-)
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
new file mode 100644
index 0000000..1e5f952
--- /dev/null
+++ b/cmd/crawl/crawl.go
@@ -0,0 +1,178 @@
+// A restartable crawler that dumps everything to a WARC file.
+
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/url"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
+
+ "git.autistici.org/ale/crawl"
+ "github.com/PuerkitoBio/goquery"
+)
+
+var (
+ dbPath = flag.String("state", "crawldb", "crawl state database path")
+ concurrency = flag.Int("c", 10, "concurrent workers")
+ depth = flag.Int("depth", 10, "maximum link depth")
+ validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
+ outputFile = flag.String("output", "crawl.warc.gz", "output WARC file")
+
+ urlcssRx = regexp.MustCompile(`background.*:.*url\(["']?([^'"\)]+)["']?\)`)
+)
+
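+// linkMatches lists the tag/attribute pairs that are scanned for
+// outbound links in HTML documents.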
+var linkMatches = []struct {
+ tag string
+ attr string
+}{
+ {"a", "href"},
+ {"link", "href"},
+ {"img", "src"},
+ {"script", "src"},
+}
+
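+// extractLinks parses HTML and CSS responses, resolves the URLs found
+// in them against the request URL, and enqueues them for crawling.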
+func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
+ var outlinks []string
+
+ ctype := resp.Header.Get("Content-Type")
+ if strings.HasPrefix(ctype, "text/html") {
+ doc, err := goquery.NewDocumentFromResponse(resp)
+ if err != nil {
+ return err
+ }
+
+ for _, lm := range linkMatches {
+ doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) {
+ val, _ := s.Attr(lm.attr)
+ outlinks = append(outlinks, val)
+ })
+ }
+ } else if strings.HasPrefix(ctype, "text/css") {
+ if data, err := ioutil.ReadAll(resp.Body); err == nil {
+ for _, val := range urlcssRx.FindAllStringSubmatch(string(data), -1) {
+ outlinks = append(outlinks, val[1])
+ }
+ }
+ }
+
+ // Uniquify and parse outbound links.
+ links := make(map[string]*url.URL)
+ for _, val := range outlinks {
+ if linkurl, err := resp.Request.URL.Parse(val); err == nil {
+ links[linkurl.String()] = linkurl
+ }
+ }
+ for _, link := range links {
+ //log.Printf("%s -> %s", u, link.String())
+ c.Enqueue(link, depth+1)
+ }
+
+ return nil
+}
+
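+// fakeCloser adds a no-op Close method to a Reader, so that a drained
+// response body can be replaced with an in-memory copy.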
+type fakeCloser struct {
+ io.Reader
+}
+
+func (f *fakeCloser) Close() error {
+ return nil
+}
+
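+// hdr2str serializes HTTP headers in their wire format.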
+func hdr2str(h http.Header) []byte {
+ var b bytes.Buffer
+ h.Write(&b)
+ return b.Bytes()
+}
+
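+// warcSaveHandler dumps each request/response pair to a WARC file and
+// then passes the response on to the link extractor.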
+type warcSaveHandler struct {
+ warc *crawl.WarcWriter
+ warcInfoID string
+}
+
+func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
+ data, derr := ioutil.ReadAll(resp.Body)
+ if derr != nil {
+ return derr
+ }
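+ // Keep an in-memory copy of the body so it can be written to the
+ // WARC file and then parsed again by extractLinks.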
+ resp.Body = &fakeCloser{bytes.NewReader(data)}
+
+ // Dump the request.
+ var b bytes.Buffer
+ resp.Request.Write(&b)
+ hdr := crawl.NewWarcHeader()
+ hdr.Set("WARC-Type", "request")
+ hdr.Set("WARC-Target-URI", resp.Request.URL.String())
+ hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
+ hdr.Set("Content-Length", strconv.Itoa(b.Len()))
+ w := h.warc.NewRecord(hdr)
+ w.Write(b.Bytes())
+ w.Close()
+
+ // Dump the response.
+ statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status)
+ respPayload := bytes.Join([][]byte{
+ []byte(statusLine), hdr2str(resp.Header), data},
+ []byte{'\r', '\n'})
+ hdr = crawl.NewWarcHeader()
+ hdr.Set("WARC-Type", "response")
+ hdr.Set("WARC-Target-URI", resp.Request.URL.String())
+ hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
+ hdr.Set("Content-Length", strconv.Itoa(len(respPayload)))
+ w = h.warc.NewRecord(hdr)
+ w.Write(respPayload)
+ w.Close()
+
+ return extractLinks(c, u, depth, resp, err)
+}
+
+func NewSaveHandler(w *crawl.WarcWriter) crawl.Handler {
+ info := strings.Join([]string{
+ "Software: crawl/1.0\r\n",
+ "Format: WARC File Format 1.0\r\n",
+ "Conformsto: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf\r\n",
+ }, "")
+
+ hdr := crawl.NewWarcHeader()
+ hdr.Set("WARC-Type", "warcinfo")
+ hdr.Set("WARC-Warcinfo-ID", hdr.Get("WARC-Record-ID"))
+ hdr.Set("Content-Length", strconv.Itoa(len(info)))
+ hdrw := w.NewRecord(hdr)
+ io.WriteString(hdrw, info)
+ hdrw.Close()
+ return &warcSaveHandler{
+ warc: w,
+ warcInfoID: hdr.Get("WARC-Record-ID"),
+ }
+}
+
+func main() {
+ flag.Parse()
+
+ outf, err := os.Create(*outputFile)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ seeds := crawl.MustParseURLs(flag.Args())
+ scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ","))
+
+ w := crawl.NewWarcWriter(outf)
+ defer w.Close()
+
+ saver := NewSaveHandler(w)
+
+ crawler, err := crawl.NewCrawler(*dbPath, seeds, scope, crawl.FetcherFunc(http.Get), saver)
+ if err != nil {
+ log.Fatal(err)
+ }
+ crawler.Run()
+}
diff --git a/cmd/links/links.go b/cmd/links/links.go
new file mode 100644
index 0000000..3ba63be
--- /dev/null
+++ b/cmd/links/links.go
@@ -0,0 +1,75 @@
+// A restartable crawler that extracts links from HTML pages and
+// simply prints the URLs it visits, without storing any content.
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "net/url"
+ "strings"
+
+ "git.autistici.org/ale/crawl"
+ "github.com/PuerkitoBio/goquery"
+)
+
+var (
+ dbPath = flag.String("state", "crawldb", "crawl state database path")
+ concurrency = flag.Int("c", 10, "concurrent workers")
+ depth = flag.Int("depth", 10, "maximum link depth")
+ validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
+)
+
+var linkMatches = []struct {
+ tag string
+ attr string
+}{
+ {"a", "href"},
+ {"link", "href"},
+ {"img", "src"},
+ {"script", "src"},
+}
+
+func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
+ if !strings.HasPrefix(resp.Header.Get("Content-Type"), "text/html") {
+ return nil
+ }
+
+ doc, err := goquery.NewDocumentFromResponse(resp)
+ if err != nil {
+ return err
+ }
+
+ links := make(map[string]*url.URL)
+
+ for _, lm := range linkMatches {
+ doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) {
+ val, _ := s.Attr(lm.attr)
+ if linkurl, err := resp.Request.URL.Parse(val); err == nil {
+ links[linkurl.String()] = linkurl
+ }
+ })
+ }
+
+ for _, link := range links {
+ //log.Printf("%s -> %s", u, link.String())
+ c.Enqueue(link, depth+1)
+ }
+ return nil
+}
+
+func main() {
+ flag.Parse()
+
+ seeds := crawl.MustParseURLs(flag.Args())
+ scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ","))
+
+ crawler, err := crawl.NewCrawler(*dbPath, seeds, scope, crawl.FetcherFunc(http.Get), crawl.HandlerFunc(extractLinks))
+ if err != nil {
+ log.Fatal(err)
+ }
+ crawler.Run()
+}
diff --git a/crawler.go b/crawler.go
new file mode 100644
index 0000000..ed43b1f
--- /dev/null
+++ b/crawler.go
@@ -0,0 +1,327 @@
+package crawl
+
+import (
+ "bytes"
+ "encoding/gob"
+ "errors"
+ "fmt"
+ "log"
+ "net/http"
+ "net/url"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/PuerkitoBio/purell"
+ "github.com/jmhodges/levigo"
+)
+
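+// gobDB is a small wrapper around leveldb that stores gob-encoded
+// values.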
+type gobDB struct {
+ *levigo.DB
+}
+
+func newGobDB(path string) (*gobDB, error) {
+ opts := levigo.NewOptions()
+ opts.SetCreateIfMissing(true)
+ opts.SetCache(levigo.NewLRUCache(2 << 20))
+ opts.SetFilterPolicy(levigo.NewBloomFilter(10))
+ db, err := levigo.Open(path, opts)
+ if err != nil {
+ return nil, err
+ }
+ return &gobDB{db}, nil
+}
+
+func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
+ var b bytes.Buffer
+ if err := gob.NewEncoder(&b).Encode(obj); err != nil {
+ return err
+ }
+ return db.Put(wo, key, b.Bytes())
+}
+
+func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
+ data, err := db.Get(ro, key)
+ if err != nil {
+ return err
+ }
+ if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil {
+ return err
+ }
+ return nil
+}
+
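+// URLInfo records the outcome of fetching a single URL.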
+type URLInfo struct {
+ URL string
+ StatusCode int
+ CrawledAt time.Time
+ Error error
+}
+
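+// Scope decides whether a URL, at a given link depth, should be
+// crawled or not.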
+type Scope interface {
+ Check(*url.URL, int) bool
+}
+
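+// Fetcher retrieves the contents of a URL.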
+type Fetcher interface {
+ Fetch(string) (*http.Response, error)
+}
+
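+// FetcherFunc adapts a plain function (such as http.Get) to the
+// Fetcher interface.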
+type FetcherFunc func(string) (*http.Response, error)
+
+func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
+ return f(u)
+}
+
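+// Handler processes the response obtained for a crawled URL.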
+type Handler interface {
+ Handle(*Crawler, string, int, *http.Response, error) error
+}
+
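+// HandlerFunc adapts a plain function to the Handler interface.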
+type HandlerFunc func(*Crawler, string, int, *http.Response, error) error
+
+func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
+ return f(db, u, depth, resp, err)
+}
+
+// Crawler holds the crawl state database together with the scope,
+// fetcher and handler that define the crawl's behavior.
+type Crawler struct {
+ db *gobDB
+ seeds []*url.URL
+ scope Scope
+ fetcher Fetcher
+ handler Handler
+
+ enqueueMx sync.Mutex
+}
+
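+// QueuePair is a single entry in the pending-crawl queue.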
+type QueuePair struct {
+ Key []byte
+ URL string
+ Depth int
+}
+
+// Update this URLInfo entry in the crawl database.
+func (c *Crawler) UpdateURL(info *URLInfo) error {
+ wo := levigo.NewWriteOptions()
+ defer wo.Close()
+ return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info)
+}
+
+// Enqueue a (possibly new) URL for processing.
+func (c *Crawler) Enqueue(u *url.URL, depth int) {
+ // Normalize the URL.
+ urlStr := purell.NormalizeURL(u, purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes|purell.FlagRemoveFragment|purell.FlagRemoveDirectoryIndex|purell.FlagSortQuery)
+
+ // See if it's in scope.
+ if !c.scope.Check(u, depth) {
+ return
+ }
+
+ c.enqueueMx.Lock()
+ defer c.enqueueMx.Unlock()
+
+ // Check if we've already seen it.
+ var info URLInfo
+ ro := levigo.NewReadOptions()
+ defer ro.Close()
+ ukey := []byte(fmt.Sprintf("url/%s", urlStr))
+ if err := c.db.GetObj(ro, ukey, &info); err == nil {
+ return
+ }
+
+ // Create a unique key using the URL and the current timestamp.
+ qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr))
+
+ // Store the URL in the queue, and store an empty URLInfo to
+ // make sure that subsequent calls to Enqueue with the same
+ // URL will fail.
+ wo := levigo.NewWriteOptions()
+ defer wo.Close()
+ c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth})
+ c.db.PutObj(wo, ukey, &info)
+}
+
+// Scan the queue for URLs until there are no more.
+func (c *Crawler) process() <-chan QueuePair {
+ ch := make(chan QueuePair)
+ go func() {
+ queuePrefix := []byte("queue/")
+ for range time.Tick(2 * time.Second) {
+ n := 0
+
+ // Scan the queue using a snapshot, to ignore
+ // new URLs that might be added after this.
+ s := c.db.NewSnapshot()
+ ro := levigo.NewReadOptions()
+ ro.SetSnapshot(s)
+
+ iter := c.db.NewIterator(ro)
+ for iter.Seek(queuePrefix); iter.Valid() && bytes.HasPrefix(iter.Key(), queuePrefix); iter.Next() {
+ var p QueuePair
+ if err := gob.NewDecoder(bytes.NewBuffer(iter.Value())).Decode(&p); err != nil {
+ continue
+ }
+ ch <- p
+ n++
+ }
+ iter.Close()
+
+ ro.Close()
+ c.db.ReleaseSnapshot(s)
+
+ if n == 0 {
+ break
+ }
+ }
+ close(ch)
+ }()
+ return ch
+}
+
+// Main worker loop.
+func (c *Crawler) urlHandler(queue <-chan QueuePair) {
+ for p := range queue {
+ // Fetch the URL and handle it. Make sure to Close the
+ // response body (even if it gets replaced in the
+ // Response object).
+ fmt.Printf("%s\n", p.URL)
+ httpResp, httpErr := c.fetcher.Fetch(p.URL)
+ var respBody io.ReadCloser
+ if httpErr == nil {
+ respBody = httpResp.Body
+ }
+ err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+ if respBody != nil {
+ respBody.Close()
+ }
+
+ // Remove the URL from the queue if the handler was successful.
+ if err == nil {
+ wo := levigo.NewWriteOptions()
+ c.db.Delete(wo, p.Key)
+ wo.Close()
+ } else {
+ log.Printf("error handling %s: %v", p.URL, err)
+ }
+ }
+}
+
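+// seedScope restricts crawling to the seed hosts and path prefixes,
+// the allowed schemes, and a maximum link depth.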
+type seedScope struct {
+ seeds []*url.URL
+ schemes map[string]struct{}
+ maxDepth int
+}
+
+func (s *seedScope) Check(u *url.URL, depth int) bool {
+ // Ignore non-allowed schemes.
+ if _, ok := s.schemes[u.Scheme]; !ok {
+ return false
+ }
+
+ // Do not crawl beyond maxDepth.
+ if depth > s.maxDepth {
+ return false
+ }
+
+ // Check each seed prefix.
+ for _, seed := range s.seeds {
+ if u.Host == seed.Host && strings.HasPrefix(u.Path, seed.Path) {
+ return true
+ }
+ }
+ return false
+}
+
+// NewSeedScope returns a Scope that will only allow crawling the seed
+// domains, and not beyond the specified maximum link depth.
+func NewSeedScope(seeds []*url.URL, maxDepth int, allowedSchemes []string) Scope {
+ scope := &seedScope{
+ seeds: seeds,
+ maxDepth: maxDepth,
+ schemes: make(map[string]struct{}),
+ }
+ for _, s := range allowedSchemes {
+ scope.schemes[s] = struct{}{}
+ }
+ return scope
+}
+
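+// MustParseURLs parses a list of URLs and aborts the program on the
+// first error.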
+func MustParseURLs(urls []string) []*url.URL {
+ // Parse the seed URLs.
+ var parsed []*url.URL
+ for _, s := range urls {
+ u, err := url.Parse(s)
+ if err != nil {
+ log.Fatalf("error parsing seed \"%s\": %v", s, err)
+ }
+ parsed = append(parsed, u)
+ }
+ return parsed
+}
+
+// NewCrawler creates a new Crawler object with the specified behavior.
+func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler) (*Crawler, error) {
+ // Open the crawl database.
+ db, err := newGobDB(path)
+ if err != nil {
+ return nil, err
+ }
+ c := &Crawler{
+ db: db,
+ fetcher: f,
+ handler: &standardPageHandler{h},
+ seeds: seeds,
+ scope: scope,
+ }
+ return c, nil
+}
+
+// Run runs the crawl; it does not return until the queue has been drained.
+func (c *Crawler) Run() {
+ // Load initial seeds into the queue.
+ for _, u := range c.seeds {
+ c.Enqueue(u, 0)
+ }
+
+ // Start some runners and wait until they're done.
+ var wg sync.WaitGroup
+ ch := c.process()
+ for i := 0; i < 3; i++ {
+ wg.Add(1)
+ go func() {
+ c.urlHandler(ch)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+}
+
+// Standard page handler, follows redirects and invokes a child
+// handler when status == 200.
+type standardPageHandler struct {
+ h Handler
+}
+
+func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+ info := &URLInfo{URL: u, CrawledAt: time.Now()}
+ if err == nil {
+ info.StatusCode = resp.StatusCode
+ if resp.StatusCode == 200 {
+ err = wrap.h.Handle(c, u, depth, resp, err)
+ } else if resp.StatusCode > 300 && resp.StatusCode < 400 {
+ location := resp.Header.Get("Location")
+ if location != "" {
+ locationUrl, err := resp.Request.URL.Parse(location)
+ if err != nil {
+ log.Printf("error parsing Location header: %v", err)
+ } else {
+ c.Enqueue(locationUrl, depth+1)
+ }
+ }
+ } else {
+ err = errors.New(resp.Status)
+ }
+ }
+ info.Error = err
+
+ //log.Printf("[CRAWL] %+v", info)
+
+ c.UpdateURL(info)
+ return nil
+}
diff --git a/warc.go b/warc.go
new file mode 100644
index 0000000..66cf417
--- /dev/null
+++ b/warc.go
@@ -0,0 +1,96 @@
+package crawl
+
+import (
+ "fmt"
+ "io"
+ "time"
+
+ "compress/gzip"
+
+ "code.google.com/p/go-uuid/uuid"
+)
+
+var (
+ warcTimeFmt = time.RFC3339
+ warcVersion = "WARC/1.0"
+ warcContentTypes = map[string]string{
+ "warcinfo": "application/warc-fields",
+ "response": "application/http; msgtype=response",
+ "request": "application/http; msgtype=request",
+ "metadata": "application/warc-fields",
+ }
+)
+
+// A Warc header. Header field names are case-sensitive.
+type WarcHeader map[string]string
+
+// Set a header to the specified value. Multiple values are not
+// supported.
+func (h WarcHeader) Set(key, value string) {
+ h[key] = value
+
+ // Keep Content-Type in sync with WARC-Type.
+ if key == "WARC-Type" {
+ if ct, ok := warcContentTypes[value]; ok {
+ h["Content-Type"] = ct
+ } else {
+ h["Content-Type"] = "application/octet-stream"
+ }
+ }
+}
+
+// Get the value of a header. If not found, returns an empty string.
+func (h WarcHeader) Get(key string) string {
+ return h[key]
+}
+
+// Encode the header to a Writer.
+func (h WarcHeader) Encode(w io.Writer) {
+ fmt.Fprintf(w, "%s\r\n", warcVersion)
+ for hdr, value := range h {
+ fmt.Fprintf(w, "%s: %s\r\n", hdr, value)
+ }
+ fmt.Fprintf(w, "\r\n")
+}
+
+// NewWarcHeader returns a WarcHeader with its own unique ID and the
+// current timestamp.
+func NewWarcHeader() WarcHeader {
+ h := make(WarcHeader)
+ h.Set("WARC-Record-ID", fmt.Sprintf("<%s>", uuid.NewUUID().URN()))
+ h.Set("WARC-Date", time.Now().Format(warcTimeFmt))
+ h.Set("Content-Type", "application/octet-stream")
+ return h
+}
+
+// WarcWriter can write records to a file in WARC format.
+type WarcWriter struct {
+ writer io.WriteCloser
+}
+
+type recordWriter struct {
+ io.Writer
+}
+
+func (rw *recordWriter) Close() error {
+ // Add the end-of-record marker.
+ fmt.Fprintf(rw, "\r\n\r\n")
+ return nil
+}
+
+// NewRecord starts a new WARC record with the provided header. The
+// caller must call Close on the returned writer before creating the
+// next record.
+func (w *WarcWriter) NewRecord(hdr WarcHeader) io.WriteCloser {
+ hdr.Encode(w.writer)
+ return &recordWriter{w.writer}
+}
+
+// Close the WARC writer and flush all buffers.
+func (w *WarcWriter) Close() error {
+ return w.writer.Close()
+}
+
+func NewWarcWriter(w io.WriteCloser) *WarcWriter {
+ return &WarcWriter{gzip.NewWriter(w)}
+}
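
Below is a minimal, hypothetical sketch (not part of the commit above) showing how the WarcWriter API from warc.go could be used on its own to emit a single record; the output file name and record contents are invented for illustration.

package main

import (
	"io"
	"log"
	"os"
	"strconv"

	"git.autistici.org/ale/crawl"
)

func main() {
	// Create the output file; NewWarcWriter wraps it in a gzip stream.
	f, err := os.Create("example.warc.gz")
	if err != nil {
		log.Fatal(err)
	}
	w := crawl.NewWarcWriter(f)
	defer w.Close()

	// Write one "resource" record with a made-up payload.
	body := "hello, WARC\n"
	hdr := crawl.NewWarcHeader()
	hdr.Set("WARC-Type", "resource")
	hdr.Set("WARC-Target-URI", "http://example.com/hello")
	hdr.Set("Content-Type", "text/plain")
	hdr.Set("Content-Length", strconv.Itoa(len(body)))

	// Each record must be closed before the next one is started.
	rec := w.NewRecord(hdr)
	io.WriteString(rec, body)
	rec.Close()
}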