diff options
author | ale <ale@incal.net> | 2017-12-19 09:05:37 +0000 |
---|---|---|
committer | ale <ale@incal.net> | 2017-12-19 09:05:37 +0000 |
commit | ddecd3760f89ef9ef7765bf445024c402ab4ad5a (patch) | |
tree | 9af76df39803c62d5657fea195d7f1924a07eb7d /crawler.go | |
parent | 03f5e29656ffdbcca651e9839bcfad6661e4c4e0 (diff) | |
download | crawl-ddecd3760f89ef9ef7765bf445024c402ab4ad5a.tar.gz crawl-ddecd3760f89ef9ef7765bf445024c402ab4ad5a.zip |
Exit gracefully on signals
Diffstat (limited to 'crawler.go')
-rw-r--r-- | crawler.go | 38 |
1 files changed, 33 insertions, 5 deletions
```diff
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/url"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/PuerkitoBio/purell"
@@ -136,6 +137,9 @@ type Crawler struct {
 	fetcher Fetcher
 	handler Handler
 
+	stopCh   chan bool
+	stopping atomic.Value
+
 	enqueueMx sync.Mutex
 }
 
@@ -169,16 +173,25 @@ func (c *Crawler) Enqueue(link Outlink, depth int) {
 	c.db.Write(wb, nil)
 }
 
+var scanInterval = 1 * time.Second
+
 // Scan the queue for URLs until there are no more.
 func (c *Crawler) process() <-chan queuePair {
-	ch := make(chan queuePair)
+	ch := make(chan queuePair, 100)
 	go func() {
-		for range time.Tick(2 * time.Second) {
-			if err := c.queue.Scan(ch); err != nil {
-				break
+		t := time.NewTicker(scanInterval)
+		defer t.Stop()
+		defer close(ch)
+		for {
+			select {
+			case <-t.C:
+				if err := c.queue.Scan(ch); err != nil {
+					return
+				}
+			case <-c.stopCh:
+				return
 			}
 		}
-		close(ch)
 	}()
 	return ch
 }
@@ -186,6 +199,13 @@ func (c *Crawler) process() <-chan queuePair {
 // Main worker loop.
 func (c *Crawler) urlHandler(queue <-chan queuePair) {
 	for p := range queue {
+		// Stop flag needs to short-circuit the queue (which
+		// is buffered), or it's going to take a while before
+		// we actually stop.
+		if c.stopping.Load().(bool) {
+			return
+		}
+
 		// Retrieve the URLInfo object from the crawl db.
 		// Ignore errors, we can work with an empty object.
 		urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
@@ -254,7 +274,9 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler
 		handler: h,
 		seeds:   seeds,
 		scope:   scope,
+		stopCh:  make(chan bool),
 	}
+	c.stopping.Store(false)
 
 	// Recover active tasks.
 	c.queue.Recover()
@@ -283,6 +305,12 @@ func (c *Crawler) Run(concurrency int) {
 	wg.Wait()
 }
 
+// Stop a running crawl. This will cause a running Run function to return.
+func (c *Crawler) Stop() {
+	c.stopping.Store(true)
+	close(c.stopCh)
+}
+
 // Close the database and release resources associated with the crawler state.
 func (c *Crawler) Close() {
 	c.db.Close()
```