aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorale <ale@incal.net>2018-08-31 08:29:14 +0100
committerale <ale@incal.net>2018-08-31 08:29:14 +0100
commitee1a3d8e5278a4a4e8435f9129852b95a9c22afb (patch)
treefd7a42cfff4aed5bd2379feb35f7172287430ba2
parentb3d419486a87c9193c2fd6c16168f600876e0f73 (diff)
downloadcrawl-ee1a3d8e5278a4a4e8435f9129852b95a9c22afb.tar.gz
crawl-ee1a3d8e5278a4a4e8435f9129852b95a9c22afb.zip
Improve error checking
Detect write errors (both on the database and to the WARC output) and abort with an error message. Also fix a bunch of harmless lint warnings.
-rw-r--r--analysis/links.go73
-rw-r--r--client.go8
-rw-r--r--cmd/crawl/crawl.go107
-rw-r--r--cmd/links/links.go10
-rw-r--r--crawler.go51
-rw-r--r--queue.go37
-rw-r--r--warc/warc.go38
7 files changed, 197 insertions, 127 deletions
diff --git a/analysis/links.go b/analysis/links.go
index 97957ad..2bcfff1 100644
--- a/analysis/links.go
+++ b/analysis/links.go
@@ -39,39 +39,11 @@ type rawOutlink struct {
// GetLinks returns all the links found in a document. Currently only
// parses HTML pages and CSS stylesheets.
func GetLinks(resp *http.Response) ([]crawl.Outlink, error) {
- var outlinks []rawOutlink
-
- ctype := resp.Header.Get("Content-Type")
- if strings.HasPrefix(ctype, "text/html") {
- // Use goquery to extract links from the parsed HTML
- // contents (query patterns are described in the
- // linkMatches table).
- doc, err := goquery.NewDocumentFromResponse(resp)
- if err != nil {
- return nil, err
- }
-
- for _, lm := range linkMatches {
- doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) {
- val, _ := s.Attr(lm.attr)
- outlinks = append(outlinks, rawOutlink{URL: val, Tag: lm.linkTag})
- })
- }
- } else if strings.HasPrefix(ctype, "text/css") {
- // Use a simple (and actually quite bad) regular
- // expression to extract "url()" links from CSS.
- if data, err := ioutil.ReadAll(resp.Body); err == nil {
- for _, val := range urlcssRx.FindAllStringSubmatch(string(data), -1) {
- outlinks = append(outlinks, rawOutlink{URL: val[1], Tag: crawl.TagRelated})
- }
- }
- }
-
// Parse outbound links relative to the request URI, and
// return unique results.
var result []crawl.Outlink
links := make(map[string]crawl.Outlink)
- for _, l := range outlinks {
+ for _, l := range extractLinks(resp) {
// Skip data: URLs altogether.
if strings.HasPrefix(l.URL, "data:") {
continue
@@ -88,3 +60,46 @@ func GetLinks(resp *http.Response) ([]crawl.Outlink, error) {
}
return result, nil
}
+
+func extractLinks(resp *http.Response) []rawOutlink {
+ ctype := resp.Header.Get("Content-Type")
+ switch {
+ case strings.HasPrefix(ctype, "text/html"):
+ return extractLinksFromHTML(resp)
+ case strings.HasPrefix(ctype, "text/css"):
+ return extractLinksFromCSS(resp)
+ default:
+ return nil
+ }
+}
+
+func extractLinksFromHTML(resp *http.Response) []rawOutlink {
+ var outlinks []rawOutlink
+ // Use goquery to extract links from the parsed HTML
+ // contents (query patterns are described in the
+ // linkMatches table).
+ doc, err := goquery.NewDocumentFromReader(resp.Body)
+ if err != nil {
+ return nil
+ }
+
+ for _, lm := range linkMatches {
+ doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) {
+ val, _ := s.Attr(lm.attr)
+ outlinks = append(outlinks, rawOutlink{URL: val, Tag: lm.linkTag})
+ })
+ }
+ return outlinks
+}
+
+func extractLinksFromCSS(resp *http.Response) []rawOutlink {
+ // Use a simple (and actually quite bad) regular
+ // expression to extract "url()" links from CSS.
+ var outlinks []rawOutlink
+ if data, err := ioutil.ReadAll(resp.Body); err == nil {
+ for _, val := range urlcssRx.FindAllStringSubmatch(string(data), -1) {
+ outlinks = append(outlinks, rawOutlink{URL: val[1], Tag: crawl.TagRelated})
+ }
+ }
+ return outlinks
+}
diff --git a/client.go b/client.go
index c0c2626..45736f5 100644
--- a/client.go
+++ b/client.go
@@ -9,18 +9,18 @@ import (
var defaultClientTimeout = 60 * time.Second
-var DefaultClient *http.Client
-
// DefaultClient returns a http.Client suitable for crawling: does not
// follow redirects, accepts invalid TLS certificates, sets a
// reasonable timeout for requests.
+var DefaultClient *http.Client
+
func init() {
- jar, _ := cookiejar.New(nil)
+ jar, _ := cookiejar.New(nil) // nolint
DefaultClient = &http.Client{
Timeout: defaultClientTimeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
- InsecureSkipVerify: true,
+ InsecureSkipVerify: true, // nolint
},
},
CheckRedirect: func(req *http.Request, via []*http.Request) error {
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
index 0e5fc15..3d1120c 100644
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -37,30 +37,24 @@ var (
cpuprofile = flag.String("cpuprofile", "", "create cpu profile")
)
-func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
+func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _ error) error {
links, err := analysis.GetLinks(resp)
if err != nil {
return err
}
for _, link := range links {
- c.Enqueue(link, depth+1)
+ if err := c.Enqueue(link, depth+1); err != nil {
+ return err
+ }
}
return nil
}
-type fakeCloser struct {
- io.Reader
-}
-
-func (f *fakeCloser) Close() error {
- return nil
-}
-
func hdr2str(h http.Header) []byte {
var b bytes.Buffer
- h.Write(&b)
+ h.Write(&b) // nolint
return b.Bytes()
}
@@ -69,43 +63,58 @@ type warcSaveHandler struct {
warcInfoID string
}
+func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error {
+ hdr := warc.NewHeader()
+ hdr.Set("WARC-Type", typ)
+ hdr.Set("WARC-Target-URI", uri)
+ hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
+ hdr.Set("Content-Length", strconv.Itoa(len(data)))
+
+ w, err := h.warc.NewRecord(hdr)
+ if err != nil {
+ return err
+ }
+ if _, err := w.Write(data); err != nil {
+ return err
+ }
+ return w.Close()
+}
+
func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
+ if err != nil {
+ return err
+ }
+
+ // Read the response body (so we can save it to the WARC
+ // output) and replace it with a buffer.
data, derr := ioutil.ReadAll(resp.Body)
if derr != nil {
- return err
+ return derr
}
- resp.Body = &fakeCloser{bytes.NewReader(data)}
+ resp.Body = ioutil.NopCloser(bytes.NewReader(data))
- // Dump the request.
+ // Dump the request to the WARC output.
var b bytes.Buffer
- resp.Request.Write(&b)
- hdr := warc.NewHeader()
- hdr.Set("WARC-Type", "request")
- hdr.Set("WARC-Target-URI", resp.Request.URL.String())
- hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
- hdr.Set("Content-Length", strconv.Itoa(b.Len()))
- w := h.warc.NewRecord(hdr)
- w.Write(b.Bytes())
- w.Close()
+ if werr := resp.Request.Write(&b); werr != nil {
+ return werr
+ }
+ if werr := h.writeWARCRecord("request", resp.Request.URL.String(), b.Bytes()); werr != nil {
+ return werr
+ }
// Dump the response.
statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status)
respPayload := bytes.Join([][]byte{
[]byte(statusLine), hdr2str(resp.Header), data},
[]byte{'\r', '\n'})
- hdr = warc.NewHeader()
- hdr.Set("WARC-Type", "response")
- hdr.Set("WARC-Target-URI", resp.Request.URL.String())
- hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
- hdr.Set("Content-Length", strconv.Itoa(len(respPayload)))
- w = h.warc.NewRecord(hdr)
- w.Write(respPayload)
- w.Close()
+ if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respPayload); werr != nil {
+ return werr
+ }
return extractLinks(c, u, depth, resp, err)
}
-func newWarcSaveHandler(w *warc.Writer) crawl.Handler {
+func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) {
info := strings.Join([]string{
"Software: crawl/1.0\r\n",
"Format: WARC File Format 1.0\r\n",
@@ -116,13 +125,18 @@ func newWarcSaveHandler(w *warc.Writer) crawl.Handler {
hdr.Set("WARC-Type", "warcinfo")
hdr.Set("WARC-Warcinfo-ID", hdr.Get("WARC-Record-ID"))
hdr.Set("Content-Length", strconv.Itoa(len(info)))
- hdrw := w.NewRecord(hdr)
- io.WriteString(hdrw, info)
- hdrw.Close()
+ hdrw, err := w.NewRecord(hdr)
+ if err != nil {
+ return nil, err
+ }
+ if _, err := io.WriteString(hdrw, info); err != nil {
+ return nil, err
+ }
+ hdrw.Close() // nolint
return &warcSaveHandler{
warc: w,
warcInfoID: hdr.Get("WARC-Record-ID"),
- }
+ }, nil
}
type crawlStats struct {
@@ -149,7 +163,7 @@ func (c *crawlStats) Dump() {
c.lock.Lock()
defer c.lock.Unlock()
rate := float64(c.bytes) / time.Since(c.start).Seconds() / 1000
- fmt.Fprintf(os.Stderr, "stats: downloaded %d bytes (%.4g KB/s), status: %v\n", c.bytes, rate, c.states)
+ fmt.Fprintf(os.Stderr, "stats: downloaded %d bytes (%.4g KB/s), status: %v\n", c.bytes, rate, c.states) // nolint
}
var stats *crawlStats
@@ -201,11 +215,6 @@ func main() {
defer pprof.StopCPUProfile()
}
- outf, err := os.Create(*outputFile)
- if err != nil {
- log.Fatal(err)
- }
-
seeds := crawl.MustParseURLs(flag.Args())
scope := crawl.AND(
crawl.NewSchemeScope(strings.Split(*validSchemes, ",")),
@@ -217,10 +226,17 @@ func main() {
scope = crawl.OR(scope, crawl.NewIncludeRelatedScope())
}
+ outf, err := os.Create(*outputFile)
+ if err != nil {
+ log.Fatal(err)
+ }
w := warc.NewWriter(outf)
- defer w.Close()
+ defer w.Close() // nolint
- saver := newWarcSaveHandler(w)
+ saver, err := newWarcSaveHandler(w)
+ if err != nil {
+ log.Fatal(err)
+ }
crawler, err := crawl.NewCrawler(*dbPath, seeds, scope, crawl.FetcherFunc(fetch), crawl.NewRedirectHandler(saver))
if err != nil {
@@ -240,13 +256,12 @@ func main() {
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
crawler.Run(*concurrency)
-
crawler.Close()
if signaled.Load().(bool) {
os.Exit(1)
}
if !*keepDb {
- os.RemoveAll(*dbPath)
+ os.RemoveAll(*dbPath) // nolint
}
}
diff --git a/cmd/links/links.go b/cmd/links/links.go
index e8b0fcb..9cd741f 100644
--- a/cmd/links/links.go
+++ b/cmd/links/links.go
@@ -15,20 +15,25 @@ import (
)
var (
- dbPath = flag.String("state", "crawldb", "crawl state database path")
concurrency = flag.Int("c", 10, "concurrent workers")
depth = flag.Int("depth", 10, "maximum link depth")
validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
)
func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
+ if err != nil {
+ return err
+ }
+
links, err := analysis.GetLinks(resp)
if err != nil {
return err
}
for _, link := range links {
- c.Enqueue(link, depth+1)
+ if err := c.Enqueue(link, depth+1); err != nil {
+ return err
+ }
}
return nil
@@ -49,4 +54,5 @@ func main() {
log.Fatal(err)
}
crawler.Run(*concurrency)
+ crawler.Close()
}
diff --git a/crawler.go b/crawler.go
index aef628f..f6670c1 100644
--- a/crawler.go
+++ b/crawler.go
@@ -144,10 +144,10 @@ type Crawler struct {
}
// Enqueue a (possibly new) URL for processing.
-func (c *Crawler) Enqueue(link Outlink, depth int) {
+func (c *Crawler) Enqueue(link Outlink, depth int) error {
// See if it's in scope.
if !c.scope.Check(link, depth) {
- return
+ return nil
}
// Normalize the URL.
@@ -161,16 +161,20 @@ func (c *Crawler) Enqueue(link Outlink, depth int) {
var info URLInfo
ukey := []byte(fmt.Sprintf("url/%s", urlStr))
if err := c.db.GetObj(ukey, &info); err == nil {
- return
+ return nil
}
// Store the URL in the queue, and store an empty URLInfo to
// make sure that subsequent calls to Enqueue with the same
// URL will fail.
wb := new(leveldb.Batch)
- c.queue.Add(wb, urlStr, depth, time.Now())
- c.db.PutObjBatch(wb, ukey, &info)
- c.db.Write(wb, nil)
+ if err := c.queue.Add(wb, urlStr, depth, time.Now()); err != nil {
+ return err
+ }
+ if err := c.db.PutObjBatch(wb, ukey, &info); err != nil {
+ return err
+ }
+ return c.db.Write(wb, nil)
}
var scanInterval = 1 * time.Second
@@ -210,7 +214,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
// Ignore errors, we can work with an empty object.
urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
var info URLInfo
- c.db.GetObj(urlkey, &info)
+ c.db.GetObj(urlkey, &info) // nolint
info.CrawledAt = time.Now()
info.URL = p.URL
@@ -230,18 +234,17 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
wb := new(leveldb.Batch)
if httpErr == nil {
- respBody.Close()
+ respBody.Close() // nolint
// Remove the URL from the queue if the fetcher was successful.
c.queue.Release(wb, p)
} else {
log.Printf("error retrieving %s: %v", p.URL, httpErr)
- c.queue.Retry(wb, p, 300*time.Second)
+ Must(c.queue.Retry(wb, p, 300*time.Second))
}
- c.db.PutObjBatch(wb, urlkey, &info)
-
- c.db.Write(wb, nil)
+ Must(c.db.PutObjBatch(wb, urlkey, &info))
+ Must(c.db.Write(wb, nil))
}
}
@@ -279,7 +282,9 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler
c.stopping.Store(false)
// Recover active tasks.
- c.queue.Recover()
+ if err := c.queue.Recover(); err != nil {
+ return nil, err
+ }
return c, nil
}
@@ -289,7 +294,7 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler
func (c *Crawler) Run(concurrency int) {
// Load initial seeds into the queue.
for _, u := range c.seeds {
- c.Enqueue(Outlink{URL: u, Tag: TagPrimary}, 0)
+ Must(c.Enqueue(Outlink{URL: u, Tag: TagPrimary}, 0))
}
// Start some runners and wait until they're done.
@@ -313,7 +318,7 @@ func (c *Crawler) Stop() {
// Close the database and release resources associated with the crawler state.
func (c *Crawler) Close() {
- c.db.Close()
+ c.db.Close() // nolint
}
type redirectHandler struct {
@@ -330,11 +335,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.
} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
location := resp.Header.Get("Location")
if location != "" {
- locationURL, err := resp.Request.URL.Parse(location)
- if err != nil {
- log.Printf("error parsing Location header: %v", err)
+ locationURL, uerr := resp.Request.URL.Parse(location)
+ if uerr != nil {
+ log.Printf("error parsing Location header: %v", uerr)
} else {
- c.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1)
+ Must(c.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1))
}
}
} else {
@@ -348,3 +353,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.
func NewRedirectHandler(wrap Handler) Handler {
return &redirectHandler{wrap}
}
+
+// Must will abort the program with a message when we encounter an
+// error that we can't recover from.
+func Must(err error) {
+ if err != nil {
+ log.Fatalf("fatal error: %v", err)
+ }
+}
diff --git a/queue.go b/queue.go
index da4a7b8..ee0e7ed 100644
--- a/queue.go
+++ b/queue.go
@@ -20,8 +20,7 @@ var (
queuePrefix = []byte("queue")
activePrefix = []byte("queue_active")
- queueKeySep = []byte{'/'}
- queueKeySepP1 = []byte{'/' + 1}
+ queueKeySep = []byte{'/'}
)
type queuePair struct {
@@ -45,7 +44,9 @@ func (q *queue) Scan(ch chan<- queuePair) error {
continue
}
p.key = iter.Key()
- q.acquire(p)
+ if err := q.acquire(p); err != nil {
+ return err
+ }
ch <- p
n++
}
@@ -57,19 +58,24 @@ func (q *queue) Scan(ch chan<- queuePair) error {
}
// Add an item to the pending work queue.
-func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error {
t := uint64(when.UnixNano())
qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
- q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
+ return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
}
-func (q *queue) acquire(qp queuePair) {
+func (q *queue) acquire(qp queuePair) error {
wb := new(leveldb.Batch)
- q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
+ if err := q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp); err != nil {
+ return err
+ }
wb.Delete(qp.key)
- q.db.Write(wb, nil)
+ if err := q.db.Write(wb, nil); err != nil {
+ return err
+ }
atomic.AddInt32(&q.numActive, 1)
+ return nil
}
// Release an item from the queue. Processing for this item is done.
@@ -79,16 +85,19 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
}
// Retry processing this item at a later time.
-func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) {
+func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error {
wb.Delete(activeQueueKey(qp.key))
- q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
+ if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil {
+ return err
+ }
atomic.AddInt32(&q.numActive, -1)
+ return nil
}
// Recover moves all active tasks to the pending queue. To be
// called at startup to recover tasks that were active when the
// previous run terminated.
-func (q *queue) Recover() {
+func (q *queue) Recover() error {
wb := new(leveldb.Batch)
prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
@@ -100,11 +109,13 @@ func (q *queue) Recover() {
continue
}
p.key = iter.Key()[len(activePrefix)+1:]
- q.db.PutObjBatch(wb, p.key, &p)
+ if err := q.db.PutObjBatch(wb, p.key, &p); err != nil {
+ return err
+ }
wb.Delete(iter.Key())
}
- q.db.Write(wb, nil)
+ return q.db.Write(wb, nil)
}
func encodeUint64(n uint64) []byte {
diff --git a/warc/warc.go b/warc/warc.go
index 49ab7a0..6914c1b 100644
--- a/warc/warc.go
+++ b/warc/warc.go
@@ -47,12 +47,17 @@ func (h Header) Get(key string) string {
}
// Encode the header to a Writer.
-func (h Header) Encode(w io.Writer) {
- fmt.Fprintf(w, "%s\r\n", warcVersion)
+func (h Header) Encode(w io.Writer) error {
+ if _, err := fmt.Fprintf(w, "%s\r\n", warcVersion); err != nil {
+ return err
+ }
for hdr, value := range h {
- fmt.Fprintf(w, "%s: %s\r\n", hdr, value)
+ if _, err := fmt.Fprintf(w, "%s: %s\r\n", hdr, value); err != nil {
+ return err
+ }
}
- fmt.Fprintf(w, "\r\n")
+ _, err := fmt.Fprintf(w, "\r\n")
+ return err
}
// NewHeader returns a Header with its own unique ID and the
@@ -80,26 +85,31 @@ type recordWriter struct {
func (rw *recordWriter) Close() error {
// Add the end-of-record marker.
- fmt.Fprintf(rw, "\r\n\r\n")
-
+ _, err := fmt.Fprintf(rw, "\r\n\r\n")
<-rw.lockCh
-
- return nil
+ return err
}
// NewRecord starts a new WARC record with the provided header. The
// caller must call Close on the returned writer before creating the
// next record. Note that this function may block until that condition
-// is satisfied.
-func (w *Writer) NewRecord(hdr Header) io.WriteCloser {
+// is satisfied. If this function returns an error, the state of the
+// Writer is invalid and it should no longer be used.
+func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) {
w.lockCh <- true
if w.gzwriter != nil {
- w.gzwriter.Close()
+ w.gzwriter.Close() // nolint
+ }
+ var err error
+ w.gzwriter, err = gzip.NewWriterLevel(w.writer, gzip.BestCompression)
+ if err != nil {
+ return nil, err
}
- w.gzwriter, _ = gzip.NewWriterLevel(w.writer, gzip.BestCompression)
w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
- hdr.Encode(w.gzwriter)
- return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}
+ if err = hdr.Encode(w.gzwriter); err != nil {
+ return nil, err
+ }
+ return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil
}
// Close the WARC writer and flush all buffers. This will also call