aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/syndtr/goleveldb/leveldb/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/session.go')
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session.go55
1 files changed, 42 insertions, 13 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
index 3f391f9..7310209 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
@@ -47,15 +47,24 @@ type session struct {
o *cachedOptions
icmp *iComparer
tops *tOps
- fileRef map[int64]int
manifest *journal.Writer
manifestWriter storage.Writer
manifestFd storage.FileDesc
- stCompPtrs []internalKey // compaction pointers; need external synchronization
- stVersion *version // current version
- vmu sync.Mutex
+ stCompPtrs []internalKey // compaction pointers; need external synchronization
+ stVersion *version // current version
+ ntVersionId int64 // next version id to assign
+ refCh chan *vTask
+ relCh chan *vTask
+ deltaCh chan *vDelta
+ abandon chan int64
+ closeC chan struct{}
+ closeW sync.WaitGroup
+ vmu sync.Mutex
+
+ // Testing fields
+ fileRefCh chan chan map[int64]int // channel used to pass current reference stat
}
// Creates new initialized session instance.
@@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
return
}
s = &session{
- stor: newIStorage(stor),
- storLock: storLock,
- fileRef: make(map[int64]int),
+ stor: newIStorage(stor),
+ storLock: storLock,
+ refCh: make(chan *vTask),
+ relCh: make(chan *vTask),
+ deltaCh: make(chan *vDelta),
+ abandon: make(chan int64),
+ fileRefCh: make(chan chan map[int64]int),
+ closeC: make(chan struct{}),
}
s.setOptions(o)
s.tops = newTableOps(s)
- s.setVersion(newVersion(s))
+
+ s.closeW.Add(1)
+ go s.refLoop()
+ s.setVersion(nil, newVersion(s))
s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
return
}
@@ -90,7 +107,11 @@ func (s *session) close() {
}
s.manifest = nil
s.manifestWriter = nil
- s.setVersion(&version{s: s, closing: true})
+ s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId})
+
+ // Close all background goroutines
+ close(s.closeC)
+ s.closeW.Wait()
}
// Release session lock.
@@ -180,19 +201,27 @@ func (s *session) recover() (err error) {
}
s.manifestFd = fd
- s.setVersion(staging.finish())
+ s.setVersion(rec, staging.finish(false))
s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec)
return nil
}
// Commit session; need external synchronization.
-func (s *session) commit(r *sessionRecord) (err error) {
+func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
v := s.version()
defer v.release()
// spawn new version based on current version
- nv := v.spawn(r)
+ nv := v.spawn(r, trivial)
+
+ // abandon useless version id to prevent blocking version processing loop.
+ defer func() {
+ if err != nil {
+ s.abandon <- nv.id
+ s.logf("commit@abandon useless vid D%d", nv.id)
+ }
+ }()
if s.manifest == nil {
// manifest journal writer not yet created, create one
@@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord) (err error) {
// finally, apply new version if no error rise
if err == nil {
- s.setVersion(nv)
+ s.setVersion(r, nv)
}
return