aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Borg <jakob@nym.se>2014-09-02 09:43:42 +0200
committerJakob Borg <jakob@nym.se>2014-09-02 09:44:07 +0200
commitf633bdddf0c5ffbfe954b2d409fd121051c80eb7 (patch)
treea930a9f9f4fe169e526e445b30e049edf8ef02c1
parentde0b91d1570ae5a6a3d1d26f725e07a6b82d59cf (diff)
downloadsyncthing-f633bdddf0c5ffbfe954b2d409fd121051c80eb7.tar.gz
syncthing-f633bdddf0c5ffbfe954b2d409fd121051c80eb7.zip
Update goleveldb
-rw-r--r--Godeps/Godeps.json2
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go11
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go38
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go23
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go6
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go11
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go9
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go70
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go10
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go14
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go17
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go26
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go2
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go217
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go4
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go (renamed from Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go)103
16 files changed, 384 insertions, 179 deletions
diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json
index 065c61690..f8368e416 100644
--- a/Godeps/Godeps.json
+++ b/Godeps/Godeps.json
@@ -49,7 +49,7 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
- "Rev": "59d87758aeaab5ab6ed289c773349500228a1557"
+ "Rev": "2b99e8d4757bf06eeab1b0485d80b8ae1c088874"
},
{
"ImportPath": "github.com/vitrun/qart/coding",
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
index fe398f03a..49f82f0fb 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
@@ -40,10 +40,21 @@ type Cache interface {
// Size returns entire alive cache objects size.
Size() int
+ // NumObjects returns number of alive objects.
+ NumObjects() int
+
// GetNamespace gets cache namespace with the given id.
// GetNamespace is never return nil.
GetNamespace(id uint64) Namespace
+ // PurgeNamespace purges cache namespace with the given id from this cache tree.
+ // Also read Namespace.Purge.
+ PurgeNamespace(id uint64, fin PurgeFin)
+
+ // ZapNamespace detaches cache namespace with the given id from this cache tree.
+ // Also read Namespace.Zap.
+ ZapNamespace(id uint64)
+
// Purge purges all cache namespace from this cache tree.
// This is behave the same as calling Namespace.Purge method on all cache namespace.
Purge(fin PurgeFin)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go
index a1504b159..a29b5d088 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go
@@ -15,11 +15,11 @@ import (
// lruCache represent a LRU cache state.
type lruCache struct {
- mu sync.Mutex
- recent lruNode
- table map[uint64]*lruNs
- capacity int
- used, size int
+ mu sync.Mutex
+ recent lruNode
+ table map[uint64]*lruNs
+ capacity int
+ used, size, alive int
}
// NewLRUCache creates a new initialized LRU cache with the given capacity.
@@ -51,6 +51,12 @@ func (c *lruCache) Size() int {
return c.size
}
+func (c *lruCache) NumObjects() int {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return c.alive
+}
+
// SetCapacity set cache capacity.
func (c *lruCache) SetCapacity(capacity int) {
c.mu.Lock()
@@ -77,6 +83,23 @@ func (c *lruCache) GetNamespace(id uint64) Namespace {
return ns
}
+func (c *lruCache) ZapNamespace(id uint64) {
+ c.mu.Lock()
+ if ns, exist := c.table[id]; exist {
+ ns.zapNB()
+ delete(c.table, id)
+ }
+ c.mu.Unlock()
+}
+
+func (c *lruCache) PurgeNamespace(id uint64, fin PurgeFin) {
+ c.mu.Lock()
+ if ns, exist := c.table[id]; exist {
+ ns.purgeNB(fin)
+ }
+ c.mu.Unlock()
+}
+
// Purge purge entire cache.
func (c *lruCache) Purge(fin PurgeFin) {
c.mu.Lock()
@@ -158,11 +181,12 @@ func (ns *lruNs) Get(key uint64, setf SetFunc) Handle {
}
ns.table[key] = node
+ ns.lru.size += charge
+ ns.lru.alive++
if charge > 0 {
node.ref++
node.rInsert(&ns.lru.recent)
ns.lru.used += charge
- ns.lru.size += charge
ns.lru.evict()
}
}
@@ -322,8 +346,10 @@ func (n *lruNode) derefNB() {
// Remove elemement.
delete(n.ns.table, n.key)
n.ns.lru.size -= n.charge
+ n.ns.lru.alive--
n.fin()
}
+ n.value = nil
} else if n.ref < 0 {
panic("leveldb/cache: lruCache: negative node reference")
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
index 59b9017d3..979d0ac4a 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
@@ -14,6 +14,7 @@ import (
"runtime"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
@@ -35,7 +36,7 @@ type DB struct {
// MemDB.
memMu sync.RWMutex
- memPool *util.Pool
+ memPool chan *memdb.DB
mem, frozenMem *memDB
journal *journal.Writer
journalWriter storage.Writer
@@ -47,6 +48,9 @@ type DB struct {
snapsMu sync.Mutex
snapsRoot snapshotElement
+ // Stats.
+ aliveSnaps, aliveIters int32
+
// Write.
writeC chan *Batch
writeMergedC chan bool
@@ -80,7 +84,7 @@ func openDB(s *session) (*DB, error) {
// Initial sequence
seq: s.stSeq,
// MemDB
- memPool: util.NewPool(1),
+ memPool: make(chan *memdb.DB, 1),
// Write
writeC: make(chan *Batch),
writeMergedC: make(chan bool),
@@ -122,6 +126,7 @@ func openDB(s *session) (*DB, error) {
go db.tCompaction()
go db.mCompaction()
go db.jWriter()
+ go db.mpoolDrain()
s.logf("db@open done T·%v", time.Since(start))
@@ -568,7 +573,7 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
}
defer m.decref()
- mk, mv, me := m.db.Find(ikey)
+ mk, mv, me := m.mdb.Find(ikey)
if me == nil {
ukey, _, t, ok := parseIkey(mk)
if ok && db.s.icmp.uCompare(ukey, key) == 0 {
@@ -657,6 +662,14 @@ func (db *DB) GetSnapshot() (*Snapshot, error) {
// Returns sstables list for each level.
// leveldb.blockpool
// Returns block pool stats.
+// leveldb.cachedblock
+// Returns size of cached block.
+// leveldb.openedtables
+// Returns number of opened tables.
+// leveldb.alivesnaps
+// Returns number of alive snapshots.
+// leveldb.aliveiters
+// Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
err = db.ok()
if err != nil {
@@ -712,6 +725,10 @@ func (db *DB) GetProperty(name string) (value string, err error) {
}
case p == "openedtables":
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
+ case p == "alivesnaps":
+ value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
+ case p == "aliveiters":
+ value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
default:
err = errors.New("leveldb: GetProperty: unknown property: " + name)
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index e5e74d7e1..b38cdedc5 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -221,10 +221,10 @@ func (db *DB) memCompaction() {
c := newCMem(db.s)
stats := new(cStatsStaging)
- db.logf("mem@flush N·%d S·%s", mem.db.Len(), shortenb(mem.db.Size()))
+ db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
// Don't compact empty memdb.
- if mem.db.Len() == 0 {
+ if mem.mdb.Len() == 0 {
db.logf("mem@flush skipping")
// drop frozen mem
db.dropFrozenMem()
@@ -242,7 +242,7 @@ func (db *DB) memCompaction() {
db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
- return c.flush(mem.db, -1)
+ return c.flush(mem.mdb, -1)
}, func() error {
for _, r := range c.rec.addedTables {
db.logf("mem@flush rollback @%d", r.num)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
index d028768d5..c34c7abae 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
@@ -10,6 +10,7 @@ import (
"errors"
"runtime"
"sync"
+ "sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -38,11 +39,11 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
ti := v.getIterators(slice, ro)
n := len(ti) + 2
i := make([]iterator.Iterator, 0, n)
- emi := em.db.NewIterator(slice)
+ emi := em.mdb.NewIterator(slice)
emi.SetReleaser(&memdbReleaser{m: em})
i = append(i, emi)
if fm != nil {
- fmi := fm.db.NewIterator(slice)
+ fmi := fm.mdb.NewIterator(slice)
fmi.SetReleaser(&memdbReleaser{m: fm})
i = append(i, fmi)
}
@@ -66,6 +67,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
}
rawIter := db.newRawIterator(islice, ro)
iter := &dbIter{
+ db: db,
icmp: db.s.icmp,
iter: rawIter,
seq: seq,
@@ -73,6 +75,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
key: make([]byte, 0),
value: make([]byte, 0),
}
+ atomic.AddInt32(&db.aliveIters, 1)
runtime.SetFinalizer(iter, (*dbIter).Release)
return iter
}
@@ -89,6 +92,7 @@ const (
// dbIter represent an interator states over a database session.
type dbIter struct {
+ db *DB
icmp *iComparer
iter iterator.Iterator
seq uint64
@@ -303,6 +307,7 @@ func (i *dbIter) Release() {
if i.releaser != nil {
i.releaser.Release()
+ i.releaser = nil
}
i.dir = dirReleased
@@ -310,6 +315,8 @@ func (i *dbIter) Release() {
i.value = nil
i.iter.Release()
i.iter = nil
+ atomic.AddInt32(&i.db.aliveIters, -1)
+ i.db = nil
}
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
index 31340bdd0..fb1ce85b9 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
@@ -9,6 +9,7 @@ package leveldb
import (
"runtime"
"sync"
+ "sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -81,7 +82,7 @@ func (db *DB) minSeq() uint64 {
type Snapshot struct {
db *DB
elem *snapshotElement
- mu sync.Mutex
+ mu sync.RWMutex
released bool
}
@@ -91,6 +92,7 @@ func (db *DB) newSnapshot() *Snapshot {
db: db,
elem: db.acquireSnapshot(),
}
+ atomic.AddInt32(&db.aliveSnaps, 1)
runtime.SetFinalizer(snap, (*Snapshot).Release)
return snap
}
@@ -105,8 +107,8 @@ func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err er
if err != nil {
return
}
- snap.mu.Lock()
- defer snap.mu.Unlock()
+ snap.mu.RLock()
+ defer snap.mu.RUnlock()
if snap.released {
err = ErrSnapshotReleased
return
@@ -160,6 +162,7 @@ func (snap *Snapshot) Release() {
snap.released = true
snap.db.releaseSnapshot(snap.elem)
+ atomic.AddInt32(&snap.db.aliveSnaps, -1)
snap.db = nil
snap.elem = nil
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
index 807cc6c7a..24ecab504 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
@@ -8,16 +8,16 @@ package leveldb
import (
"sync/atomic"
+ "time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/memdb"
- "github.com/syndtr/goleveldb/leveldb/util"
)
type memDB struct {
- pool *util.Pool
- db *memdb.DB
- ref int32
+ db *DB
+ mdb *memdb.DB
+ ref int32
}
func (m *memDB) incref() {
@@ -26,7 +26,13 @@ func (m *memDB) incref() {
func (m *memDB) decref() {
if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
- m.pool.Put(m)
+ // Only put back memdb with std capacity.
+ if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() {
+ m.mdb.Reset()
+ m.db.mpoolPut(m.mdb)
+ }
+ m.db = nil
+ m.mdb = nil
} else if ref < 0 {
panic("negative memdb ref")
}
@@ -42,6 +48,41 @@ func (db *DB) addSeq(delta uint64) {
atomic.AddUint64(&db.seq, delta)
}
+func (db *DB) mpoolPut(mem *memdb.DB) {
+ defer func() {
+ recover()
+ }()
+ select {
+ case db.memPool <- mem:
+ default:
+ }
+}
+
+func (db *DB) mpoolGet() *memdb.DB {
+ select {
+ case mem := <-db.memPool:
+ return mem
+ default:
+ return nil
+ }
+}
+
+func (db *DB) mpoolDrain() {
+ ticker := time.NewTicker(30 * time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ select {
+ case <-db.memPool:
+ default:
+ }
+ case _, _ = <-db.closeC:
+ close(db.memPool)
+ return
+ }
+ }
+}
+
// Create new memdb and froze the old one; need external synchronization.
// newMem only called synchronously by the writer.
func (db *DB) newMem(n int) (mem *memDB, err error) {
@@ -70,18 +111,15 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
db.journalWriter = w
db.journalFile = file
db.frozenMem = db.mem
- mem, ok := db.memPool.Get().(*memDB)
- if ok && mem.db.Capacity() >= n {
- mem.db.Reset()
- mem.incref()
- } else {
- mem = &memDB{
- pool: db.memPool,
- db: memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)),
- ref: 1,
- }
+ mdb := db.mpoolGet()
+ if mdb == nil || mdb.Capacity() < n {
+ mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
+ }
+ mem = &memDB{
+ db: db,
+ mdb: mdb,
+ ref: 2,
}
- mem.incref()
db.mem = mem
// The seq only incremented by the writer. And whoever called newMem
// should hold write lock, so no need additional synchronization here.
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go
index 7f15b4b65..5aadc12d7 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go
@@ -1577,7 +1577,11 @@ func TestDb_BloomFilter(t *testing.T) {
return fmt.Sprintf("key%06d", i)
}
- n := 10000
+ const (
+ n = 10000
+ indexOverheat = 19898
+ filterOverheat = 19799
+ )
// Populate multiple layers
for i := 0; i < n; i++ {
@@ -1601,7 +1605,7 @@ func TestDb_BloomFilter(t *testing.T) {
cnt := int(h.stor.ReadCounter())
t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
- if min, max := n, n+2*n/100; cnt < min || cnt > max {
+ if min, max := n+indexOverheat+filterOverheat, n+indexOverheat+filterOverheat+2*n/100; cnt < min || cnt > max {
t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
}
@@ -1612,7 +1616,7 @@ func TestDb_BloomFilter(t *testing.T) {
}
cnt = int(h.stor.ReadCounter())
t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
- if max := 3 * n / 100; cnt > max {
+ if max := 3*n/100 + indexOverheat + filterOverheat; cnt > max {
t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
index 755108590..82725a9ee 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -75,7 +75,7 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
mem = nil
}
}()
- nn = mem.db.Free()
+ nn = mem.mdb.Free()
switch {
case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed:
delayed = true
@@ -90,13 +90,13 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
}
default:
// Allow memdb to grow if it has no entry.
- if mem.db.Len() == 0 {
+ if mem.mdb.Len() == 0 {
nn = n
} else {
mem.decref()
mem, err = db.rotateMem(n)
if err == nil {
- nn = mem.db.Free()
+ nn = mem.mdb.Free()
} else {
nn = 0
}
@@ -190,7 +190,7 @@ drain:
return
case db.journalC <- b:
// Write into memdb
- b.memReplay(mem.db)
+ b.memReplay(mem.mdb)
}
// Wait for journal writer
select {
@@ -200,7 +200,7 @@ drain:
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
- b.revertMemReplay(mem.db)
+ b.revertMemReplay(mem.mdb)
return
}
}
@@ -209,7 +209,7 @@ drain:
if err != nil {
return
}
- b.memReplay(mem.db)
+ b.memReplay(mem.mdb)
}
// Set last seq number.
@@ -271,7 +271,7 @@ func (db *DB) CompactRange(r util.Range) error {
// Check for overlaps in memdb.
mem := db.getEffectiveMem()
defer mem.decref()
- if isMemOverlaps(db.s.icmp, mem.db, r.Start, r.Limit) {
+ if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) {
// Memdb compaction.
if _, err := db.rotateMem(0); err != nil {
<-db.writeLockC
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
index 241184481..21e7a8a7f 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
@@ -30,13 +30,16 @@ const (
type noCache struct{}
-func (noCache) SetCapacity(capacity int) {}
-func (noCache) Capacity() int { return 0 }
-func (noCache) Used() int { return 0 }
-func (noCache) Size() int { return 0 }
-func (noCache) GetNamespace(id uint64) cache.Namespace { return nil }
-func (noCache) Purge(fin cache.PurgeFin) {}
-func (noCache) Zap() {}
+func (noCache) SetCapacity(capacity int) {}
+func (noCache) Capacity() int { return 0 }
+func (noCache) Used() int { return 0 }
+func (noCache) Size() int { return 0 }
+func (noCache) NumObjects() int { return 0 }
+func (noCache) GetNamespace(id uint64) cache.Namespace { return nil }
+func (noCache) PurgeNamespace(id uint64, fin cache.PurgeFin) {}
+func (noCache) ZapNamespace(id uint64) {}
+func (noCache) Purge(fin cache.PurgeFin) {}
+func (noCache) Zap() {}
var NoCache cache.Cache = noCache{}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
index 1c3ff3249..a1b04d827 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
@@ -7,7 +7,6 @@
package leveldb
import (
- "io"
"sort"
"sync/atomic"
@@ -323,15 +322,6 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
return
}
-type tableWrapper struct {
- *table.Reader
- closer io.Closer
-}
-
-func (tr tableWrapper) Release() {
- tr.closer.Close()
-}
-
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
@@ -347,7 +337,7 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
if bc := t.s.o.GetBlockCache(); bc != nil {
bcacheNS = bc.GetNamespace(num)
}
- return 1, tableWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), r}
+ return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o)
})
if ch == nil && err == nil {
err = ErrClosed
@@ -363,7 +353,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b
return nil, nil, err
}
defer ch.Release()
- return ch.Value().(tableWrapper).Find(key, ro)
+ return ch.Value().(*table.Reader).Find(key, ro)
}
// Returns approximate offset of the given key.
@@ -372,10 +362,9 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
if err != nil {
return
}
- _offset, err := ch.Value().(tableWrapper).OffsetOf(key)
- offset = uint64(_offset)
- ch.Release()
- return
+ defer ch.Release()
+ offset_, err := ch.Value().(*table.Reader).OffsetOf(key)
+ return uint64(offset_), err
}
// Creates an iterator from the given table.
@@ -384,7 +373,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
if err != nil {
return iterator.NewEmptyIterator(err)
}
- iter := ch.Value().(tableWrapper).NewIterator(slice, ro)
+ iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
iter.SetReleaser(ch)
return iter
}
@@ -401,7 +390,7 @@ func (t *tOps) remove(f *tFile) {
t.s.logf("table@remove removed @%d", num)
}
if bc := t.s.o.GetBlockCache(); bc != nil {
- bc.GetNamespace(num).Zap()
+ bc.ZapNamespace(num)
}
}
})
@@ -411,6 +400,7 @@ func (t *tOps) remove(f *tFile) {
// regadless still used or not.
func (t *tOps) close() {
t.cache.Zap()
+ t.bpool.Close()
}
// Creates new initialized table ops instance.
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go
index ca598f4f5..9032751b9 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go
@@ -40,7 +40,7 @@ var _ = testutil.Defer(func() {
data := bw.buf.Bytes()
restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
return &block{
- cmp: comparer.DefaultComparer,
+ tr: &Reader{cmp: comparer.DefaultComparer},
data: data,
restartsLen: restartsLen,
restartsOffset: len(data) - (restartsLen+1)*4,
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
index f397ac4f8..5ec2b3e53 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
@@ -37,8 +37,7 @@ func max(x, y int) int {
}
type block struct {
- bpool *util.BufferPool
- cmp comparer.BasicComparer
+ tr *Reader
data []byte
restartsLen int
restartsOffset int
@@ -47,31 +46,25 @@ type block struct {
}
func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) {
- n := b.restartsOffset
- data := b.data
- cmp := b.cmp
-
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
- offset := int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):]))
- offset += 1 // shared always zero, since this is a restart point
- v1, n1 := binary.Uvarint(data[offset:]) // key length
- _, n2 := binary.Uvarint(data[offset+n1:]) // value length
+ offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
+ offset += 1 // shared always zero, since this is a restart point
+ v1, n1 := binary.Uvarint(b.data[offset:]) // key length
+ _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
m := offset + n1 + n2
- return cmp.Compare(data[m:m+int(v1)], key) > 0
+ return b.tr.cmp.Compare(b.data[m:m+int(v1)], key) > 0
}) + rstart - 1
if index < rstart {
// The smallest key is greater-than key sought.
index = rstart
}
- offset = int(binary.LittleEndian.Uint32(data[n+4*index:]))
+ offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
return
}
func (b *block) restartIndex(rstart, rlimit, offset int) int {
- n := b.restartsOffset
- data := b.data
return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
- return int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) > offset
+ return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
}) + rstart - 1
}
@@ -141,10 +134,10 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas
}
func (b *block) Release() {
- if b.bpool != nil {
- b.bpool.Put(b.data)
- b.bpool = nil
+ if b.tr.bpool != nil {
+ b.tr.bpool.Put(b.data)
}
+ b.tr = nil
b.data = nil
}
@@ -270,7 +263,7 @@ func (i *blockIter) Seek(key []byte) bool {
i.dir = dirForward
}
for i.Next() {
- if i.block.cmp.Compare(i.key, key) >= 0 {
+ if i.block.tr.cmp.Compare(i.key, key) >= 0 {
return true
}
}
@@ -479,7 +472,7 @@ func (i *blockIter) Error() error {
}
type filterBlock struct {
- filter filter.Filter
+ tr *Reader
data []byte
oOffset int
baseLg uint
@@ -493,7 +486,7 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
n := int(binary.LittleEndian.Uint32(o))
m := int(binary.LittleEndian.Uint32(o[4:]))
if n < m && m <= b.oOffset {
- return b.filter.Contains(b.data[n:m], key)
+ return b.tr.filter.Contains(b.data[n:m], key)
} else if n == m {
return false
}
@@ -501,10 +494,17 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
return true
}
+func (b *filterBlock) Release() {
+ if b.tr.bpool != nil {
+ b.tr.bpool.Put(b.data)
+ }
+ b.tr = nil
+ b.data = nil
+}
+
type indexIter struct {
- blockIter
- tableReader *Reader
- slice *util.Range
+ *blockIter
+ slice *util.Range
// Options
checksum bool
fillCache bool
@@ -523,7 +523,7 @@ func (i *indexIter) Get() iterator.Iterator {
if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
slice = i.slice
}
- return i.tableReader.getDataIter(dataBH, slice, i.checksum, i.fillCache)
+ return i.blockIter.block.tr.getDataIter(dataBH, slice, i.checksum, i.fillCache)
}
// Reader is a table reader.
@@ -538,9 +538,8 @@ type Reader struct {
checksum bool
strictIter bool
- dataEnd int64
- indexBlock *block
- filterBlock *filterBlock
+ dataEnd int64
+ indexBH, filterBH blockHandle
}
func verifyChecksum(data []byte) bool {
@@ -557,6 +556,7 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) {
}
if checksum || r.checksum {
if !verifyChecksum(data) {
+ r.bpool.Put(data)
return nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")
}
}
@@ -575,6 +575,7 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) {
return nil, err
}
default:
+ r.bpool.Put(data)
return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length])
}
return data, nil
@@ -587,7 +588,7 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) {
}
restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
b := &block{
- cmp: r.cmp,
+ tr: r,
data: data,
restartsLen: restartsLen,
restartsOffset: len(data) - (restartsLen+1)*4,
@@ -596,7 +597,44 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) {
return b, nil
}
-func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterBlock, error) {
+func (r *Reader) readBlockCached(bh blockHandle, checksum, fillCache bool) (*block, util.Releaser, error) {
+ if r.cache != nil {
+ var err error
+ ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) {
+ if !fillCache {
+ return 0, nil
+ }
+ var b *block
+ b, err = r.readBlock(bh, checksum)
+ if err != nil {
+ return 0, nil
+ }
+ return cap(b.data), b
+ })
+ if ch != nil {
+ b, ok := ch.Value().(*block)
+ if !ok {
+ ch.Release()
+ return nil, nil, errors.New("leveldb/table: Reader: inconsistent block type")
+ }
+ if !b.checksum && (r.checksum || checksum) {
+ if !verifyChecksum(b.data) {
+ ch.Release()
+ return nil, nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")
+ }
+ b.checksum = true
+ }
+ return b, ch, err
+ } else if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ b, err := r.readBlock(bh, checksum)
+ return b, b, err
+}
+
+func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
data, err := r.readRawBlock(bh, true)
if err != nil {
return nil, err
@@ -611,7 +649,7 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB
return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)")
}
b := &filterBlock{
- filter: filter,
+ tr: r,
data: data,
oOffset: oOffset,
baseLg: uint(data[n-1]),
@@ -620,42 +658,42 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB
return b, nil
}
-func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
+func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
if r.cache != nil {
- // Get/set block cache.
var err error
- cache := r.cache.Get(dataBH.offset, func() (charge int, value interface{}) {
+ ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) {
if !fillCache {
return 0, nil
}
- var dataBlock *block
- dataBlock, err = r.readBlock(dataBH, checksum)
+ var b *filterBlock
+ b, err = r.readFilterBlock(bh)
if err != nil {
return 0, nil
}
- return int(dataBH.length), dataBlock
+ return cap(b.data), b
})
- if err != nil {
- return iterator.NewEmptyIterator(err)
- }
- if cache != nil {
- dataBlock := cache.Value().(*block)
- if !dataBlock.checksum && (r.checksum || checksum) {
- if !verifyChecksum(dataBlock.data) {
- return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid block (checksum mismatch)"))
- }
- dataBlock.checksum = true
+ if ch != nil {
+ b, ok := ch.Value().(*filterBlock)
+ if !ok {
+ ch.Release()
+ return nil, nil, errors.New("leveldb/table: Reader: inconsistent block type")
}
- iter := dataBlock.newIterator(slice, false, cache)
- return iter
+ return b, ch, err
+ } else if err != nil {
+ return nil, nil, err
}
}
- dataBlock, err := r.readBlock(dataBH, checksum)
+
+ b, err := r.readFilterBlock(bh)
+ return b, b, err
+}
+
+func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
+ b, rel, err := r.readBlockCached(dataBH, checksum, fillCache)
if err != nil {
return iterator.NewEmptyIterator(err)
}
- iter := dataBlock.newIterator(slice, false, dataBlock)
- return iter
+ return b.newIterator(slice, false, rel)
}
// NewIterator creates an iterator from the table.
@@ -669,18 +707,21 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
// when not used.
//
// Also read Iterator documentation of the leveldb/iterator package.
-
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
if r.err != nil {
return iterator.NewEmptyIterator(r.err)
}
+ fillCache := !ro.GetDontFillCache()
+ b, rel, err := r.readBlockCached(r.indexBH, true, fillCache)
+ if err != nil {
+ return iterator.NewEmptyIterator(err)
+ }
index := &indexIter{
- blockIter: *r.indexBlock.newIterator(slice, true, nil),
- tableReader: r,
- slice: slice,
- checksum: ro.GetStrict(opt.StrictBlockChecksum),
- fillCache: !ro.GetDontFillCache(),
+ blockIter: b.newIterator(slice, true, rel),
+ slice: slice,
+ checksum: ro.GetStrict(opt.StrictBlockChecksum),
+ fillCache: !ro.GetDontFillCache(),
}
return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false)
}
@@ -697,7 +738,13 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
return
}
- index := r.indexBlock.newIterator(nil, true, nil)
+ indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
+ if err != nil {
+ return
+ }
+ defer rel.Release()
+
+ index := indexBlock.newIterator(nil, true, nil)
defer index.Release()
if !index.Seek(key) {
err = index.Error()
@@ -711,9 +758,15 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)")
return
}
- if r.filterBlock != nil && !r.filterBlock.contains(dataBH.offset, key) {
- err = ErrNotFound
- return
+ if r.filter != nil {
+ filterBlock, rel, ferr := r.readFilterBlockCached(r.filterBH, true)
+ if ferr == nil {
+ if !filterBlock.contains(dataBH.offset, key) {
+ rel.Release()
+ return nil, nil, ErrNotFound
+ }
+ rel.Release()
+ }
}
data := r.getDataIter(dataBH, nil, ro.GetStrict(opt.StrictBlockChecksum), !ro.GetDontFillCache())
defer data.Release()
@@ -760,7 +813,13 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
return
}
- index := r.indexBlock.newIterator(nil, true, nil)
+ indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
+ if err != nil {
+ return
+ }
+ defer rel.Release()
+
+ index := indexBlock.newIterator(nil, true, nil)
defer index.Release()
if index.Seek(key) {
dataBH, n := decodeBlockHandle(index.Value())
@@ -778,6 +837,17 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
return
}
+// Release implements util.Releaser.
+// It also close the file if it is an io.Closer.
+func (r *Reader) Release() {
+ if closer, ok := r.reader.(io.Closer); ok {
+ closer.Close()
+ }
+ r.reader = nil
+ r.cache = nil
+ r.bpool = nil
+}
+
// NewReader creates a new initialized table reader for the file.
// The cache and bpool is optional and can be nil.
//
@@ -817,16 +887,11 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf
return r
}
// Decode the index block handle.
- indexBH, n := decodeBlockHandle(footer[n:])
+ r.indexBH, n = decodeBlockHandle(footer[n:])
if n == 0 {
r.err = errors.New("leveldb/table: Reader: invalid table (bad index block handle)")
return r
}
- // Read index block.
- r.indexBlock, r.err = r.readBlock(indexBH, true)
- if r.err != nil {
- return r
- }
// Read metaindex block.
metaBlock, err := r.readBlock(metaBH, true)
if err != nil {
@@ -842,32 +907,28 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf
continue
}
fn := key[7:]
- var filter filter.Filter
if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
- filter = f0
+ r.filter = f0
} else {
for _, f0 := range o.GetAltFilters() {
if f0.Name() == fn {
- filter = f0
+ r.filter = f0
break
}
}
}
- if filter != nil {
+ if r.filter != nil {
filterBH, n := decodeBlockHandle(metaIter.Value())
if n == 0 {
continue
}
+ r.filterBH = filterBH
// Update data end.
r.dataEnd = int64(filterBH.offset)
- filterBlock, err := r.readFilterBlock(filterBH, filter)
- if err != nil {
- continue
- }
- r.filterBlock = filterBlock
break
}
}
metaIter.Release()
+ metaBlock.Release()
return r
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go
index 0751cf529..fda541fde 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go
@@ -111,7 +111,9 @@ var _ = testutil.Defer(func() {
testutil.AllKeyValueTesting(nil, Build)
Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 10, 512, 512), func(r *Reader) {
It("should have correct blocks number", func() {
- Expect(r.indexBlock.restartsLen).Should(Equal(9))
+ indexBlock, err := r.readBlock(r.indexBH, true)
+ Expect(err).To(BeNil())
+ Expect(indexBlock.restartsLen).Should(Equal(9))
})
}))
})
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go
index 957f953b5..554e28ebd 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go
@@ -19,15 +19,21 @@ type buffer struct {
// BufferPool is a 'buffer pool'.
type BufferPool struct {
- pool [4]chan []byte
- size [3]uint32
- sizeMiss [3]uint32
- baseline0 int
- baseline1 int
- baseline2 int
+ pool [6]chan []byte
+ size [5]uint32
+ sizeMiss [5]uint32
+ sizeHalf [5]uint32
+ baseline [4]int
+ baselinex0 int
+ baselinex1 int
+ baseline0 int
+ baseline1 int
+ baseline2 int
+ close chan struct{}
get uint32
put uint32
+ half uint32
less uint32
equal uint32
greater uint32
@@ -35,16 +41,15 @@ type BufferPool struct {
}
func (p *BufferPool) poolNum(n int) int {
- switch {
- case n <= p.baseline0:
+ if n <= p.baseline0 && n > p.baseline0/2 {
return 0
- case n <= p.baseline1:
- return 1
- case n <= p.baseline2:
- return 2
- default:
- return 3
}
+ for i, x := range p.baseline {
+ if n <= x {
+ return i + 1
+ }
+ }
+ return len(p.baseline) + 1
}
// Get returns buffer with length of n.
@@ -59,13 +64,22 @@ func (p *BufferPool) Get(n int) []byte {
case b := <-pool:
switch {
case cap(b) > n:
- atomic.AddUint32(&p.less, 1)
- return b[:n]
+ if cap(b)-n >= n {
+ atomic.AddUint32(&p.half, 1)
+ select {
+ case pool <- b:
+ default:
+ }
+ return make([]byte, n)
+ } else {
+ atomic.AddUint32(&p.less, 1)
+ return b[:n]
+ }
case cap(b) == n:
atomic.AddUint32(&p.equal, 1)
return b[:n]
default:
- panic("not reached")
+ atomic.AddUint32(&p.greater, 1)
}
default:
atomic.AddUint32(&p.miss, 1)
@@ -79,8 +93,23 @@ func (p *BufferPool) Get(n int) []byte {
case b := <-pool:
switch {
case cap(b) > n:
- atomic.AddUint32(&p.less, 1)
- return b[:n]
+ if cap(b)-n >= n {
+ atomic.AddUint32(&p.half, 1)
+ sizeHalfPtr := &p.sizeHalf[poolNum-1]
+ if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
+ atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
+ atomic.StoreUint32(sizeHalfPtr, 0)
+ } else {
+ select {
+ case pool <- b:
+ default:
+ }
+ }
+ return make([]byte, n)
+ } else {
+ atomic.AddUint32(&p.less, 1)
+ return b[:n]
+ }
case cap(b) == n:
atomic.AddUint32(&p.equal, 1)
return b[:n]
@@ -126,20 +155,34 @@ func (p *BufferPool) Put(b []byte) {
}
+func (p *BufferPool) Close() {
+ select {
+ case p.close <- struct{}{}:
+ default:
+ }
+}
+
func (p *BufferPool) String() string {
- return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v G·%d P·%d <·%d =·%d >·%d M·%d}",
- p.baseline0, p.size, p.sizeMiss, p.get, p.put, p.less, p.equal, p.greater, p.miss)
+ return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
+ p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
}
func (p *BufferPool) drain() {
+ ticker := time.NewTicker(2 * time.Second)
for {
- time.Sleep(1 * time.Second)
select {
- case <-p.pool[0]:
- case <-p.pool[1]:
- case <-p.pool[2]:
- case <-p.pool[3]:
- default:
+ case <-ticker.C:
+ for _, ch := range p.pool {
+ select {
+ case <-ch:
+ default:
+ }
+ }
+ case <-p.close:
+ for _, ch := range p.pool {
+ close(ch)
+ }
+ return
}
}
}
@@ -151,10 +194,10 @@ func NewBufferPool(baseline int) *BufferPool {
}
p := &BufferPool{
baseline0: baseline,
- baseline1: baseline * 2,
- baseline2: baseline * 4,
+ baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
+ close: make(chan struct{}, 1),
}
- for i, cap := range []int{6, 6, 3, 1} {
+ for i, cap := range []int{2, 2, 4, 4, 2, 1} {
p.pool[i] = make(chan []byte, cap)
}
go p.drain()