author    Jordan <me@jordan.im>  2022-03-24 09:08:13 -0700
committer Jordan <me@jordan.im>  2022-03-24 09:08:13 -0700
commit    6355aa4310ff0c32b056580e812ca6f0e2a5ee2f (patch)
tree      3a3008d4d50e5e19f6805b1e1e03460e202048f9 /cmd
parent    a39310f111cef49ff630cc12fdebabc4df37ec28 (diff)
links, crawl: dramatically reduce memory usage
To prevent excessive memory usage and OOM crashes, rather than storing response bodies in memory buffers and passing them around, store them temporarily on the filesystem, wget-style, and delete them once processed.
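In outline, the handler now spools each response to a scratch file and hands the open handle down the pipeline instead of a byte slice. A minimal, self-contained sketch of that pattern, assuming nothing from the crawl codebase (fetchToFile is an illustrative name, and the system temp dir stands in for the commit's local temp/ directory):

package main

import (
	"io"
	"io/ioutil"
	"net/http"
	"os"
)

// fetchToFile spools a response body to a temporary file and returns
// the handle, rewound to the start so callers can read it back. The
// caller closes and removes the file once the body has been processed.
func fetchToFile(resp *http.Response) (*os.File, error) {
	// The commit uses a local "temp" directory; "" (the system
	// temp dir) keeps this sketch self-contained.
	f, err := ioutil.TempFile("", "crawl")
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(f, resp.Body); err != nil {
		f.Close()
		os.Remove(f.Name())
		return nil, err
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		f.Close()
		os.Remove(f.Name())
		return nil, err
	}
	return f, nil
}

func main() {
	resp, err := http.Get("https://example.org/")
	if err != nil {
		return
	}
	defer resp.Body.Close()

	f, err := fetchToFile(resp)
	if err != nil {
		return
	}
	defer os.Remove(f.Name())
	defer f.Close()
	// Read links, write WARC records, etc. from f here; the file
	// is deleted when main returns.
}

Memory use per fetch is then bounded by io.Copy's internal 32 KiB buffer rather than by the size of the largest response.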
Diffstat (limited to 'cmd')
-rw-r--r--  cmd/crawl/crawl.go | 86
1 file changed, 73 insertions(+), 13 deletions(-)
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
index 8f84837..8f28bc4 100644
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -113,8 +113,8 @@ func (f dnsMapFlag) Set(s string) error {
 	return nil
 }
 
-func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error {
-	links, err := analysis.GetLinks(resp)
+func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, body *os.File, _ error) error {
+	links, err := analysis.GetLinks(resp, body)
 	if err != nil {
 		// This is not a fatal error, just a bad web page.
 		return nil
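The hunk above threads the spooled body into link extraction: analysis.GetLinks now takes the *os.File alongside the response, reading the document from disk rather than from a buffered resp.Body. Roughly what that consumption looks like, as a sketch using golang.org/x/net/html rather than the actual analysis package:

package main

import (
	"fmt"
	"os"

	"golang.org/x/net/html"
)

// linksFromFile parses an HTML document from a file-backed body and
// collects href attributes; a stand-in for what analysis.GetLinks is
// asked to do with the new *os.File argument.
func linksFromFile(body *os.File) ([]string, error) {
	doc, err := html.Parse(body)
	if err != nil {
		return nil, err
	}
	var links []string
	var walk func(*html.Node)
	walk = func(n *html.Node) {
		if n.Type == html.ElementNode && n.Data == "a" {
			for _, a := range n.Attr {
				if a.Key == "href" {
					links = append(links, a.Val)
				}
			}
		}
		for c := n.FirstChild; c != nil; c = c.NextSibling {
			walk(c)
		}
	}
	walk(doc)
	return links, nil
}

func main() {
	f, err := os.Open("page.html")
	if err != nil {
		return
	}
	defer f.Close()
	links, err := linksFromFile(f)
	if err != nil {
		return
	}
	fmt.Println(links)
}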
@@ -141,26 +141,51 @@ type warcSaveHandler struct {
 	numWritten int
 }
 
-func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error {
+func (h *warcSaveHandler) writeWARCRecord(typ, uri string, header []byte, body *os.File) error {
 	hdr := warc.NewHeader()
 	hdr.Set("WARC-Type", typ)
 	hdr.Set("WARC-Target-URI", uri)
 	hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
-	hdr.Set("Content-Length", strconv.Itoa(len(data)))
-	w, err := h.warc.NewRecord(hdr)
+	var size int64
+	fi, err := body.Stat()
 	if err != nil {
+		fmt.Println(err)
 		return err
 	}
-	if _, err := w.Write(data); err != nil {
+	size += fi.Size()
+	size += int64(len(header))
+	hdr.Set("Content-Length", strconv.FormatInt(size, 10))
+
+	w, err := h.warc.NewRecord(hdr)
+	if err != nil {
+		fmt.Println(err)
 		return err
 	}
+	if header != nil {
+		if _, err := w.Write(header); err != nil {
+			fmt.Println(err)
			return err
+		}
+	}
+
+	scanner := bufio.NewScanner(body)
+	scanner.Split(bufio.ScanBytes)
+	for scanner.Scan() {
+		if _, err := w.Write(scanner.Bytes()); err != nil {
+			fmt.Println(err)
+			return err
+		}
+	}
+
 	return w.Close()
 }
 
+
 func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error {
 	// Read the response body (so we can save it to the WARC
 	// output) and replace it with a buffer.
+
+	/*
 	data, derr := ioutil.ReadAll(resp.Body)
 	if derr != nil {
 		// Errors at this stage are usually transport-level errors,
@@ -168,29 +193,56 @@ func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, re
 		// and as such, retriable.
 		return crawl.ErrRetryRequest
 	}
 	resp.Body = ioutil.NopCloser(bytes.NewReader(data))
+	*/
+
+	// Keep temporary file to store request/response data
+	r, _ := ioutil.TempFile("temp", "crawl")
+	defer r.Close()
+
+	w, _ := os.OpenFile(r.Name(), os.O_RDWR, 0777)
+	defer w.Close()
+	defer os.Remove(r.Name())
 
 	// Dump the request to the WARC output.
-	var b bytes.Buffer
-	if werr := resp.Request.Write(&b); werr != nil {
+	if werr := resp.Request.Write(w); werr != nil {
 		return werr
 	}
-	if werr := h.writeWARCRecord("request", resp.Request.URL.String(), b.Bytes()); werr != nil {
+	if werr := h.writeWARCRecord("request", resp.Request.URL.String(), nil, r); werr != nil {
 		return werr
 	}
+	// Seek to start; we've written since last read
+	if _, err := r.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+
+	w.Close()
+	w, _ = os.OpenFile(r.Name(), os.O_RDWR, 0777)
+	defer w.Close()
+
+	// Write response body to tmp file
+	if _, err := io.Copy(w, resp.Body); err != nil {
+		return err
+	}
+
 	// Dump the response.
 	statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status)
-	respPayload := bytes.Join(
-		[][]byte{[]byte(statusLine), hdr2str(resp.Header), data},
+	respHeader := bytes.Join(
+		[][]byte{[]byte(statusLine), hdr2str(resp.Header), []byte("")},
 		[]byte{'\r', '\n'},
 	)
-	if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respPayload); werr != nil {
+	if werr := h.writeWARCRecord("response", resp.Request.URL.String(), respHeader, r); werr != nil {
 		return werr
 	}
+	// Seek to start; we've written since last read
+	if _, err := r.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+
 	h.numWritten++
-	return extractLinks(p, u, depth, resp, nil)
+	return extractLinks(p, u, depth, resp, r, nil)
 }
 
 func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) {
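Two notes on the Handle/writeWARCRecord hunks above. First, writeWARCRecord pushes the body into the record one byte at a time via bufio.Scanner with ScanBytes; io.Copy streams the same bytes through a reusable buffer with far fewer Write calls. Second, reopening w with os.O_RDWR alone does not truncate the file, so when a request dump is longer than the response body that follows it, stale trailing bytes remain, get counted by Stat(), and end up in the response record; adding os.O_TRUNC to the reopen, or truncating in place, avoids that. A sketch combining both points, with names of my own choosing:

package main

import (
	"io"
	"io/ioutil"
	"os"
	"strings"
)

// reuseSpool truncates and rewinds a spool file between the request
// and response phases, so no stale bytes from the previous write can
// trail the new body, then streams src into it with io.Copy instead
// of a byte-at-a-time bufio.Scanner.
func reuseSpool(f *os.File, src io.Reader) error {
	if err := f.Truncate(0); err != nil {
		return err
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	_, err := io.Copy(f, src)
	return err
}

func main() {
	f, err := ioutil.TempFile("", "crawl")
	if err != nil {
		return
	}
	defer os.Remove(f.Name())
	defer f.Close()
	_ = reuseSpool(f, strings.NewReader("response body"))
}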
@@ -327,6 +379,14 @@ func main() {
 		}
 	}
 
+	// Create directory to (temporarily) store response bodies
+	if _, err := os.Stat("temp"); os.IsNotExist(err) {
+		err := os.Mkdir("temp", 0700)
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+
 	// Write seed list (assembled URLs) to seed_urls file
 	f, err := os.Create("seed_urls")
 	if err != nil {
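As a side note on the last hunk, os.MkdirAll returns nil when the directory already exists, which collapses the Stat/IsNotExist probe and its check-then-create race into a single call. A drop-in sketch:

package main

import (
	"log"
	"os"
)

func main() {
	// Create directory to (temporarily) store response bodies;
	// MkdirAll succeeds silently when the directory already exists.
	if err := os.MkdirAll("temp", 0700); err != nil {
		log.Fatal(err)
	}
}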