diff options
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/session.go')
-rw-r--r-- | vendor/github.com/syndtr/goleveldb/leveldb/session.go | 55 |
1 files changed, 42 insertions, 13 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go index 3f391f9..7310209 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go @@ -47,15 +47,24 @@ type session struct { o *cachedOptions icmp *iComparer tops *tOps - fileRef map[int64]int manifest *journal.Writer manifestWriter storage.Writer manifestFd storage.FileDesc - stCompPtrs []internalKey // compaction pointers; need external synchronization - stVersion *version // current version - vmu sync.Mutex + stCompPtrs []internalKey // compaction pointers; need external synchronization + stVersion *version // current version + ntVersionId int64 // next version id to assign + refCh chan *vTask + relCh chan *vTask + deltaCh chan *vDelta + abandon chan int64 + closeC chan struct{} + closeW sync.WaitGroup + vmu sync.Mutex + + // Testing fields + fileRefCh chan chan map[int64]int // channel used to pass current reference stat } // Creates new initialized session instance. @@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { return } s = &session{ - stor: newIStorage(stor), - storLock: storLock, - fileRef: make(map[int64]int), + stor: newIStorage(stor), + storLock: storLock, + refCh: make(chan *vTask), + relCh: make(chan *vTask), + deltaCh: make(chan *vDelta), + abandon: make(chan int64), + fileRefCh: make(chan chan map[int64]int), + closeC: make(chan struct{}), } s.setOptions(o) s.tops = newTableOps(s) - s.setVersion(newVersion(s)) + + s.closeW.Add(1) + go s.refLoop() + s.setVersion(nil, newVersion(s)) s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") return } @@ -90,7 +107,11 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.setVersion(&version{s: s, closing: true}) + s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId}) + + // Close all background goroutines + close(s.closeC) + s.closeW.Wait() } // Release session lock. @@ -180,19 +201,27 @@ func (s *session) recover() (err error) { } s.manifestFd = fd - s.setVersion(staging.finish()) + s.setVersion(rec, staging.finish(false)) s.setNextFileNum(rec.nextFileNum) s.recordCommited(rec) return nil } // Commit session; need external synchronization. -func (s *session) commit(r *sessionRecord) (err error) { +func (s *session) commit(r *sessionRecord, trivial bool) (err error) { v := s.version() defer v.release() // spawn new version based on current version - nv := v.spawn(r) + nv := v.spawn(r, trivial) + + // abandon useless version id to prevent blocking version processing loop. + defer func() { + if err != nil { + s.abandon <- nv.id + s.logf("commit@abandon useless vid D%d", nv.id) + } + }() if s.manifest == nil { // manifest journal writer not yet created, create one @@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord) (err error) { // finally, apply new version if no error rise if err == nil { - s.setVersion(nv) + s.setVersion(r, nv) } return |