From b09f05f8137e5bbc27a0a306de0529c59d3f2c28 Mon Sep 17 00:00:00 2001 From: ale Date: Fri, 19 Dec 2014 13:55:05 +0000 Subject: initial commit --- cmd/crawl/crawl.go | 178 +++++++++++++++++++++++++++++ cmd/links/links.go | 75 ++++++++++++ crawler.go | 327 +++++++++++++++++++++++++++++++++++++++++++++++++++++ warc.go | 96 ++++++++++++++++ 4 files changed, 676 insertions(+) create mode 100644 cmd/crawl/crawl.go create mode 100644 cmd/links/links.go create mode 100644 crawler.go create mode 100644 warc.go diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go new file mode 100644 index 0000000..1e5f952 --- /dev/null +++ b/cmd/crawl/crawl.go @@ -0,0 +1,178 @@ +// A restartable crawler that dumps everything to a WARC file. + +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "regexp" + "strconv" + "strings" + + "git.autistici.org/ale/crawl" + "github.com/PuerkitoBio/goquery" +) + +var ( + dbPath = flag.String("state", "crawldb", "crawl state database path") + concurrency = flag.Int("c", 10, "concurrent workers") + depth = flag.Int("depth", 10, "maximum link depth") + validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols") + outputFile = flag.String("output", "crawl.warc.gz", "output WARC file") + + urlcssRx = regexp.MustCompile(`background.*:.*url\(["']?([^'"\)]+)["']?\)`) +) + +var linkMatches = []struct { + tag string + attr string +}{ + {"a", "href"}, + {"link", "href"}, + {"img", "src"}, + {"script", "src"}, +} + +func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error { + var outlinks []string + + ctype := resp.Header.Get("Content-Type") + if strings.HasPrefix(ctype, "text/html") { + doc, err := goquery.NewDocumentFromResponse(resp) + if err != nil { + return err + } + + for _, lm := range linkMatches { + doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) { + val, _ := s.Attr(lm.attr) + outlinks = append(outlinks, val) + }) + } + } else if strings.HasPrefix(ctype, "text/css") { + if data, err := ioutil.ReadAll(resp.Body); err == nil { + for _, val := range urlcssRx.FindAllStringSubmatch(string(data), -1) { + outlinks = append(outlinks, val[1]) + } + } + } + + // Uniquify and parse outbound links. + links := make(map[string]*url.URL) + for _, val := range outlinks { + if linkurl, err := resp.Request.URL.Parse(val); err == nil { + links[linkurl.String()] = linkurl + } + } + for _, link := range links { + //log.Printf("%s -> %s", u, link.String()) + c.Enqueue(link, depth+1) + } + + return nil +} + +type fakeCloser struct { + io.Reader +} + +func (f *fakeCloser) Close() error { + return nil +} + +func hdr2str(h http.Header) []byte { + var b bytes.Buffer + h.Write(&b) + return b.Bytes() +} + +type warcSaveHandler struct { + warc *crawl.WarcWriter + warcInfoID string +} + +func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error { + data, derr := ioutil.ReadAll(resp.Body) + if derr != nil { + return err + } + resp.Body = &fakeCloser{bytes.NewReader(data)} + + // Dump the request. + var b bytes.Buffer + resp.Request.Write(&b) + hdr := crawl.NewWarcHeader() + hdr.Set("WARC-Type", "request") + hdr.Set("WARC-Target-URI", resp.Request.URL.String()) + hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) + hdr.Set("Content-Length", strconv.Itoa(b.Len())) + w := h.warc.NewRecord(hdr) + w.Write(b.Bytes()) + w.Close() + + // Dump the response. 
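+	// The record payload is the raw HTTP response: the status line,
+	// the header block and the body, joined with CRLF so that a blank
+	// line separates the headers from the body, as expected for the
+	// "application/http; msgtype=response" record content type.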
+ statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status) + respPayload := bytes.Join([][]byte{ + []byte(statusLine), hdr2str(resp.Header), data}, + []byte{'\r', '\n'}) + hdr = crawl.NewWarcHeader() + hdr.Set("WARC-Type", "response") + hdr.Set("WARC-Target-URI", resp.Request.URL.String()) + hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) + hdr.Set("Content-Length", strconv.Itoa(len(respPayload))) + w = h.warc.NewRecord(hdr) + w.Write(respPayload) + w.Close() + + return extractLinks(c, u, depth, resp, err) +} + +func NewSaveHandler(w *crawl.WarcWriter) crawl.Handler { + info := strings.Join([]string{ + "Software: crawl/1.0\r\n", + "Format: WARC File Format 1.0\r\n", + "Conformsto: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf\r\n", + }, "") + + hdr := crawl.NewWarcHeader() + hdr.Set("WARC-Type", "warcinfo") + hdr.Set("WARC-Warcinfo-ID", hdr.Get("WARC-Record-ID")) + hdr.Set("Content-Length", strconv.Itoa(len(info))) + hdrw := w.NewRecord(hdr) + io.WriteString(hdrw, info) + hdrw.Close() + return &warcSaveHandler{ + warc: w, + warcInfoID: hdr.Get("WARC-Record-ID"), + } +} + +func main() { + flag.Parse() + + outf, err := os.Create(*outputFile) + if err != nil { + log.Fatal(err) + } + + seeds := crawl.MustParseURLs(flag.Args()) + scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ",")) + + w := crawl.NewWarcWriter(outf) + defer w.Close() + + saver := NewSaveHandler(w) + + crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), saver) + if err != nil { + log.Fatal(err) + } + crawler.Run() +} diff --git a/cmd/links/links.go b/cmd/links/links.go new file mode 100644 index 0000000..3ba63be --- /dev/null +++ b/cmd/links/links.go @@ -0,0 +1,75 @@ +// A restartable crawler that extracts links from HTML pages and +// simply prints them. 
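+// Crawl state is persisted in a local database, so an interrupted
+// crawl can be resumed without re-fetching pages that have already
+// been handled.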
+// + +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + "net/url" + "strings" + + "git.autistici.org/ale/crawl" + "github.com/PuerkitoBio/goquery" +) + +var ( + dbPath = flag.String("state", "crawldb", "crawl state database path") + concurrency = flag.Int("c", 10, "concurrent workers") + depth = flag.Int("depth", 10, "maximum link depth") + validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols") +) + +var linkMatches = []struct { + tag string + attr string +}{ + {"a", "href"}, + {"link", "href"}, + {"img", "src"}, + {"script", "src"}, +} + +func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error { + if !strings.HasPrefix(resp.Header.Get("Content-Type"), "text/html") { + return nil + } + + doc, err := goquery.NewDocumentFromResponse(resp) + if err != nil { + return err + } + + links := make(map[string]*url.URL) + + for _, lm := range linkMatches { + doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) { + val, _ := s.Attr(lm.attr) + if linkurl, err := resp.Request.URL.Parse(val); err == nil { + links[linkurl.String()] = linkurl + } + }) + } + + for _, link := range links { + //log.Printf("%s -> %s", u, link.String()) + c.Enqueue(link, depth+1) + } + return nil +} + +func main() { + flag.Parse() + + seeds := crawl.MustParseURLs(flag.Args()) + scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ",")) + + crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.HandlerFunc(extractLinks)) + if err != nil { + log.Fatal(err) + } + crawler.Run() +} diff --git a/crawler.go b/crawler.go new file mode 100644 index 0000000..ed43b1f --- /dev/null +++ b/crawler.go @@ -0,0 +1,327 @@ +package crawl + +import ( + "bytes" + "encoding/gob" + "errors" + "fmt" + "log" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/PuerkitoBio/purell" + "github.com/jmhodges/levigo" +) + +type gobDB struct { + *levigo.DB +} + +func newGobDB(path string) (*gobDB, error) { + opts := levigo.NewOptions() + opts.SetCreateIfMissing(true) + opts.SetCache(levigo.NewLRUCache(2 << 20)) + opts.SetFilterPolicy(levigo.NewBloomFilter(10)) + db, err := levigo.Open(path, opts) + if err != nil { + return nil, err + } + return &gobDB{db}, nil +} + +func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error { + var b bytes.Buffer + if err := gob.NewEncoder(&b).Encode(obj); err != nil { + return err + } + return db.Put(wo, key, b.Bytes()) +} + +func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error { + data, err := db.Get(ro, key) + if err != nil { + return err + } + if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil { + return err + } + return nil +} + +type URLInfo struct { + URL string + StatusCode int + CrawledAt time.Time + Error error +} + +type Scope interface { + Check(*url.URL, int) bool +} + +type Fetcher interface { + Fetch(string) (*http.Response, error) +} + +type FetcherFunc func(string) (*http.Response, error) + +func (f FetcherFunc) Fetch(u string) (*http.Response, error) { + return f(u) +} + +type Handler interface { + Handle(*Crawler, string, int, *http.Response, error) error +} + +type HandlerFunc func(*Crawler, string, int, *http.Response, error) error + +func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error { + return f(db, u, depth, resp, err) +} + +// UrlDb is the database of crawls (either pending or 
done). +type Crawler struct { + db *gobDB + seeds []*url.URL + scope Scope + fetcher Fetcher + handler Handler + + enqueueMx sync.Mutex +} + +type QueuePair struct { + Key []byte + URL string + Depth int +} + +// Update this URLInfo entry in the crawl database. +func (c *Crawler) UpdateURL(info *URLInfo) error { + wo := levigo.NewWriteOptions() + defer wo.Close() + return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info) +} + +// Enqueue a (possibly new) URL for processing. +func (c *Crawler) Enqueue(u *url.URL, depth int) { + // Normalize the URL. + urlStr := purell.NormalizeURL(u, purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes|purell.FlagRemoveFragment|purell.FlagRemoveDirectoryIndex|purell.FlagSortQuery) + + // See if it's in scope. + if !c.scope.Check(u, depth) { + return + } + + c.enqueueMx.Lock() + defer c.enqueueMx.Unlock() + + // Check if we've already seen it. + var info URLInfo + ro := levigo.NewReadOptions() + defer ro.Close() + ukey := []byte(fmt.Sprintf("url/%s", urlStr)) + if err := c.db.GetObj(ro, ukey, &info); err == nil { + return + } + + // Create a unique key using the URL and the current timestamp. + qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr)) + + // Store the URL in the queue, and store an empty URLInfo to + // make sure that subsequent calls to Enqueue with the same + // URL will fail. + wo := levigo.NewWriteOptions() + defer wo.Close() + c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth}) + c.db.PutObj(wo, ukey, &info) +} + +// Scan the queue for URLs until there are no more. +func (c *Crawler) process() <-chan QueuePair { + ch := make(chan QueuePair) + go func() { + queuePrefix := []byte("queue/") + for range time.Tick(2 * time.Second) { + n := 0 + + // Scan the queue using a snapshot, to ignore + // new URLs that might be added after this. + s := c.db.NewSnapshot() + ro := levigo.NewReadOptions() + ro.SetSnapshot(s) + + iter := c.db.NewIterator(ro) + for iter.Seek(queuePrefix); iter.Valid() && bytes.HasPrefix(iter.Key(), queuePrefix); iter.Next() { + var p QueuePair + if err := gob.NewDecoder(bytes.NewBuffer(iter.Value())).Decode(&p); err != nil { + continue + } + ch <- p + n++ + } + iter.Close() + + ro.Close() + c.db.ReleaseSnapshot(s) + + if n == 0 { + break + } + } + close(ch) + }() + return ch +} + +// Main worker loop. +func (c *Crawler) urlHandler(queue <-chan QueuePair) { + for p := range queue { + // Fetch the URL and handle it. Make sure to Close the + // response body (even if it gets replaced in the + // Response object). + fmt.Printf("%s\n", p.URL) + httpResp, httpErr := c.fetcher.Fetch(p.URL) + respBody := httpResp.Body + err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) + if httpErr == nil { + respBody.Close() + } + + // Remove the URL from the queue if the handler was successful. + if err == nil { + wo := levigo.NewWriteOptions() + c.db.Delete(wo, p.Key) + wo.Close() + } else { + log.Printf("error handling %s: %v", p.URL, err) + } + } +} + +type seedScope struct { + seeds []*url.URL + schemes map[string]struct{} + maxDepth int +} + +func (s *seedScope) Check(u *url.URL, depth int) bool { + // Ignore non-allowed schemes. + if _, ok := s.schemes[u.Scheme]; !ok { + return false + } + + // Do not crawl beyond maxDepth. + if depth > s.maxDepth { + return false + } + + // Check each seed prefix. 
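+	// A URL is in scope if its host matches one of the seeds and its
+	// path lives under that seed's path.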
+ for _, seed := range s.seeds { + if u.Host == seed.Host && strings.HasPrefix(u.Path, seed.Path) { + return true + } + } + return false +} + +// NewSeedScope returns a Scope that will only allow crawling the seed +// domains, and not beyond the specified maximum link depth. +func NewSeedScope(seeds []*url.URL, maxDepth int, allowedSchemes []string) Scope { + scope := &seedScope{ + seeds: seeds, + maxDepth: maxDepth, + schemes: make(map[string]struct{}), + } + for _, s := range allowedSchemes { + scope.schemes[s] = struct{}{} + } + return scope +} + +func MustParseURLs(urls []string) []*url.URL { + // Parse the seed URLs. + var parsed []*url.URL + for _, s := range urls { + u, err := url.Parse(s) + if err != nil { + log.Fatalf("error parsing seed \"%s\": %v", s, err) + } + parsed = append(parsed, u) + } + return parsed +} + +// NewCrawler creates a new Crawler object with the specified behavior. +func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler) (*Crawler, error) { + // Open the crawl database. + db, err := newGobDB(path) + if err != nil { + return nil, err + } + c := &Crawler{ + db: db, + fetcher: f, + handler: &standardPageHandler{h}, + seeds: seeds, + scope: scope, + } + return c, nil +} + +// Run the crawl, does not exit until it is done. +func (c *Crawler) Run() { + // Load initial seeds into the queue. + for _, u := range c.seeds { + c.Enqueue(u, 0) + } + + // Start some runners and wait until they're done. + var wg sync.WaitGroup + ch := c.process() + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + c.urlHandler(ch) + wg.Done() + }() + } + wg.Wait() +} + +// Standard page handler, follows redirects and invokes a child +// handler when status == 200. +type standardPageHandler struct { + h Handler +} + +func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error { + info := &URLInfo{URL: u, CrawledAt: time.Now()} + if err == nil { + info.StatusCode = resp.StatusCode + if resp.StatusCode == 200 { + err = wrap.h.Handle(c, u, depth, resp, err) + } else if resp.StatusCode > 300 && resp.StatusCode < 400 { + location := resp.Header.Get("Location") + if location != "" { + locationUrl, err := resp.Request.URL.Parse(location) + if err != nil { + log.Printf("error parsing Location header: %v", err) + } else { + c.Enqueue(locationUrl, depth+1) + } + } + } else { + err = errors.New(resp.Status) + } + } + info.Error = err + + //log.Printf("[CRAWL] %+v", info) + + c.UpdateURL(info) + return nil +} diff --git a/warc.go b/warc.go new file mode 100644 index 0000000..66cf417 --- /dev/null +++ b/warc.go @@ -0,0 +1,96 @@ +package crawl + +import ( + "fmt" + "io" + "time" + + "compress/gzip" + + "code.google.com/p/go-uuid/uuid" +) + +var ( + warcTimeFmt = time.RFC3339 + warcVersion = "WARC/1.0" + warcContentTypes = map[string]string{ + "warcinfo": "application/warc-fields", + "response": "application/http; msgtype=response", + "request": "application/http; msgtype=request", + "metadata": "application/warc-fields", + } +) + +// A Warc header. Header field names are case-sensitive. +type WarcHeader map[string]string + +// Set a header to the specified value. Multiple values are not +// supported. +func (h WarcHeader) Set(key, value string) { + h[key] = value + + // Keep Content-Type in sync with WARC-Type. + if key == "WARC-Type" { + if ct, ok := warcContentTypes[value]; ok { + h["Content-Type"] = ct + } else { + h["Content-Type"] = "application/octet-stream" + } + } +} + +// Get the value of a header. 
If not found, returns an empty string. +func (h WarcHeader) Get(key string) string { + return h[key] +} + +// Encode the header to a Writer. +func (h WarcHeader) Encode(w io.Writer) { + fmt.Fprintf(w, "%s\r\n", warcVersion) + for hdr, value := range h { + fmt.Fprintf(w, "%s: %s\r\n", hdr, value) + } + fmt.Fprintf(w, "\r\n") +} + +// NewWarcHeader returns a WarcHeader with its own unique ID and the +// current timestamp. +func NewWarcHeader() WarcHeader { + h := make(WarcHeader) + h.Set("WARC-Record-ID", fmt.Sprintf("<%s>", uuid.NewUUID().URN())) + h.Set("WARC-Date", time.Now().Format(warcTimeFmt)) + h.Set("Content-Type", "application/octet-stream") + return h +} + +// WarcWriter can write records to a file in WARC format. +type WarcWriter struct { + writer io.WriteCloser +} + +type recordWriter struct { + io.Writer +} + +func (rw *recordWriter) Close() error { + // Add the end-of-record marker. + fmt.Fprintf(rw, "\r\n\r\n") + return nil +} + +// NewRecord starts a new WARC record with the provided header. The +// caller must call Close on the returned writer before creating the +// next record. +func (w *WarcWriter) NewRecord(hdr WarcHeader) io.WriteCloser { + hdr.Encode(w.writer) + return &recordWriter{w.writer} +} + +// Close the WARC writer and flush all buffers. +func (w *WarcWriter) Close() error { + return w.writer.Close() +} + +func NewWarcWriter(w io.WriteCloser) *WarcWriter { + return &WarcWriter{gzip.NewWriter(w)} +} -- cgit v1.2.3-54-g00ecf
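For reference, the crawl package API introduced by this commit (MustParseURLs, NewSeedScope, FetcherFunc, HandlerFunc, NewCrawler) composes into very small programs. The sketch below is a hypothetical usage example, not part of the patch: the database path and seed URL are invented for illustration, and it assumes the import path used above.

package main

import (
	"log"
	"net/http"

	"git.autistici.org/ale/crawl"
)

// logStatus is a Handler that just logs every page the crawler hands it.
// The standard page handler wrapper only calls it for HTTP 200 responses.
func logStatus(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
	log.Printf("%d %s (depth %d)", resp.StatusCode, u, depth)
	return nil
}

func main() {
	// Seed URL and state database path are placeholders.
	seeds := crawl.MustParseURLs([]string{"http://example.com/"})
	scope := crawl.NewSeedScope(seeds, 5, []string{"http", "https"})

	c, err := crawl.NewCrawler("example.db", seeds, scope,
		crawl.FetcherFunc(http.Get), crawl.HandlerFunc(logStatus))
	if err != nil {
		log.Fatal(err)
	}
	c.Run()
}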