aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/syndtr/goleveldb
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb')
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db.go79
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go40
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db_write.go11
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session.go4
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/storage.go63
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go300
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go4
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go12
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go8
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go8
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/table.go2
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/util.go2
12 files changed, 405 insertions, 128 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go
index ea5595e..e7ac065 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go
@@ -35,6 +35,7 @@ type DB struct {
// Stats. Need 64-bit alignment.
cWriteDelay int64 // The cumulative duration of write delays
cWriteDelayN int32 // The cumulative number of write delays
+ inWritePaused int32 // The indicator whether write operation is paused by compaction
aliveSnaps, aliveIters int32
// Session.
@@ -906,6 +907,8 @@ func (db *DB) GetSnapshot() (*Snapshot, error) {
// Returns the number of files at level 'n'.
// leveldb.stats
// Returns statistics of the underlying DB.
+// leveldb.iostats
+// Returns statistics of effective disk read and write.
// leveldb.writedelay
// Returns cumulative write delay caused by compaction.
// leveldb.sstables
@@ -959,9 +962,14 @@ func (db *DB) GetProperty(name string) (value string, err error) {
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
float64(read)/1048576.0, float64(write)/1048576.0)
}
+ case p == "iostats":
+ value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
+ float64(db.s.stor.reads())/1048576.0,
+ float64(db.s.stor.writes())/1048576.0)
case p == "writedelay":
writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
- value = fmt.Sprintf("DelayN:%d Delay:%s", writeDelayN, writeDelay)
+ paused := atomic.LoadInt32(&db.inWritePaused) == 1
+ value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
case p == "sstables":
for level, tables := range v.levels {
value += fmt.Sprintf("--- level %d ---\n", level)
@@ -990,6 +998,75 @@ func (db *DB) GetProperty(name string) (value string, err error) {
return
}
+// DBStats is database statistics.
+type DBStats struct {
+ WriteDelayCount int32
+ WriteDelayDuration time.Duration
+ WritePaused bool
+
+ AliveSnapshots int32
+ AliveIterators int32
+
+ IOWrite uint64
+ IORead uint64
+
+ BlockCacheSize int
+ OpenedTablesCount int
+
+ LevelSizes []int64
+ LevelTablesCounts []int
+ LevelRead []int64
+ LevelWrite []int64
+ LevelDurations []time.Duration
+}
+
+// Stats populates s with database statistics.
+func (db *DB) Stats(s *DBStats) error {
+ err := db.ok()
+ if err != nil {
+ return err
+ }
+
+ s.IORead = db.s.stor.reads()
+ s.IOWrite = db.s.stor.writes()
+ s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
+ s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
+ s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1
+
+ s.OpenedTablesCount = db.s.tops.cache.Size()
+ if db.s.tops.bcache != nil {
+ s.BlockCacheSize = db.s.tops.bcache.Size()
+ } else {
+ s.BlockCacheSize = 0
+ }
+
+ s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
+ s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)
+
+ s.LevelDurations = s.LevelDurations[:0]
+ s.LevelRead = s.LevelRead[:0]
+ s.LevelWrite = s.LevelWrite[:0]
+ s.LevelSizes = s.LevelSizes[:0]
+ s.LevelTablesCounts = s.LevelTablesCounts[:0]
+
+ v := db.s.version()
+ defer v.release()
+
+ for level, tables := range v.levels {
+ duration, read, write := db.compStats.getStat(level)
+ if len(tables) == 0 && duration == 0 {
+ continue
+ }
+ s.LevelDurations = append(s.LevelDurations, duration)
+ s.LevelRead = append(s.LevelRead, read)
+ s.LevelWrite = append(s.LevelWrite, write)
+ s.LevelSizes = append(s.LevelSizes, tables.size())
+ s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
+ }
+
+ return nil
+}
+
// SizeOf calculates approximate sizes of the given key ranges.
// The length of the returned sizes are equal with the length of the given
// ranges. The returned sizes measure storage space usage, so if the user
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index b6563e8..28e5090 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -640,6 +640,16 @@ func (db *DB) tableNeedCompaction() bool {
return v.needCompaction()
}
+// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
+func (db *DB) resumeWrite() bool {
+ v := db.s.version()
+ defer v.release()
+ if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
+ return true
+ }
+ return false
+}
+
func (db *DB) pauseCompaction(ch chan<- struct{}) {
select {
case ch <- struct{}{}:
@@ -653,6 +663,7 @@ type cCmd interface {
}
type cAuto struct {
+ // Note for table compaction, an empty ackC represents it's a compaction waiting command.
ackC chan<- error
}
@@ -765,8 +776,10 @@ func (db *DB) mCompaction() {
}
func (db *DB) tCompaction() {
- var x cCmd
- var ackQ []cCmd
+ var (
+ x cCmd
+ ackQ, waitQ []cCmd
+ )
defer func() {
if x := recover(); x != nil {
@@ -778,6 +791,10 @@ func (db *DB) tCompaction() {
ackQ[i].ack(ErrClosed)
ackQ[i] = nil
}
+ for i := range waitQ {
+ waitQ[i].ack(ErrClosed)
+ waitQ[i] = nil
+ }
if x != nil {
x.ack(ErrClosed)
}
@@ -795,12 +812,25 @@ func (db *DB) tCompaction() {
return
default:
}
+ // Resume write operation as soon as possible.
+ if len(waitQ) > 0 && db.resumeWrite() {
+ for i := range waitQ {
+ waitQ[i].ack(nil)
+ waitQ[i] = nil
+ }
+ waitQ = waitQ[:0]
+ }
} else {
for i := range ackQ {
ackQ[i].ack(nil)
ackQ[i] = nil
}
ackQ = ackQ[:0]
+ for i := range waitQ {
+ waitQ[i].ack(nil)
+ waitQ[i] = nil
+ }
+ waitQ = waitQ[:0]
select {
case x = <-db.tcompCmdC:
case ch := <-db.tcompPauseC:
@@ -813,7 +843,11 @@ func (db *DB) tCompaction() {
if x != nil {
switch cmd := x.(type) {
case cAuto:
- ackQ = append(ackQ, x)
+ if cmd.ackC != nil {
+ waitQ = append(waitQ, x)
+ } else {
+ ackQ = append(ackQ, x)
+ }
case cRange:
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default:
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go
index 3662d95..db0c1be 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -89,7 +89,11 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
return false
case tLen >= pauseTrigger:
delayed = true
+ // Set the write paused flag explicitly.
+ atomic.StoreInt32(&db.inWritePaused, 1)
err = db.compTriggerWait(db.tcompCmdC)
+ // Unset the write paused flag.
+ atomic.StoreInt32(&db.inWritePaused, 0)
if err != nil {
return false
}
@@ -146,7 +150,7 @@ func (db *DB) unlockWrite(overflow bool, merged int, err error) {
}
}
-// ourBatch if defined should equal with batch.
+// ourBatch is batch that we can modify.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
// Try to flush memdb. This method would also trying to throttle writes
// if it is too fast and compaction cannot catch-up.
@@ -215,6 +219,11 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
}
}
+ // Release ourBatch if any.
+ if ourBatch != nil {
+ defer db.batchPool.Put(ourBatch)
+ }
+
// Seq number.
seq := db.seq + 1
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
index ad68a87..3f391f9 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
@@ -42,7 +42,7 @@ type session struct {
stTempFileNum int64
stSeqNum uint64 // last mem compacted seq; need external synchronization
- stor storage.Storage
+ stor *iStorage
storLock storage.Locker
o *cachedOptions
icmp *iComparer
@@ -68,7 +68,7 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
return
}
s = &session{
- stor: stor,
+ stor: newIStorage(stor),
storLock: storLock,
fileRef: make(map[int64]int),
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage.go
new file mode 100644
index 0000000..d45fb5d
--- /dev/null
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage.go
@@ -0,0 +1,63 @@
+package leveldb
+
+import (
+ "github.com/syndtr/goleveldb/leveldb/storage"
+ "sync/atomic"
+)
+
+type iStorage struct {
+ storage.Storage
+ read uint64
+ write uint64
+}
+
+func (c *iStorage) Open(fd storage.FileDesc) (storage.Reader, error) {
+ r, err := c.Storage.Open(fd)
+ return &iStorageReader{r, c}, err
+}
+
+func (c *iStorage) Create(fd storage.FileDesc) (storage.Writer, error) {
+ w, err := c.Storage.Create(fd)
+ return &iStorageWriter{w, c}, err
+}
+
+func (c *iStorage) reads() uint64 {
+ return atomic.LoadUint64(&c.read)
+}
+
+func (c *iStorage) writes() uint64 {
+ return atomic.LoadUint64(&c.write)
+}
+
+// newIStorage returns the given storage wrapped by iStorage.
+func newIStorage(s storage.Storage) *iStorage {
+ return &iStorage{s, 0, 0}
+}
+
+type iStorageReader struct {
+ storage.Reader
+ c *iStorage
+}
+
+func (r *iStorageReader) Read(p []byte) (n int, err error) {
+ n, err = r.Reader.Read(p)
+ atomic.AddUint64(&r.c.read, uint64(n))
+ return n, err
+}
+
+func (r *iStorageReader) ReadAt(p []byte, off int64) (n int, err error) {
+ n, err = r.Reader.ReadAt(p, off)
+ atomic.AddUint64(&r.c.read, uint64(n))
+ return n, err
+}
+
+type iStorageWriter struct {
+ storage.Writer
+ c *iStorage
+}
+
+func (w *iStorageWriter) Write(p []byte) (n int, err error) {
+ n, err = w.Writer.Write(p)
+ atomic.AddUint64(&w.c.write, uint64(n))
+ return n, err
+}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
index 1189dec..9ba71fd 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
@@ -9,10 +9,12 @@ package storage
import (
"errors"
"fmt"
+ "io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
+ "sort"
"strconv"
"strings"
"sync"
@@ -42,6 +44,30 @@ func (lock *fileStorageLock) Unlock() {
}
}
+type int64Slice []int64
+
+func (p int64Slice) Len() int { return len(p) }
+func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+
+func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
+ f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
+ if err != nil {
+ return err
+ }
+ n, err := f.Write(data)
+ if err == nil && n < len(data) {
+ err = io.ErrShortWrite
+ }
+ if err1 := f.Sync(); err == nil {
+ err = err1
+ }
+ if err1 := f.Close(); err == nil {
+ err = err1
+ }
+ return err
+}
+
const logSizeThreshold = 1024 * 1024 // 1 MiB
// fileStorage is a file-system backed storage.
@@ -60,7 +86,7 @@ type fileStorage struct {
day int
}
-// OpenFile returns a new filesytem-backed storage implementation with the given
+// OpenFile returns a new filesystem-backed storage implementation with the given
// path. This also acquire a file lock, so any subsequent attempt to open the
// same path will fail.
//
@@ -189,7 +215,8 @@ func (fs *fileStorage) doLog(t time.Time, str string) {
// write
fs.buf = append(fs.buf, []byte(str)...)
fs.buf = append(fs.buf, '\n')
- fs.logw.Write(fs.buf)
+ n, _ := fs.logw.Write(fs.buf)
+ fs.logSize += int64(n)
}
func (fs *fileStorage) Log(str string) {
@@ -210,7 +237,46 @@ func (fs *fileStorage) log(str string) {
}
}
-func (fs *fileStorage) SetMeta(fd FileDesc) (err error) {
+func (fs *fileStorage) setMeta(fd FileDesc) error {
+ content := fsGenName(fd) + "\n"
+ // Check and backup old CURRENT file.
+ currentPath := filepath.Join(fs.path, "CURRENT")
+ if _, err := os.Stat(currentPath); err == nil {
+ b, err := ioutil.ReadFile(currentPath)
+ if err != nil {
+ fs.log(fmt.Sprintf("backup CURRENT: %v", err))
+ return err
+ }
+ if string(b) == content {
+ // Content not changed, do nothing.
+ return nil
+ }
+ if err := writeFileSynced(currentPath+".bak", b, 0644); err != nil {
+ fs.log(fmt.Sprintf("backup CURRENT: %v", err))
+ return err
+ }
+ } else if !os.IsNotExist(err) {
+ return err
+ }
+ path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
+ if err := writeFileSynced(path, []byte(content), 0644); err != nil {
+ fs.log(fmt.Sprintf("create CURRENT.%d: %v", fd.Num, err))
+ return err
+ }
+ // Replace CURRENT file.
+ if err := rename(path, currentPath); err != nil {
+ fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
+ return err
+ }
+ // Sync root directory.
+ if err := syncDir(fs.path); err != nil {
+ fs.log(fmt.Sprintf("syncDir: %v", err))
+ return err
+ }
+ return nil
+}
+
+func (fs *fileStorage) SetMeta(fd FileDesc) error {
if !FileDescOk(fd) {
return ErrInvalidFile
}
@@ -223,44 +289,10 @@ func (fs *fileStorage) SetMeta(fd FileDesc) (err error) {
if fs.open < 0 {
return ErrClosed
}
- defer func() {
- if err != nil {
- fs.log(fmt.Sprintf("CURRENT: %v", err))
- }
- }()
- path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
- w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- return
- }
- _, err = fmt.Fprintln(w, fsGenName(fd))
- if err != nil {
- fs.log(fmt.Sprintf("write CURRENT.%d: %v", fd.Num, err))
- return
- }
- if err = w.Sync(); err != nil {
- fs.log(fmt.Sprintf("flush CURRENT.%d: %v", fd.Num, err))
- return
- }
- if err = w.Close(); err != nil {
- fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, err))
- return
- }
- if err != nil {
- return
- }
- if err = rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
- fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
- return
- }
- // Sync root directory.
- if err = syncDir(fs.path); err != nil {
- fs.log(fmt.Sprintf("syncDir: %v", err))
- }
- return
+ return fs.setMeta(fd)
}
-func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
+func (fs *fileStorage) GetMeta() (FileDesc, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
@@ -268,7 +300,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
}
dir, err := os.Open(fs.path)
if err != nil {
- return
+ return FileDesc{}, err
}
names, err := dir.Readdirnames(0)
// Close the dir first before checking for Readdirnames error.
@@ -276,94 +308,134 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
fs.log(fmt.Sprintf("close dir: %v", ce))
}
if err != nil {
- return
- }
- // Find latest CURRENT file.
- var rem []string
- var pend bool
- var cerr error
- for _, name := range names {
- if strings.HasPrefix(name, "CURRENT") {
- pend1 := len(name) > 7
- var pendNum int64
- // Make sure it is valid name for a CURRENT file, otherwise skip it.
- if pend1 {
- if name[7] != '.' || len(name) < 9 {
- fs.log(fmt.Sprintf("skipping %s: invalid file name", name))
- continue
- }
- var e1 error
- if pendNum, e1 = strconv.ParseInt(name[8:], 10, 0); e1 != nil {
- fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", name, e1))
- continue
- }
+ return FileDesc{}, err
+ }
+ // Try this in order:
+ // - CURRENT.[0-9]+ ('pending rename' file, descending order)
+ // - CURRENT
+ // - CURRENT.bak
+ //
+ // Skip corrupted file or file that point to a missing target file.
+ type currentFile struct {
+ name string
+ fd FileDesc
+ }
+ tryCurrent := func(name string) (*currentFile, error) {
+ b, err := ioutil.ReadFile(filepath.Join(fs.path, name))
+ if err != nil {
+ if os.IsNotExist(err) {
+ err = os.ErrNotExist
}
- path := filepath.Join(fs.path, name)
- r, e1 := os.OpenFile(path, os.O_RDONLY, 0)
- if e1 != nil {
- return FileDesc{}, e1
+ return nil, err
+ }
+ var fd FileDesc
+ if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd) {
+ fs.log(fmt.Sprintf("%s: corrupted content: %q", name, b))
+ err := &ErrCorrupted{
+ Err: errors.New("leveldb/storage: corrupted or incomplete CURRENT file"),
}
- b, e1 := ioutil.ReadAll(r)
- if e1 != nil {
- r.Close()
- return FileDesc{}, e1
+ return nil, err
+ }
+ if _, err := os.Stat(filepath.Join(fs.path, fsGenName(fd))); err != nil {
+ if os.IsNotExist(err) {
+ fs.log(fmt.Sprintf("%s: missing target file: %s", name, fd))
+ err = os.ErrNotExist
}
- var fd1 FileDesc
- if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd1) {
- fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", name))
- if pend1 {
- rem = append(rem, name)
- }
- if !pend1 || cerr == nil {
- metaFd, _ := fsParseName(name)
- cerr = &ErrCorrupted{
- Fd: metaFd,
- Err: errors.New("leveldb/storage: corrupted or incomplete meta file"),
- }
- }
- } else if pend1 && pendNum != fd1.Num {
- fs.log(fmt.Sprintf("skipping %s: inconsistent pending-file num: %d vs %d", name, pendNum, fd1.Num))
- rem = append(rem, name)
- } else if fd1.Num < fd.Num {
- fs.log(fmt.Sprintf("skipping %s: obsolete", name))
- if pend1 {
- rem = append(rem, name)
- }
+ return nil, err
+ }
+ return &currentFile{name: name, fd: fd}, nil
+ }
+ tryCurrents := func(names []string) (*currentFile, error) {
+ var (
+ cur *currentFile
+ // Last corruption error.
+ lastCerr error
+ )
+ for _, name := range names {
+ var err error
+ cur, err = tryCurrent(name)
+ if err == nil {
+ break
+ } else if err == os.ErrNotExist {
+ // Fallback to the next file.
+ } else if isCorrupted(err) {
+ lastCerr = err
+ // Fallback to the next file.
} else {
- fd = fd1
- pend = pend1
+ // In case the error is due to permission, etc.
+ return nil, err
}
- if err := r.Close(); err != nil {
- fs.log(fmt.Sprintf("close %s: %v", name, err))
+ }
+ if cur == nil {
+ err := os.ErrNotExist
+ if lastCerr != nil {
+ err = lastCerr
}
+ return nil, err
}
+ return cur, nil
}
- // Don't remove any files if there is no valid CURRENT file.
- if fd.Zero() {
- if cerr != nil {
- err = cerr
- } else {
- err = os.ErrNotExist
+
+ // Try 'pending rename' files.
+ var nums []int64
+ for _, name := range names {
+ if strings.HasPrefix(name, "CURRENT.") && name != "CURRENT.bak" {
+ i, err := strconv.ParseInt(name[8:], 10, 64)
+ if err == nil {
+ nums = append(nums, i)
+ }
}
- return
}
- if !fs.readOnly {
- // Rename pending CURRENT file to an effective CURRENT.
- if pend {
- path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
- if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
- fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", fd.Num, err))
- }
+ var (
+ pendCur *currentFile
+ pendErr = os.ErrNotExist
+ pendNames []string
+ )
+ if len(nums) > 0 {
+ sort.Sort(sort.Reverse(int64Slice(nums)))
+ pendNames = make([]string, len(nums))
+ for i, num := range nums {
+ pendNames[i] = fmt.Sprintf("CURRENT.%d", num)
}
- // Remove obsolete or incomplete pending CURRENT files.
- for _, name := range rem {
- path := filepath.Join(fs.path, name)
- if err := os.Remove(path); err != nil {
- fs.log(fmt.Sprintf("remove %s: %v", name, err))
+ pendCur, pendErr = tryCurrents(pendNames)
+ if pendErr != nil && pendErr != os.ErrNotExist && !isCorrupted(pendErr) {
+ return FileDesc{}, pendErr
+ }
+ }
+
+ // Try CURRENT and CURRENT.bak.
+ curCur, curErr := tryCurrents([]string{"CURRENT", "CURRENT.bak"})
+ if curErr != nil && curErr != os.ErrNotExist && !isCorrupted(curErr) {
+ return FileDesc{}, curErr
+ }
+
+ // pendCur takes precedence, but guards against obsolete pendCur.
+ if pendCur != nil && (curCur == nil || pendCur.fd.Num > curCur.fd.Num) {
+ curCur = pendCur
+ }
+
+ if curCur != nil {
+ // Restore CURRENT file to proper state.
+ if !fs.readOnly && (curCur.name != "CURRENT" || len(pendNames) != 0) {
+ // Ignore setMeta errors, however don't delete obsolete files if we
+ // catch error.
+ if err := fs.setMeta(curCur.fd); err == nil {
+ // Remove 'pending rename' files.
+ for _, name := range pendNames {
+ if err := os.Remove(filepath.Join(fs.path, name)); err != nil {
+ fs.log(fmt.Sprintf("remove %s: %v", name, err))
+ }
+ }
}
}
+ return curCur.fd, nil
}
- return
+
+ // Nothing found.
+ if isCorrupted(pendErr) {
+ return FileDesc{}, pendErr
+ }
+ return FileDesc{}, curErr
}
func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) {
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go
index bab62bf..b829798 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go
@@ -8,7 +8,6 @@ package storage
import (
"os"
- "path/filepath"
)
type plan9FileLock struct {
@@ -48,8 +47,7 @@ func rename(oldpath, newpath string) error {
}
}
- _, fname := filepath.Split(newpath)
- return os.Rename(oldpath, fname)
+ return os.Rename(oldpath, newpath)
}
func syncDir(name string) error {
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go
index 7e29915..d75f66a 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go
@@ -67,13 +67,25 @@ func isErrInvalid(err error) bool {
if err == os.ErrInvalid {
return true
}
+ // Go < 1.8
if syserr, ok := err.(*os.SyscallError); ok && syserr.Err == syscall.EINVAL {
return true
}
+ // Go >= 1.8 returns *os.PathError instead
+ if patherr, ok := err.(*os.PathError); ok && patherr.Err == syscall.EINVAL {
+ return true
+ }
return false
}
func syncDir(name string) error {
+ // As per fsync manpage, Linux seems to expect fsync on directory, however
+ // some system don't support this, so we will ignore syscall.EINVAL.
+ //
+ // From fsync(2):
+ // Calling fsync() does not necessarily ensure that the entry in the
+ // directory containing the file has also reached disk. For that an
+ // explicit fsync() on a file descriptor for the directory is also needed.
f, err := os.Open(name)
if err != nil {
return err
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
index 9b0421f..838f1be 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
@@ -12,7 +12,11 @@ import (
"sync"
)
-const typeShift = 3
+const typeShift = 4
+
+// Verify at compile-time that typeShift is large enough to cover all FileType
+// values by confirming that 0 == 0.
+var _ [0]struct{} = [TypeAll >> typeShift]struct{}{}
type memStorageLock struct {
ms *memStorage
@@ -143,7 +147,7 @@ func (ms *memStorage) Remove(fd FileDesc) error {
}
func (ms *memStorage) Rename(oldfd, newfd FileDesc) error {
- if FileDescOk(oldfd) || FileDescOk(newfd) {
+ if !FileDescOk(oldfd) || !FileDescOk(newfd) {
return ErrInvalidFile
}
if oldfd == newfd {
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go
index c16bce6..4e4a724 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go
@@ -55,6 +55,14 @@ type ErrCorrupted struct {
Err error
}
+func isCorrupted(err error) bool {
+ switch err.(type) {
+ case *ErrCorrupted:
+ return true
+ }
+ return false
+}
+
func (e *ErrCorrupted) Error() string {
if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go
index 81d18a5..adf773f 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go
@@ -451,7 +451,7 @@ func newTableOps(s *session) *tOps {
if !s.o.GetDisableBlockCache() {
var bcacher cache.Cacher
if s.o.GetBlockCacheCapacity() > 0 {
- bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity())
+ bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
}
bcache = cache.NewCache(bcacher)
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/util.go b/vendor/github.com/syndtr/goleveldb/leveldb/util.go
index e572a32..0e2b519 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/util.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/util.go
@@ -20,7 +20,7 @@ func shorten(str string) string {
return str[:3] + ".." + str[len(str)-3:]
}
-var bunits = [...]string{"", "Ki", "Mi", "Gi"}
+var bunits = [...]string{"", "Ki", "Mi", "Gi", "Ti"}
func shortenb(bytes int) string {
i := 0