From 6355aa4310ff0c32b056580e812ca6f0e2a5ee2f Mon Sep 17 00:00:00 2001 From: Jordan Date: Thu, 24 Mar 2022 09:08:13 -0700 Subject: links, crawl: dramatically reduce memory usage to prevent excessive memory usage and OOM crashes, rather than store and pass around response bodies in memory buffers, let's store them temporarily on the filesystem wget-style and delete them when processed --- cmd/crawl/crawl.go | 86 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 13 deletions(-) (limited to 'cmd') diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go index 8f84837..8f28bc4 100644 --- a/cmd/crawl/crawl.go +++ b/cmd/crawl/crawl.go @@ -113,8 +113,8 @@ func (f dnsMapFlag) Set(s string) error { return nil } -func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error { - links, err := analysis.GetLinks(resp) +func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, body *os.File, _ error) error { + links, err := analysis.GetLinks(resp, body) if err != nil { // This is not a fatal error, just a bad web page. return nil @@ -141,26 +141,51 @@ type warcSaveHandler struct { numWritten int } -func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error { +func (h *warcSaveHandler) writeWARCRecord(typ, uri string, header []byte, body *os.File) error { hdr := warc.NewHeader() hdr.Set("WARC-Type", typ) hdr.Set("WARC-Target-URI", uri) hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) - hdr.Set("Content-Length", strconv.Itoa(len(data))) - w, err := h.warc.NewRecord(hdr) + var size int64 + fi, err := body.Stat() if err != nil { + fmt.Println(err) return err } - if _, err := w.Write(data); err != nil { + size += fi.Size() + size += int64(len(string(header))) + hdr.Set("Content-Length", strconv.FormatInt(size, 10)) + + w, err := h.warc.NewRecord(hdr) + if err != nil { + fmt.Println(err) return err } + if header != nil { + if _, err := w.Write(header); err != nil { + fmt.Println(err) + return err + } + } + + scanner := bufio.NewScanner(body) + scanner.Split(bufio.ScanBytes) + for scanner.Scan() { + if _, err := w.Write(scanner.Bytes()); err != nil { + fmt.Println(err) + return err + } + } return w.Close() } + func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error { // Read the response body (so we can save it to the WARC // output) and replace it with a buffer. + + /* data, derr := ioutil.ReadAll(resp.Body) if derr != nil { // Errors at this stage are usually transport-level errors, @@ -168,29 +193,56 @@ func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, re return crawl.ErrRetryRequest } resp.Body = ioutil.NopCloser(bytes.NewReader(data)) + */ + + // Keep temporary file to store request/response data + r, _ := ioutil.TempFile("temp", "crawl") + defer r.Close() + + w, _ := os.OpenFile(r.Name(), os.O_RDWR, 0777) + defer w.Close() + defer os.Remove(r.Name()) // Dump the request to the WARC output. - var b bytes.Buffer - if werr := resp.Request.Write(&b); werr != nil { + if werr := resp.Request.Write(w); werr != nil { return werr } - if werr := h.writeWARCRecord("request", resp.Request.URL.String(), b.Bytes()); werr != nil { + if werr := h.writeWARCRecord("request", resp.Request.URL.String(), nil, r); werr != nil { return werr } + // Seek to start; we've written since last read + if _, err := r.Seek(0, io.SeekStart); err != nil { + return err + } + + w.Close() + w, _ = os.OpenFile(r.Name(), os.O_RDWR, 0777) + defer w.Close() + + // Write response body to tmp file + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + // Dump the response. statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status) - respPayload := bytes.Join( - [][]byte{[]byte(statusLine), hdr2str(resp.Header), data}, + respHeader := bytes.Join( + [][]byte{[]byte(statusLine), hdr2str(resp.Header), []byte("")}, []byte{'\r', '\n'}, ) - if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respPayload); werr != nil { + if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respHeader, r); werr != nil { return werr } + // Seek to start; we've written since last read + if _, err := r.Seek(0, io.SeekStart); err != nil { + return err + } + h.numWritten++ - return extractLinks(p, u, depth, resp, nil) + return extractLinks(p, u, depth, resp, r, nil) } func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) { @@ -327,6 +379,14 @@ func main() { } } + // Create directory to (temporarily) store response bodies + if _, err := os.Stat("temp"); os.IsNotExist(err) { + err := os.Mkdir("temp", 0700) + if err != nil { + log.Fatal(err) + } + } + // Write seed list (assembled URLs) to seed_urls file f, err := os.Create("seed_urls") if err != nil { -- cgit v1.2.3-54-g00ecf