path: root/crawler.go
author    ale <ale@incal.net>  2017-12-19 09:05:37 +0000
committer ale <ale@incal.net>  2017-12-19 09:05:37 +0000
commit    ddecd3760f89ef9ef7765bf445024c402ab4ad5a (patch)
tree      9af76df39803c62d5657fea195d7f1924a07eb7d /crawler.go
parent    03f5e29656ffdbcca651e9839bcfad6661e4c4e0 (diff)
Exit gracefully on signals
Diffstat (limited to 'crawler.go')
 crawler.go | 38
 1 file changed, 33 insertions(+), 5 deletions(-)
diff --git a/crawler.go b/crawler.go
index b3c4a7b..aef628f 100644
--- a/crawler.go
+++ b/crawler.go
@@ -10,6 +10,7 @@ import (
     "net/http"
     "net/url"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/PuerkitoBio/purell"
@@ -136,6 +137,9 @@ type Crawler struct {
     fetcher Fetcher
     handler Handler
 
+    stopCh   chan bool
+    stopping atomic.Value
+
     enqueueMx sync.Mutex
 }
 
@@ -169,16 +173,25 @@ func (c *Crawler) Enqueue(link Outlink, depth int) {
     c.db.Write(wb, nil)
 }
 
+var scanInterval = 1 * time.Second
+
 // Scan the queue for URLs until there are no more.
 func (c *Crawler) process() <-chan queuePair {
-    ch := make(chan queuePair)
+    ch := make(chan queuePair, 100)
     go func() {
-        for range time.Tick(2 * time.Second) {
-            if err := c.queue.Scan(ch); err != nil {
-                break
-            }
-        }
-        close(ch)
+        t := time.NewTicker(scanInterval)
+        defer t.Stop()
+        defer close(ch)
+        for {
+            select {
+            case <-t.C:
+                if err := c.queue.Scan(ch); err != nil {
+                    return
+                }
+            case <-c.stopCh:
+                return
+            }
+        }
     }()
     return ch
 }
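The rewritten scan loop relies on the fact that closing a channel unblocks every receive on it at once, so a single close of stopCh stops the scan goroutine without sending any value. Replacing time.Tick with time.NewTicker also lets the ticker be released via Stop when the goroutine exits. Below is a minimal standalone sketch of the same ticker-plus-stop-channel pattern; the names stopCh and scanInterval mirror the diff, while the printed "scan tick" stands in for c.queue.Scan and is purely illustrative.

package main

import (
	"fmt"
	"time"
)

func main() {
	// stopCh mirrors the Crawler field: it is never written to, only closed.
	stopCh := make(chan bool)
	scanInterval := 100 * time.Millisecond

	done := make(chan struct{})
	go func() {
		defer close(done)
		t := time.NewTicker(scanInterval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				fmt.Println("scan tick") // stand-in for c.queue.Scan(ch)
			case <-stopCh:
				return // fires as soon as stopCh is closed
			}
		}
	}()

	time.Sleep(350 * time.Millisecond)
	close(stopCh) // a close unblocks every receiver at once
	<-done
	fmt.Println("scanner stopped")
}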
@@ -186,6 +199,13 @@ func (c *Crawler) process() <-chan queuePair {
 // Main worker loop.
 func (c *Crawler) urlHandler(queue <-chan queuePair) {
     for p := range queue {
+        // Stop flag needs to short-circuit the queue (which
+        // is buffered), or it's going to take a while before
+        // we actually stop.
+        if c.stopping.Load().(bool) {
+            return
+        }
+
         // Retrieve the URLInfo object from the crawl db.
         // Ignore errors, we can work with an empty object.
         urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
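Workers check the stopping flag on every dequeued item because the queue channel is now buffered: without the check they would drain up to 100 queued URLs before noticing the shutdown. Here is a small, self-contained sketch of the same atomic.Value-as-boolean pattern; the worker pool and integer items are illustrative assumptions, not the crawler's actual Run loop. (Newer Go releases also offer atomic.Bool in sync/atomic, but this diff uses atomic.Value.)

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var stopping atomic.Value
	stopping.Store(false) // must Store before any Load, or the type assertion panics

	work := make(chan int, 100)
	for i := 0; i < 100; i++ {
		work <- i
	}
	close(work)

	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range work {
				if stopping.Load().(bool) {
					return // short-circuit the buffered queue
				}
				_ = item // stand-in for the real per-URL work
			}
		}()
	}

	stopping.Store(true) // ask workers to drop the remaining buffered items
	wg.Wait()
	fmt.Println("workers exited")
}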
@@ -254,7 +274,9 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler
         handler: h,
         seeds:   seeds,
         scope:   scope,
+        stopCh:  make(chan bool),
     }
+    c.stopping.Store(false)
 
     // Recover active tasks.
     c.queue.Recover()
@@ -283,6 +305,12 @@ func (c *Crawler) Run(concurrency int) {
     wg.Wait()
 }
 
+// Stop a running crawl. This will cause a running Run function to return.
+func (c *Crawler) Stop() {
+    c.stopping.Store(true)
+    close(c.stopCh)
+}
+
 // Close the database and release resources associated with the crawler state.
 func (c *Crawler) Close() {
     c.db.Close()
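Nothing in crawler.go itself listens for OS signals; Stop only gives callers a safe way to make Run return so Close can release the crawl database. Presumably the accompanying change in the command-line tool (not shown in this diff) forwards SIGINT/SIGTERM to Stop. A hedged sketch of what such wiring might look like, with fakeCrawler standing in for the real *Crawler:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// fakeCrawler stands in for *crawl.Crawler in this sketch; only the
// Run/Stop/Close shape added by this commit matters here.
type fakeCrawler struct{ stop chan struct{} }

func (c *fakeCrawler) Stop()  { close(c.stop) }
func (c *fakeCrawler) Run()   { <-c.stop } // blocks until Stop, like a draining crawl
func (c *fakeCrawler) Close() { log.Print("database closed") }

func main() {
	c := &fakeCrawler{stop: make(chan struct{})}

	// Forward SIGINT/SIGTERM to Stop so Run returns instead of the
	// process dying with the crawl database in an inconsistent state.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		s := <-sigCh
		log.Printf("got %v, stopping crawl", s)
		c.Stop()
	}()

	c.Run()   // with the real Crawler: c.Run(concurrency)
	c.Close() // always reached now, thanks to the graceful stop
}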