diff options
author | Jordan <me@jordan.im> | 2022-03-24 09:08:13 -0700 |
---|---|---|
committer | Jordan <me@jordan.im> | 2022-03-24 09:08:13 -0700 |
commit | 6355aa4310ff0c32b056580e812ca6f0e2a5ee2f (patch) | |
tree | 3a3008d4d50e5e19f6805b1e1e03460e202048f9 /cmd | |
parent | a39310f111cef49ff630cc12fdebabc4df37ec28 (diff) | |
download | crawl-6355aa4310ff0c32b056580e812ca6f0e2a5ee2f.tar.gz crawl-6355aa4310ff0c32b056580e812ca6f0e2a5ee2f.zip |
links, crawl: dramatically reduce memory usage
To prevent excessive memory usage and OOM crashes, stop storing and
passing around response bodies in memory buffers; instead, store them
temporarily on the filesystem (wget-style) and delete them once processed.
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/crawl/crawl.go | 86 |
1 files changed, 73 insertions, 13 deletions
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go index 8f84837..8f28bc4 100644 --- a/cmd/crawl/crawl.go +++ b/cmd/crawl/crawl.go @@ -113,8 +113,8 @@ func (f dnsMapFlag) Set(s string) error { return nil } -func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error { - links, err := analysis.GetLinks(resp) +func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, body *os.File, _ error) error { + links, err := analysis.GetLinks(resp, body) if err != nil { // This is not a fatal error, just a bad web page. return nil @@ -141,26 +141,51 @@ type warcSaveHandler struct { numWritten int } -func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error { +func (h *warcSaveHandler) writeWARCRecord(typ, uri string, header []byte, body *os.File) error { hdr := warc.NewHeader() hdr.Set("WARC-Type", typ) hdr.Set("WARC-Target-URI", uri) hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) - hdr.Set("Content-Length", strconv.Itoa(len(data))) - w, err := h.warc.NewRecord(hdr) + var size int64 + fi, err := body.Stat() if err != nil { + fmt.Println(err) return err } - if _, err := w.Write(data); err != nil { + size += fi.Size() + size += int64(len(string(header))) + hdr.Set("Content-Length", strconv.FormatInt(size, 10)) + + w, err := h.warc.NewRecord(hdr) + if err != nil { + fmt.Println(err) return err } + if header != nil { + if _, err := w.Write(header); err != nil { + fmt.Println(err) + return err + } + } + + scanner := bufio.NewScanner(body) + scanner.Split(bufio.ScanBytes) + for scanner.Scan() { + if _, err := w.Write(scanner.Bytes()); err != nil { + fmt.Println(err) + return err + } + } return w.Close() } + func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error { // Read the response body (so we can save it to the WARC // output) and replace it with a buffer. 
+ + /* data, derr := ioutil.ReadAll(resp.Body) if derr != nil { // Errors at this stage are usually transport-level errors, @@ -168,29 +193,56 @@ func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, re return crawl.ErrRetryRequest } resp.Body = ioutil.NopCloser(bytes.NewReader(data)) + */ + + // Keep temporary file to store request/response data + r, _ := ioutil.TempFile("temp", "crawl") + defer r.Close() + + w, _ := os.OpenFile(r.Name(), os.O_RDWR, 0777) + defer w.Close() + defer os.Remove(r.Name()) // Dump the request to the WARC output. - var b bytes.Buffer - if werr := resp.Request.Write(&b); werr != nil { + if werr := resp.Request.Write(w); werr != nil { return werr } - if werr := h.writeWARCRecord("request", resp.Request.URL.String(), b.Bytes()); werr != nil { + if werr := h.writeWARCRecord("request", resp.Request.URL.String(), nil, r); werr != nil { return werr } + // Seek to start; we've written since last read + if _, err := r.Seek(0, io.SeekStart); err != nil { + return err + } + + w.Close() + w, _ = os.OpenFile(r.Name(), os.O_RDWR, 0777) + defer w.Close() + + // Write response body to tmp file + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + // Dump the response. 
statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status) - respPayload := bytes.Join( - [][]byte{[]byte(statusLine), hdr2str(resp.Header), data}, + respHeader := bytes.Join( + [][]byte{[]byte(statusLine), hdr2str(resp.Header), []byte("")}, []byte{'\r', '\n'}, ) - if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respPayload); werr != nil { + if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respHeader, r); werr != nil { return werr } + // Seek to start; we've written since last read + if _, err := r.Seek(0, io.SeekStart); err != nil { + return err + } + h.numWritten++ - return extractLinks(p, u, depth, resp, nil) + return extractLinks(p, u, depth, resp, r, nil) } func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) { @@ -327,6 +379,14 @@ func main() { } } + // Create directory to (temporarily) store response bodies + if _, err := os.Stat("temp"); os.IsNotExist(err) { + err := os.Mkdir("temp", 0700) + if err != nil { + log.Fatal(err) + } + } + // Write seed list (assembled URLs) to seed_urls file f, err := os.Create("seed_urls") if err != nil { |