From 6355aa4310ff0c32b056580e812ca6f0e2a5ee2f Mon Sep 17 00:00:00 2001 From: Jordan Date: Thu, 24 Mar 2022 09:08:13 -0700 Subject: links, crawl: dramatically reduce memory usage to prevent excessive memory usage and OOM crashes, rather than store and pass around response bodies in memory buffers, let's store them temporarily on the filesystem wget-style and delete them when processed --- README.md | 2 ++ analysis/links.go | 11 +++---- cmd/crawl/crawl.go | 86 +++++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 81 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 128088c..d187cab 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ which make crawl more amenable to serve as a drop-in replacement for [wpull](https://github.com/ArchiveTeam/wpull)/[grab-site](https://github.com/ArchiveTeam/grab-site). Notable changes include: +* dramatically reduce memory usage; (temporarily) write responses to + the filesystem rather than pass data around in memory buffers * --bind, support making outbound requests from a particular interface * --resume, directory containing the crawl state to continue from * infinite recursion depth by default diff --git a/analysis/links.go b/analysis/links.go index c0663fa..f1b3e99 100644 --- a/analysis/links.go +++ b/analysis/links.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "net/http" + "os" "regexp" "strings" @@ -41,12 +42,12 @@ type rawOutlink struct { // GetLinks returns all the links found in a document. Currently only // parses HTML pages and CSS stylesheets. -func GetLinks(resp *http.Response) ([]crawl.Outlink, error) { +func GetLinks(resp *http.Response, body *os.File) ([]crawl.Outlink, error) { // Parse outbound links relative to the request URI, and // return unique results. var result []crawl.Outlink links := make(map[string]crawl.Outlink) - for _, l := range extractLinks(resp) { + for _, l := range extractLinks(resp, body) { // Skip data: URLs altogether. if strings.HasPrefix(l.URL, "data:") { continue @@ -64,13 +65,13 @@ func GetLinks(resp *http.Response) ([]crawl.Outlink, error) { return result, nil } -func extractLinks(resp *http.Response) []rawOutlink { +func extractLinks(resp *http.Response, body *os.File) []rawOutlink { ctype := resp.Header.Get("Content-Type") switch { case strings.HasPrefix(ctype, "text/html"): - return extractLinksFromHTML(resp.Body, nil) + return extractLinksFromHTML(body, nil) case strings.HasPrefix(ctype, "text/css"): - return extractLinksFromCSS(resp.Body, nil) + return extractLinksFromCSS(body, nil) default: return nil } diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go index 8f84837..8f28bc4 100644 --- a/cmd/crawl/crawl.go +++ b/cmd/crawl/crawl.go @@ -113,8 +113,8 @@ func (f dnsMapFlag) Set(s string) error { return nil } -func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error { - links, err := analysis.GetLinks(resp) +func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, body *os.File, _ error) error { + links, err := analysis.GetLinks(resp, body) if err != nil { // This is not a fatal error, just a bad web page. return nil @@ -141,26 +141,51 @@ type warcSaveHandler struct { numWritten int } -func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error { +func (h *warcSaveHandler) writeWARCRecord(typ, uri string, header []byte, body *os.File) error { hdr := warc.NewHeader() hdr.Set("WARC-Type", typ) hdr.Set("WARC-Target-URI", uri) hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) - hdr.Set("Content-Length", strconv.Itoa(len(data))) - w, err := h.warc.NewRecord(hdr) + var size int64 + fi, err := body.Stat() if err != nil { + fmt.Println(err) return err } - if _, err := w.Write(data); err != nil { + size += fi.Size() + size += int64(len(string(header))) + hdr.Set("Content-Length", strconv.FormatInt(size, 10)) + + w, err := h.warc.NewRecord(hdr) + if err != nil { + fmt.Println(err) return err } + if header != nil { + if _, err := w.Write(header); err != nil { + fmt.Println(err) + return err + } + } + + scanner := bufio.NewScanner(body) + scanner.Split(bufio.ScanBytes) + for scanner.Scan() { + if _, err := w.Write(scanner.Bytes()); err != nil { + fmt.Println(err) + return err + } + } return w.Close() } + func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error { // Read the response body (so we can save it to the WARC // output) and replace it with a buffer. + + /* data, derr := ioutil.ReadAll(resp.Body) if derr != nil { // Errors at this stage are usually transport-level errors, @@ -168,29 +193,56 @@ func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, re return crawl.ErrRetryRequest } resp.Body = ioutil.NopCloser(bytes.NewReader(data)) + */ + + // Keep temporary file to store request/response data + r, _ := ioutil.TempFile("temp", "crawl") + defer r.Close() + + w, _ := os.OpenFile(r.Name(), os.O_RDWR, 0777) + defer w.Close() + defer os.Remove(r.Name()) // Dump the request to the WARC output. - var b bytes.Buffer - if werr := resp.Request.Write(&b); werr != nil { + if werr := resp.Request.Write(w); werr != nil { return werr } - if werr := h.writeWARCRecord("request", resp.Request.URL.String(), b.Bytes()); werr != nil { + if werr := h.writeWARCRecord("request", resp.Request.URL.String(), nil, r); werr != nil { return werr } + // Seek to start; we've written since last read + if _, err := r.Seek(0, io.SeekStart); err != nil { + return err + } + + w.Close() + w, _ = os.OpenFile(r.Name(), os.O_RDWR, 0777) + defer w.Close() + + // Write response body to tmp file + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + // Dump the response. statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status) - respPayload := bytes.Join( - [][]byte{[]byte(statusLine), hdr2str(resp.Header), data}, + respHeader := bytes.Join( + [][]byte{[]byte(statusLine), hdr2str(resp.Header), []byte("")}, []byte{'\r', '\n'}, ) - if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respPayload); werr != nil { + if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respHeader, r); werr != nil { return werr } + // Seek to start; we've written since last read + if _, err := r.Seek(0, io.SeekStart); err != nil { + return err + } + h.numWritten++ - return extractLinks(p, u, depth, resp, nil) + return extractLinks(p, u, depth, resp, r, nil) } func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) { @@ -327,6 +379,14 @@ func main() { } } + // Create directory to (temporarily) store response bodies + if _, err := os.Stat("temp"); os.IsNotExist(err) { + err := os.Mkdir("temp", 0700) + if err != nil { + log.Fatal(err) + } + } + // Write seed list (assembled URLs) to seed_urls file f, err := os.Create("seed_urls") if err != nil { -- cgit v1.2.3-54-g00ecf