aboutsummaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
authorKoni Marti <koni.marti@gmail.com>2024-01-20 11:31:03 +0100
committerRobin Jarry <robin@jarry.cc>2024-01-25 21:55:17 +0100
commit1980744f7bf9e147abf649d37a2fa7dddd4e7254 (patch)
treedba9967e81d4c65c287dd5c9c1e26c5f269891b7 /worker
parent3452c9233f623c4049098b66911ae82fc14e119c (diff)
downloadaerc-1980744f7bf9e147abf649d37a2fa7dddd4e7254.tar.gz
aerc-1980744f7bf9e147abf649d37a2fa7dddd4e7254.zip
idler: improve the imap idler
Rewrite the imap idler to make it more fault tolerant and prevent hangs (and possibly short writes). Fixes: https://todo.sr.ht/~rjarry/aerc/208 Signed-off-by: Koni Marti <koni.marti@gmail.com> Tested-by: Karel Balej <balejk@matfyz.cz> Acked-by: Robin Jarry <robin@jarry.cc>
Diffstat (limited to 'worker')
-rw-r--r--worker/imap/configure.go2
-rw-r--r--worker/imap/extensions/xgmext/client.go8
-rw-r--r--worker/imap/idler.go203
-rw-r--r--worker/imap/observer.go3
-rw-r--r--worker/imap/worker.go98
-rw-r--r--worker/middleware/gmailworker.go24
6 files changed, 182 insertions, 156 deletions
diff --git a/worker/imap/configure.go b/worker/imap/configure.go
index 49464689..9d4a02de 100644
--- a/worker/imap/configure.go
+++ b/worker/imap/configure.go
@@ -166,7 +166,7 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
if w.config.cacheEnabled {
w.initCacheDb(msg.Config.Name)
}
- w.idler = newIdler(w.config, w.worker)
+ w.idler = newIdler(w.config, w.worker, w.executeIdle)
w.observer = newObserver(w.config, w.worker)
if name, ok := msg.Config.Params["folder-map"]; ok {
diff --git a/worker/imap/extensions/xgmext/client.go b/worker/imap/extensions/xgmext/client.go
index 3107e642..62d081b2 100644
--- a/worker/imap/extensions/xgmext/client.go
+++ b/worker/imap/extensions/xgmext/client.go
@@ -46,7 +46,13 @@ func (h handler) fetchThreadIds(uids []uint32) ([]string, error) {
go func() {
defer log.PanicHandler()
for msg := range messages {
- m[msg.Items[thriditem].(string)] = struct{}{}
+ if msg == nil {
+ continue
+ }
+ item, ok := msg.Items[thriditem].(string)
+ if ok {
+ m[item] = struct{}{}
+ }
}
done <- nil
}()
diff --git a/worker/imap/idler.go b/worker/imap/idler.go
index aa61776d..369ebd2e 100644
--- a/worker/imap/idler.go
+++ b/worker/imap/idler.go
@@ -2,156 +2,131 @@ package imap
import (
"fmt"
- "sync"
"time"
"git.sr.ht/~rjarry/aerc/log"
"git.sr.ht/~rjarry/aerc/worker/types"
"github.com/emersion/go-imap"
- "github.com/emersion/go-imap/client"
)
-var (
- errIdleTimeout = fmt.Errorf("idle timeout")
- errIdleModeHangs = fmt.Errorf("idle mode hangs; waiting to reconnect")
-)
+var errIdleTimeout = fmt.Errorf("idle timeout")
// idler manages the idle mode of the imap server. Enter idle mode if there's
// no other task and leave idle mode when a new task arrives. Idle mode is only
// used when the client is ready and connected. After a connection loss, make
// sure that idling returns gracefully and the worker remains responsive.
type idler struct {
- sync.Mutex
- config imapConfig
- client *imapClient
- worker types.WorkerInteractor
- stop chan struct{}
- done chan error
- waiting bool
- idleing bool
+ client *imapClient
+ debouncer *time.Timer
+ debounce time.Duration
+ timeout time.Duration
+ worker types.WorkerInteractor
+ stop chan struct{}
+ start chan struct{}
+ done chan error
}
-func newIdler(cfg imapConfig, w types.WorkerInteractor) *idler {
- return &idler{config: cfg, worker: w, done: make(chan error)}
+func newIdler(cfg imapConfig, w types.WorkerInteractor, startIdler chan struct{}) *idler {
+ return &idler{
+ debouncer: nil,
+ debounce: cfg.idle_debounce,
+ timeout: cfg.idle_timeout,
+ worker: w,
+ stop: make(chan struct{}),
+ start: startIdler,
+ done: make(chan error),
+ }
}
func (i *idler) SetClient(c *imapClient) {
- i.Lock()
i.client = c
- i.Unlock()
}
-func (i *idler) setWaiting(wait bool) {
- i.Lock()
- i.waiting = wait
- i.Unlock()
-}
-
-func (i *idler) isWaiting() bool {
- i.Lock()
- defer i.Unlock()
- return i.waiting
-}
-
-func (i *idler) isReady() bool {
- i.Lock()
- defer i.Unlock()
- return (!i.waiting && i.client != nil &&
- i.client.State() == imap.SelectedState)
-}
-
-func (i *idler) setIdleing(v bool) {
- i.Lock()
- defer i.Unlock()
- i.idleing = v
-}
-
-func (i *idler) isIdleing() bool {
- i.Lock()
- defer i.Unlock()
- return i.idleing
+func (i *idler) ready() bool {
+ return (i.client != nil && i.client.State() == imap.SelectedState)
}
func (i *idler) Start() {
- switch {
- case i.isReady():
- i.stop = make(chan struct{})
-
- go func() {
- defer log.PanicHandler()
- select {
- case <-i.stop:
- // debounce idle
- i.done <- nil
- case <-time.After(i.config.idle_debounce):
- // enter idle mode
- i.setIdleing(true)
- now := time.Now()
- err := i.client.Idle(i.stop,
- &client.IdleOptions{
- LogoutTimeout: 0,
- PollInterval: 0,
- })
- i.setIdleing(false)
- i.done <- err
- i.log("elapsed idle time: %v", time.Since(now))
- }
- }()
-
- case i.isWaiting():
- i.log("not started: wait for idle to exit")
- default:
- i.log("not started: client not ready")
+ if !i.ready() {
+ return
}
-}
-func (i *idler) Stop() error {
- var reterr error
- switch {
- case i.isReady():
+ select {
+ case <-i.stop:
+ // stop channel is nil (probably after a debounce), we don't
+ // want to close it
+ default:
close(i.stop)
+ }
+
+ // create new stop channel
+ i.stop = make(chan struct{})
+
+ // clear done channel
+ clearing := true
+ for clearing {
select {
- case err := <-i.done:
- if err != nil {
- i.log("<=(idle) with err: %v", err)
- }
- reterr = nil
- case <-time.After(i.config.idle_timeout):
- i.worker.PostMessage(&types.Done{
- Message: types.RespondTo(&types.Disconnect{}),
- }, nil)
-
- i.waitOnIdle()
-
- reterr = errIdleTimeout
+ case <-i.done:
+ continue
+ default:
+ clearing = false
}
- case i.isWaiting():
- reterr = errIdleModeHangs
- default:
- reterr = nil
}
- return reterr
+
+ i.worker.Tracef("idler (start): start idle after debounce")
+ i.debouncer = time.AfterFunc(i.debounce, func() {
+ i.start <- struct{}{}
+ i.worker.Tracef("idler (start): started")
+ })
}
-func (i *idler) waitOnIdle() {
- i.setWaiting(true)
+func (i *idler) Execute() {
+ if !i.ready() {
+ return
+ }
+
+ // we need to call client.Idle in a goroutine since it is blocking call
+ // and we still want to receive messages
go func() {
defer log.PanicHandler()
- err := <-i.done
- if err == nil {
- i.worker.PostMessage(&types.Done{
- Message: types.RespondTo(&types.Connect{}),
- }, nil)
- } else {
- i.log("<=(idle) waited; with err: %v", err)
+
+ start := time.Now()
+ err := i.client.Idle(i.stop, nil)
+ if err != nil {
+ i.worker.Errorf("idle returned error: %v", err)
}
- i.setWaiting(false)
- i.stop = make(chan struct{})
- i.Start()
+ i.worker.Tracef("idler (execute): idleing for %s", time.Since(start))
+
+ i.done <- err
}()
}
-func (i *idler) log(format string, v ...interface{}) {
- msg := fmt.Sprintf(format, v...)
- i.worker.Tracef("idler (%p) [idle:%t,wait:%t] %s", i, i.isIdleing(), i.isWaiting(), msg)
+func (i *idler) Stop() error {
+ if !i.ready() {
+ return nil
+ }
+
+ select {
+ case <-i.stop:
+ i.worker.Debugf("idler (stop): idler already stopped?")
+ return nil
+ default:
+ close(i.stop)
+ }
+
+ if i.debouncer != nil {
+ if i.debouncer.Stop() {
+ i.worker.Tracef("idler (stop): debounced")
+ return nil
+ }
+ }
+
+ select {
+ case err := <-i.done:
+ i.worker.Tracef("idler (stop): idle stopped: %v", err)
+ return err
+ case <-time.After(i.timeout):
+ i.worker.Errorf("idler (stop): cannot stop idle (timeout)")
+ return errIdleTimeout
+ }
}
diff --git a/worker/imap/observer.go b/worker/imap/observer.go
index 7367ff58..7a604a1a 100644
--- a/worker/imap/observer.go
+++ b/worker/imap/observer.go
@@ -105,9 +105,6 @@ func (o *observer) Stop() {
}
func (o *observer) DelayedReconnect() error {
- if o.client == nil {
- return nil
- }
var wait time.Duration
var reterr error
diff --git a/worker/imap/worker.go b/worker/imap/worker.go
index 7ef759d4..391f365a 100644
--- a/worker/imap/worker.go
+++ b/worker/imap/worker.go
@@ -80,29 +80,37 @@ type IMAPWorker struct {
threadAlgorithm sortthread.ThreadAlgorithm
liststatus bool
+
+ executeIdle chan struct{}
}
func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
return &IMAPWorker{
- updates: make(chan client.Update, 50),
- worker: worker,
- selected: &imap.MailboxStatus{},
- idler: newIdler(imapConfig{}, worker),
- observer: newObserver(imapConfig{}, worker),
- caps: &models.Capabilities{},
+ updates: make(chan client.Update, 50),
+ worker: worker,
+ selected: &imap.MailboxStatus{},
+ idler: nil, // will be set in configure()
+ observer: nil, // will be set in configure()
+ caps: &models.Capabilities{},
+ executeIdle: make(chan struct{}),
}, nil
}
func (w *IMAPWorker) newClient(c *client.Client) {
- c.Updates = w.updates
+ c.Updates = nil
w.client = &imapClient{
c,
sortthread.NewThreadClient(c),
sortthread.NewSortClient(c),
extensions.NewListStatusClient(c),
}
- w.idler.SetClient(w.client)
- w.observer.SetClient(w.client)
+ if w.idler != nil {
+ w.idler.SetClient(w.client)
+ c.Updates = w.updates
+ }
+ if w.observer != nil {
+ w.observer.SetClient(w.client)
+ }
sort, err := w.client.sort.SupportSort()
if err == nil && sort {
w.caps.Sort = true
@@ -125,7 +133,7 @@ func (w *IMAPWorker) newClient(c *client.Client) {
xgmext, err := w.client.Support("X-GM-EXT-1")
if err == nil && xgmext && w.config.useXGMEXT {
w.worker.Debugf("Server Capability found: X-GM-EXT-1")
- w.worker = middleware.NewGmailWorker(w.worker, w.client.Client, w.idler)
+ w.worker = middleware.NewGmailWorker(w.worker, w.client.Client)
}
if err == nil && !xgmext && w.config.useXGMEXT {
w.worker.Infof("X-GM-EXT-1 requested, but it is not supported")
@@ -133,13 +141,6 @@ func (w *IMAPWorker) newClient(c *client.Client) {
}
func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
- defer func() {
- w.idler.Start()
- }()
- if err := w.idler.Stop(); err != nil {
- return err
- }
-
var reterr error // will be returned at the end, needed to support idle
// when client is nil allow only certain messages to be handled
@@ -200,12 +201,14 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
case *types.Disconnect:
w.observer.SetAutoReconnect(false)
w.observer.Stop()
- if w.client == nil || w.client.State() != imap.SelectedState {
+
+ if w.client == nil || (w.client != nil && w.client.State() != imap.SelectedState) {
reterr = errNotConnected
break
}
if err := w.client.Logout(); err != nil {
+ w.terminate()
reterr = err
break
}
@@ -298,10 +301,64 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) {
}
}
+func (w *IMAPWorker) terminate() {
+ if w.observer != nil {
+ w.observer.Stop()
+ w.observer.SetClient(nil)
+ }
+
+ if w.client != nil {
+ w.client.Updates = nil
+ if err := w.client.Terminate(); err != nil {
+ w.worker.Errorf("could not terminate connection: %v", err)
+ }
+ }
+
+ w.client = nil
+ w.selected = &imap.MailboxStatus{}
+
+ if w.idler != nil {
+ w.idler.SetClient(nil)
+ }
+}
+
+func (w *IMAPWorker) stopIdler() error {
+ if w.idler == nil {
+ return nil
+ }
+
+ if err := w.idler.Stop(); err != nil {
+ w.terminate()
+ w.observer.EmitIfNotConnected()
+ w.worker.Errorf("idler stopped with error:%v", err)
+ return err
+ }
+
+ return nil
+}
+
+func (w *IMAPWorker) startIdler() {
+ if w.idler == nil {
+ return
+ }
+
+ w.idler.Start()
+}
+
func (w *IMAPWorker) Run() {
for {
select {
case msg := <-w.worker.Actions():
+
+ if err := w.stopIdler(); err != nil {
+ w.worker.PostMessage(&types.Error{
+ Message: types.RespondTo(msg),
+ Error: err,
+ }, nil)
+ break
+ }
+ w.worker.Tracef("ready to handle %T", msg)
+
msg = w.worker.ProcessAction(msg)
if err := w.handleMessage(msg); errors.Is(err, errUnsupported) {
@@ -315,8 +372,13 @@ func (w *IMAPWorker) Run() {
}, nil)
}
+ w.startIdler()
+
case update := <-w.updates:
w.handleImapUpdate(update)
+
+ case <-w.executeIdle:
+ w.idler.Execute()
}
}
}
diff --git a/worker/middleware/gmailworker.go b/worker/middleware/gmailworker.go
index 807f7bff..f9924732 100644
--- a/worker/middleware/gmailworker.go
+++ b/worker/middleware/gmailworker.go
@@ -8,20 +8,14 @@ import (
"github.com/emersion/go-imap/client"
)
-type idler interface {
- Start()
- Stop() error
-}
-
type gmailWorker struct {
types.WorkerInteractor
mu sync.Mutex
client *client.Client
- idler idler
}
// NewGmailWorker returns an IMAP middleware for the X-GM-EXT-1 extension
-func NewGmailWorker(base types.WorkerInteractor, c *client.Client, i idler,
+func NewGmailWorker(base types.WorkerInteractor, c *client.Client,
) types.WorkerInteractor {
base.Infof("loading worker middleware: X-GM-EXT-1")
@@ -29,37 +23,30 @@ func NewGmailWorker(base types.WorkerInteractor, c *client.Client, i idler,
for iter := base; iter != nil; iter = iter.Unwrap() {
if g, ok := iter.(*gmailWorker); ok {
base.Infof("already loaded; resetting")
- err := g.reset(c, i)
+ err := g.reset(c)
if err != nil {
base.Errorf("reset failed: %v", err)
}
return base
}
}
- return &gmailWorker{WorkerInteractor: base, client: c, idler: i}
+ return &gmailWorker{WorkerInteractor: base, client: c}
}
func (g *gmailWorker) Unwrap() types.WorkerInteractor {
return g.WorkerInteractor
}
-func (g *gmailWorker) reset(c *client.Client, i idler) error {
+func (g *gmailWorker) reset(c *client.Client) error {
g.mu.Lock()
defer g.mu.Unlock()
g.client = c
- g.idler = i
return nil
}
func (g *gmailWorker) ProcessAction(msg types.WorkerMessage) types.WorkerMessage {
- switch msg := msg.(type) {
- case *types.FetchMessageHeaders:
+ if msg, ok := msg.(*types.FetchMessageHeaders); ok && len(msg.Uids) > 0 {
g.mu.Lock()
- err := g.idler.Stop()
- if err != nil {
- g.Errorf("idler reported an error: %v", err)
- break
- }
handler := xgmext.NewHandler(g.client)
uids, err := handler.FetchEntireThreads(msg.Uids)
@@ -71,7 +58,6 @@ func (g *gmailWorker) ProcessAction(msg types.WorkerMessage) types.WorkerMessage
msg.Uids = uids
}
- g.idler.Start()
g.mu.Unlock()
}
return g.WorkerInteractor.ProcessAction(msg)