aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/session_util.go')
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session_util.go271
1 files changed, 271 insertions, 0 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
new file mode 100644
index 0000000..9232893
--- /dev/null
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
@@ -0,0 +1,271 @@
+// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
+// All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package leveldb
+
+import (
+ "fmt"
+ "sync/atomic"
+
+ "github.com/syndtr/goleveldb/leveldb/journal"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+// Logging.
+
+type dropper struct {
+ s *session
+ fd storage.FileDesc
+}
+
+func (d dropper) Drop(err error) {
+ if e, ok := err.(*journal.ErrCorrupted); ok {
+ d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
+ } else {
+ d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
+ }
+}
+
+func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) }
+func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
+
+// File utils.
+
+func (s *session) newTemp() storage.FileDesc {
+ num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
+ return storage.FileDesc{storage.TypeTemp, num}
+}
+
+func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
+ ref += s.fileRef[fd.Num]
+ if ref > 0 {
+ s.fileRef[fd.Num] = ref
+ } else if ref == 0 {
+ delete(s.fileRef, fd.Num)
+ } else {
+ panic(fmt.Sprintf("negative ref: %v", fd))
+ }
+ return ref
+}
+
+// Session state.
+
+// Get current version. This will incr version ref, must call
+// version.release (exactly once) after use.
+func (s *session) version() *version {
+ s.vmu.Lock()
+ defer s.vmu.Unlock()
+ s.stVersion.incref()
+ return s.stVersion
+}
+
+func (s *session) tLen(level int) int {
+ s.vmu.Lock()
+ defer s.vmu.Unlock()
+ return s.stVersion.tLen(level)
+}
+
+// Set current version to v.
+func (s *session) setVersion(v *version) {
+ s.vmu.Lock()
+ defer s.vmu.Unlock()
+ // Hold by session. It is important to call this first before releasing
+ // current version, otherwise the still used files might get released.
+ v.incref()
+ if s.stVersion != nil {
+ // Release current version.
+ s.stVersion.releaseNB()
+ }
+ s.stVersion = v
+}
+
+// Get current unused file number.
+func (s *session) nextFileNum() int64 {
+ return atomic.LoadInt64(&s.stNextFileNum)
+}
+
+// Set current unused file number to num.
+func (s *session) setNextFileNum(num int64) {
+ atomic.StoreInt64(&s.stNextFileNum, num)
+}
+
+// Mark file number as used.
+func (s *session) markFileNum(num int64) {
+ nextFileNum := num + 1
+ for {
+ old, x := s.stNextFileNum, nextFileNum
+ if old > x {
+ x = old
+ }
+ if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
+ break
+ }
+ }
+}
+
+// Allocate a file number.
+func (s *session) allocFileNum() int64 {
+ return atomic.AddInt64(&s.stNextFileNum, 1) - 1
+}
+
+// Reuse given file number.
+func (s *session) reuseFileNum(num int64) {
+ for {
+ old, x := s.stNextFileNum, num
+ if old != x+1 {
+ x = old
+ }
+ if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
+ break
+ }
+ }
+}
+
+// Set compaction ptr at given level; need external synchronization.
+func (s *session) setCompPtr(level int, ik internalKey) {
+ if level >= len(s.stCompPtrs) {
+ newCompPtrs := make([]internalKey, level+1)
+ copy(newCompPtrs, s.stCompPtrs)
+ s.stCompPtrs = newCompPtrs
+ }
+ s.stCompPtrs[level] = append(internalKey{}, ik...)
+}
+
+// Get compaction ptr at given level; need external synchronization.
+func (s *session) getCompPtr(level int) internalKey {
+ if level >= len(s.stCompPtrs) {
+ return nil
+ }
+ return s.stCompPtrs[level]
+}
+
+// Manifest related utils.
+
+// Fill given session record obj with current states; need external
+// synchronization.
+func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
+ r.setNextFileNum(s.nextFileNum())
+
+ if snapshot {
+ if !r.has(recJournalNum) {
+ r.setJournalNum(s.stJournalNum)
+ }
+
+ if !r.has(recSeqNum) {
+ r.setSeqNum(s.stSeqNum)
+ }
+
+ for level, ik := range s.stCompPtrs {
+ if ik != nil {
+ r.addCompPtr(level, ik)
+ }
+ }
+
+ r.setComparer(s.icmp.uName())
+ }
+}
+
+// Mark if record has been committed, this will update session state;
+// need external synchronization.
+func (s *session) recordCommited(rec *sessionRecord) {
+ if rec.has(recJournalNum) {
+ s.stJournalNum = rec.journalNum
+ }
+
+ if rec.has(recPrevJournalNum) {
+ s.stPrevJournalNum = rec.prevJournalNum
+ }
+
+ if rec.has(recSeqNum) {
+ s.stSeqNum = rec.seqNum
+ }
+
+ for _, r := range rec.compPtrs {
+ s.setCompPtr(r.level, internalKey(r.ikey))
+ }
+}
+
+// Create a new manifest file; need external synchronization.
+func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
+ fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()}
+ writer, err := s.stor.Create(fd)
+ if err != nil {
+ return
+ }
+ jw := journal.NewWriter(writer)
+
+ if v == nil {
+ v = s.version()
+ defer v.release()
+ }
+ if rec == nil {
+ rec = &sessionRecord{}
+ }
+ s.fillRecord(rec, true)
+ v.fillRecord(rec)
+
+ defer func() {
+ if err == nil {
+ s.recordCommited(rec)
+ if s.manifest != nil {
+ s.manifest.Close()
+ }
+ if s.manifestWriter != nil {
+ s.manifestWriter.Close()
+ }
+ if !s.manifestFd.Zero() {
+ s.stor.Remove(s.manifestFd)
+ }
+ s.manifestFd = fd
+ s.manifestWriter = writer
+ s.manifest = jw
+ } else {
+ writer.Close()
+ s.stor.Remove(fd)
+ s.reuseFileNum(fd.Num)
+ }
+ }()
+
+ w, err := jw.Next()
+ if err != nil {
+ return
+ }
+ err = rec.encode(w)
+ if err != nil {
+ return
+ }
+ err = jw.Flush()
+ if err != nil {
+ return
+ }
+ err = s.stor.SetMeta(fd)
+ return
+}
+
+// Flush record to disk.
+func (s *session) flushManifest(rec *sessionRecord) (err error) {
+ s.fillRecord(rec, false)
+ w, err := s.manifest.Next()
+ if err != nil {
+ return
+ }
+ err = rec.encode(w)
+ if err != nil {
+ return
+ }
+ err = s.manifest.Flush()
+ if err != nil {
+ return
+ }
+ if !s.o.GetNoSync() {
+ err = s.manifestWriter.Sync()
+ if err != nil {
+ return
+ }
+ }
+ s.recordCommited(rec)
+ return
+}