diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go
index 501006717..225920002 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go
@@ -9,13 +9,15 @@ package leveldb
 import (
 	"encoding/binary"
 	"fmt"
+	"io"
 
 	"github.com/syndtr/goleveldb/leveldb/errors"
 	"github.com/syndtr/goleveldb/leveldb/memdb"
 	"github.com/syndtr/goleveldb/leveldb/storage"
 )
 
-// ErrBatchCorrupted records reason of batch corruption.
+// ErrBatchCorrupted records the reason of batch corruption. This error will be
+// wrapped with errors.ErrCorrupted.
 type ErrBatchCorrupted struct {
 	Reason string
 }
@@ -29,8 +31,9 @@ func newErrBatchCorrupted(reason string) error {
 }
 
 const (
-	batchHdrLen  = 8 + 4
-	batchGrowRec = 3000
+	batchHeaderLen = 8 + 4
+	batchGrowRec   = 3000
+	batchBufioSize = 16
 )
 
 // BatchReplay wraps basic batch operations.
@@ -39,34 +42,46 @@ type BatchReplay interface {
 	Delete(key []byte)
 }
 
+type batchIndex struct {
+	keyType            keyType
+	keyPos, keyLen     int
+	valuePos, valueLen int
+}
+
+func (index batchIndex) k(data []byte) []byte {
+	return data[index.keyPos : index.keyPos+index.keyLen]
+}
+
+func (index batchIndex) v(data []byte) []byte {
+	if index.valueLen != 0 {
+		return data[index.valuePos : index.valuePos+index.valueLen]
+	}
+	return nil
+}
+
+func (index batchIndex) kv(data []byte) (key, value []byte) {
+	return index.k(data), index.v(data)
+}
+
 // Batch is a write batch.
 type Batch struct {
-	data       []byte
-	rLen, bLen int
-	seq        uint64
-	sync       bool
+	data  []byte
+	index []batchIndex
+
+	// internalLen is the sum of the key/value pair lengths, plus the 8-byte
+	// overhead of each internal key.
+	internalLen int
 }
 
 func (b *Batch) grow(n int) {
-	off := len(b.data)
-	if off == 0 {
-		off = batchHdrLen
-		if b.data != nil {
-			b.data = b.data[:off]
-		}
-	}
-	if cap(b.data)-off < n {
-		if b.data == nil {
-			b.data = make([]byte, off, off+n)
-		} else {
-			odata := b.data
-			div := 1
-			if b.rLen > batchGrowRec {
-				div = b.rLen / batchGrowRec
-			}
-			b.data = make([]byte, off, off+n+(off-batchHdrLen)/div)
-			copy(b.data, odata)
+	o := len(b.data)
+	if cap(b.data)-o < n {
+		div := 1
+		if len(b.index) > batchGrowRec {
+			div = len(b.index) / batchGrowRec
 		}
+		ndata := make([]byte, o, o+n+o/div)
+		copy(ndata, b.data)
+		b.data = ndata
 	}
 }
 
@@ -76,32 +91,36 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) {
 		n += binary.MaxVarintLen32 + len(value)
 	}
 	b.grow(n)
-	off := len(b.data)
-	data := b.data[:off+n]
-	data[off] = byte(kt)
-	off++
-	off += binary.PutUvarint(data[off:], uint64(len(key)))
-	copy(data[off:], key)
-	off += len(key)
+	index := batchIndex{keyType: kt}
+	o := len(b.data)
+	data := b.data[:o+n]
+	data[o] = byte(kt)
+	o++
+	o += binary.PutUvarint(data[o:], uint64(len(key)))
+	index.keyPos = o
+	index.keyLen = len(key)
+	o += copy(data[o:], key)
 	if kt == keyTypeVal {
-		off += binary.PutUvarint(data[off:], uint64(len(value)))
-		copy(data[off:], value)
-		off += len(value)
+		o += binary.PutUvarint(data[o:], uint64(len(value)))
+		index.valuePos = o
+		index.valueLen = len(value)
+		o += copy(data[o:], value)
 	}
-	b.data = data[:off]
-	b.rLen++
-	// Include 8-byte ikey header
-	b.bLen += len(key) + len(value) + 8
+	b.data = data[:o]
+	b.index = append(b.index, index)
+	b.internalLen += index.keyLen + index.valueLen + 8
 }
 
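A note on the new layout above: a batch is now just the concatenated record bodies in `data` plus a parallel `index` slice of offsets; the 12-byte seq/count header is gone from the in-memory representation. For reference, here is a minimal standalone decoder for a single record as `appendRec` lays it out. `readRecord` is a hypothetical helper written for illustration only, not part of this patch; the key-type values 0/1 match `keyTypeDel`/`keyTypeVal` as declared in key.go.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readRecord decodes one record from a batch body as appendRec writes it:
// a 1-byte key type, a uvarint key length, the key bytes and, for puts
// only, a uvarint value length followed by the value bytes.
func readRecord(data []byte) (kt byte, key, value, rest []byte, err error) {
	if len(data) == 0 {
		return 0, nil, nil, nil, errors.New("empty record")
	}
	kt, o := data[0], 1
	klen, n := binary.Uvarint(data[o:])
	if n <= 0 || o+n+int(klen) > len(data) {
		return 0, nil, nil, nil, errors.New("bad key length")
	}
	o += n
	key = data[o : o+int(klen)]
	o += int(klen)
	if kt == 1 { // keyTypeVal: a value follows the key.
		vlen, vn := binary.Uvarint(data[o:])
		if vn <= 0 || o+vn+int(vlen) > len(data) {
			return 0, nil, nil, nil, errors.New("bad value length")
		}
		o += vn
		value = data[o : o+int(vlen)]
		o += int(vlen)
	}
	return kt, key, value, data[o:], nil
}

func main() {
	// "k1" => "v1" encoded by hand: type=1, klen=2, "k1", vlen=2, "v1".
	body := []byte{1, 2, 'k', '1', 2, 'v', '1'}
	kt, k, v, _, _ := readRecord(body)
	fmt.Printf("type=%d key=%s value=%s\n", kt, k, v)
}
```

Keeping offsets in `index` instead of re-parsing `data` is what lets `Replay` and `putMem` below walk the records without decoding varints again.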
 // Put appends 'put operation' of the given key/value pair to the batch.
-// It is safe to modify the contents of the argument after Put returns.
+// It is safe to modify the contents of the argument after Put returns but not
+// before.
 func (b *Batch) Put(key, value []byte) {
 	b.appendRec(keyTypeVal, key, value)
 }
 
 // Delete appends 'delete operation' of the given key to the batch.
-// It is safe to modify the contents of the argument after Delete returns.
+// It is safe to modify the contents of the argument after Delete returns but
+// not before.
 func (b *Batch) Delete(key []byte) {
 	b.appendRec(keyTypeDel, key, nil)
 }
 
@@ -111,7 +130,7 @@ func (b *Batch) Delete(key []byte) {
 // The returned slice is not its own copy, so the contents should not be
 // modified.
 func (b *Batch) Dump() []byte {
-	return b.encode()
+	return b.data
 }
 
 // Load loads given slice into the batch. Previous contents of the batch
@@ -119,144 +138,212 @@ func (b *Batch) Dump() []byte {
 // The given slice will not be copied and will be used as batch buffer, so
 // it is not safe to modify the contents of the slice.
 func (b *Batch) Load(data []byte) error {
-	return b.decode(0, data)
+	return b.decode(data, -1)
 }
 
 // Replay replays batch contents.
 func (b *Batch) Replay(r BatchReplay) error {
-	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
-		switch kt {
+	for _, index := range b.index {
+		switch index.keyType {
 		case keyTypeVal:
-			r.Put(key, value)
+			r.Put(index.k(b.data), index.v(b.data))
 		case keyTypeDel:
-			r.Delete(key)
+			r.Delete(index.k(b.data))
 		}
-		return nil
-	})
+	}
+	return nil
 }
 
 // Len returns number of records in the batch.
 func (b *Batch) Len() int {
-	return b.rLen
+	return len(b.index)
 }
 
 // Reset resets the batch.
 func (b *Batch) Reset() {
 	b.data = b.data[:0]
-	b.seq = 0
-	b.rLen = 0
-	b.bLen = 0
-	b.sync = false
+	b.index = b.index[:0]
+	b.internalLen = 0
 }
 
-func (b *Batch) init(sync bool) {
-	b.sync = sync
-}
-
-func (b *Batch) append(p *Batch) {
-	if p.rLen > 0 {
-		b.grow(len(p.data) - batchHdrLen)
-		b.data = append(b.data, p.data[batchHdrLen:]...)
-		b.rLen += p.rLen
-		b.bLen += p.bLen
-	}
-	if p.sync {
-		b.sync = true
-	}
-}
-
-// size returns sums of key/value pair length plus 8-bytes ikey.
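Since `Dump` now returns `b.data` unchanged and `Load` feeds the same bytes straight back through `decode(data, -1)`, a dump/load round-trip is a cheap way to move records between batches. A small usage sketch under those assumptions; `printReplay` is our own illustrative `BatchReplay` implementation, not part of the library:

```go
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
)

// printReplay implements the BatchReplay interface by printing each record.
type printReplay struct{}

func (printReplay) Put(key, value []byte) { fmt.Printf("put %s=%s\n", key, value) }
func (printReplay) Delete(key []byte)     { fmt.Printf("del %s\n", key) }

func main() {
	b := new(leveldb.Batch)
	b.Put([]byte("k1"), []byte("v1"))
	b.Delete([]byte("k2"))

	// After this change Dump returns the raw record body (no 12-byte
	// seq/count header) and Load accepts exactly that, so the round-trip
	// below preserves all records.
	var c leveldb.Batch
	if err := c.Load(b.Dump()); err != nil {
		panic(err)
	}
	if err := c.Replay(printReplay{}); err != nil {
		panic(err)
	}
}
```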
-func (b *Batch) size() int { - return b.bLen -} - -func (b *Batch) encode() []byte { - b.grow(0) - binary.LittleEndian.PutUint64(b.data, b.seq) - binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen)) - - return b.data -} - -func (b *Batch) decode(prevSeq uint64, data []byte) error { - if len(data) < batchHdrLen { - return newErrBatchCorrupted("too short") - } - - b.seq = binary.LittleEndian.Uint64(data) - if b.seq < prevSeq { - return newErrBatchCorrupted("invalid sequence number") - } - b.rLen = int(binary.LittleEndian.Uint32(data[8:])) - if b.rLen < 0 { - return newErrBatchCorrupted("invalid records length") - } - // No need to be precise at this point, it won't be used anyway - b.bLen = len(data) - batchHdrLen - b.data = data - - return nil -} - -func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error { - off := batchHdrLen - for i := 0; i < b.rLen; i++ { - if off >= len(b.data) { - return newErrBatchCorrupted("invalid records length") - } - - kt := keyType(b.data[off]) - if kt > keyTypeVal { - panic(kt) - return newErrBatchCorrupted("bad record: invalid type") - } - off++ - - x, n := binary.Uvarint(b.data[off:]) - off += n - if n <= 0 || off+int(x) > len(b.data) { - return newErrBatchCorrupted("bad record: invalid key length") - } - key := b.data[off : off+int(x)] - off += int(x) - var value []byte - if kt == keyTypeVal { - x, n := binary.Uvarint(b.data[off:]) - off += n - if n <= 0 || off+int(x) > len(b.data) { - return newErrBatchCorrupted("bad record: invalid value length") - } - value = b.data[off : off+int(x)] - off += int(x) - } - - if err := f(i, kt, key, value); err != nil { +func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error { + for i, index := range b.index { + if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil { return err } } - return nil } -func (b *Batch) memReplay(to *memdb.DB) error { - var ikScratch []byte - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) - return to.Put(ikScratch, value) - }) +func (b *Batch) append(p *Batch) { + ob := len(b.data) + oi := len(b.index) + b.data = append(b.data, p.data...) + b.index = append(b.index, p.index...) + b.internalLen += p.internalLen + + // Updating index offset. 
+ if ob != 0 { + for ; oi < len(b.index); oi++ { + index := &b.index[oi] + index.keyPos += ob + if index.valueLen != 0 { + index.valuePos += ob + } + } + } } -func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error { - if err := b.decode(prevSeq, data); err != nil { +func (b *Batch) decode(data []byte, expectedLen int) error { + b.data = data + b.index = b.index[:0] + b.internalLen = 0 + err := decodeBatch(data, func(i int, index batchIndex) error { + b.index = append(b.index, index) + b.internalLen += index.keyLen + index.valueLen + 8 + return nil + }) + if err != nil { return err } - return b.memReplay(to) + if expectedLen >= 0 && len(b.index) != expectedLen { + return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index))) + } + return nil } -func (b *Batch) revertMemReplay(to *memdb.DB) error { - var ikScratch []byte - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) - return to.Delete(ikScratch) - }) +func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error { + var ik []byte + for i, index := range b.index { + ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) + if err := mdb.Put(ik, index.v(b.data)); err != nil { + return err + } + } + return nil +} + +func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error { + var ik []byte + for i, index := range b.index { + ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) + if err := mdb.Delete(ik); err != nil { + return err + } + } + return nil +} + +func newBatch() interface{} { + return &Batch{} +} + +func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { + var index batchIndex + for i, o := 0, 0; o < len(data); i++ { + // Key type. + index.keyType = keyType(data[o]) + if index.keyType > keyTypeVal { + return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType))) + } + o++ + + // Key. + x, n := binary.Uvarint(data[o:]) + o += n + if n <= 0 || o+int(x) > len(data) { + return newErrBatchCorrupted("bad record: invalid key length") + } + index.keyPos = o + index.keyLen = int(x) + o += index.keyLen + + // Value. 
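A note on `putMem`/`revertMem` above, before the value-decoding half of `decodeBatch` continues below: record i of a batch applied at sequence `seq` is keyed in the memdb by an internal key, i.e. the user key plus an 8-byte trailer encoding `(seq+i)<<8 | keyType` in little endian. A standalone sketch mirroring what `makeInternalKey` builds (the function name here is ours, chosen for illustration):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// internalKey mirrors makeInternalKey: the user key followed by 8 bytes
// encoding (seq<<8)|keyType in little endian. putMem assigns record i of
// a batch the sequence number seq+i, which is why revertMem can delete
// exactly the keys a failed write inserted.
func internalKey(ukey []byte, seq uint64, kt uint8) []byte {
	dst := make([]byte, len(ukey)+8)
	copy(dst, ukey)
	binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt))
	return dst
}

func main() {
	fmt.Printf("% x\n", internalKey([]byte("k1"), 42, 1))
}
```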
+ if index.keyType == keyTypeVal { + x, n = binary.Uvarint(data[o:]) + o += n + if n <= 0 || o+int(x) > len(data) { + return newErrBatchCorrupted("bad record: invalid value length") + } + index.valuePos = o + index.valueLen = int(x) + o += index.valueLen + } else { + index.valuePos = 0 + index.valueLen = 0 + } + + if err := fn(i, index); err != nil { + return err + } + } + return nil +} + +func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) { + seq, batchLen, err = decodeBatchHeader(data) + if err != nil { + return 0, 0, err + } + if seq < expectSeq { + return 0, 0, newErrBatchCorrupted("invalid sequence number") + } + data = data[batchHeaderLen:] + var ik []byte + var decodedLen int + err = decodeBatch(data, func(i int, index batchIndex) error { + if i >= batchLen { + return newErrBatchCorrupted("invalid records length") + } + ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType) + if err := mdb.Put(ik, index.v(data)); err != nil { + return err + } + decodedLen++ + return nil + }) + if err == nil && decodedLen != batchLen { + err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen)) + } + return +} + +func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte { + dst = ensureBuffer(dst, batchHeaderLen) + binary.LittleEndian.PutUint64(dst, seq) + binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen)) + return dst +} + +func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) { + if len(data) < batchHeaderLen { + return 0, 0, newErrBatchCorrupted("too short") + } + + seq = binary.LittleEndian.Uint64(data) + batchLen = int(binary.LittleEndian.Uint32(data[8:])) + if batchLen < 0 { + return 0, 0, newErrBatchCorrupted("invalid records length") + } + return +} + +func batchesLen(batches []*Batch) int { + batchLen := 0 + for _, batch := range batches { + batchLen += batch.Len() + } + return batchLen +} + +func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error { + if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil { + return err + } + for _, batch := range batches { + if _, err := wr.Write(batch.data); err != nil { + return err + } + } + return nil } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/batch_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/batch_test.go index ce0925e08..62775f777 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/batch_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/batch_test.go @@ -8,116 +8,140 @@ package leveldb import ( "bytes" + "fmt" "testing" + "testing/quick" - "github.com/syndtr/goleveldb/leveldb/comparer" - "github.com/syndtr/goleveldb/leveldb/memdb" + "github.com/syndtr/goleveldb/leveldb/testutil" ) -type tbRec struct { - kt keyType - key, value []byte +func TestBatchHeader(t *testing.T) { + f := func(seq uint64, length uint32) bool { + encoded := encodeBatchHeader(nil, seq, int(length)) + decSeq, decLength, err := decodeBatchHeader(encoded) + return err == nil && decSeq == seq && decLength == int(length) + } + config := &quick.Config{ + Rand: testutil.NewRand(), + } + if err := quick.Check(f, config); err != nil { + t.Error(err) + } } -type testBatch struct { - rec []*tbRec +type batchKV struct { + kt keyType + k, v []byte } -func (p *testBatch) Put(key, value []byte) { - p.rec = append(p.rec, &tbRec{keyTypeVal, key, value}) -} - -func (p *testBatch) Delete(key []byte) { - p.rec = append(p.rec, &tbRec{keyTypeDel, key, nil}) -} - -func 
compareBatch(t *testing.T, b1, b2 *Batch) { - if b1.seq != b2.seq { - t.Errorf("invalid seq number want %d, got %d", b1.seq, b2.seq) - } - if b1.Len() != b2.Len() { - t.Fatalf("invalid record length want %d, got %d", b1.Len(), b2.Len()) - } - p1, p2 := new(testBatch), new(testBatch) - err := b1.Replay(p1) - if err != nil { - t.Fatal("error when replaying batch 1: ", err) - } - err = b2.Replay(p2) - if err != nil { - t.Fatal("error when replaying batch 2: ", err) - } - for i := range p1.rec { - r1, r2 := p1.rec[i], p2.rec[i] - if r1.kt != r2.kt { - t.Errorf("invalid type on record '%d' want %d, got %d", i, r1.kt, r2.kt) +func TestBatch(t *testing.T) { + var ( + kvs []batchKV + internalLen int + ) + batch := new(Batch) + rbatch := new(Batch) + abatch := new(Batch) + testBatch := func(i int, kt keyType, k, v []byte) error { + kv := kvs[i] + if kv.kt != kt { + return fmt.Errorf("invalid key type, index=%d: %d vs %d", i, kv.kt, kt) } - if !bytes.Equal(r1.key, r2.key) { - t.Errorf("invalid key on record '%d' want %s, got %s", i, string(r1.key), string(r2.key)) + if !bytes.Equal(kv.k, k) { + return fmt.Errorf("invalid key, index=%d", i) } - if r1.kt == keyTypeVal { - if !bytes.Equal(r1.value, r2.value) { - t.Errorf("invalid value on record '%d' want %s, got %s", i, string(r1.value), string(r2.value)) + if !bytes.Equal(kv.v, v) { + return fmt.Errorf("invalid value, index=%d", i) + } + return nil + } + f := func(ktr uint8, k, v []byte) bool { + kt := keyType(ktr % 2) + if kt == keyTypeVal { + batch.Put(k, v) + rbatch.Put(k, v) + kvs = append(kvs, batchKV{kt: kt, k: k, v: v}) + internalLen += len(k) + len(v) + 8 + } else { + batch.Delete(k) + rbatch.Delete(k) + kvs = append(kvs, batchKV{kt: kt, k: k}) + internalLen += len(k) + 8 + } + if batch.Len() != len(kvs) { + t.Logf("batch.Len: %d vs %d", len(kvs), batch.Len()) + return false + } + if batch.internalLen != internalLen { + t.Logf("abatch.internalLen: %d vs %d", internalLen, batch.internalLen) + return false + } + if len(kvs)%1000 == 0 { + if err := batch.replayInternal(testBatch); err != nil { + t.Logf("batch.replayInternal: %v", err) + return false + } + + abatch.append(rbatch) + rbatch.Reset() + if abatch.Len() != len(kvs) { + t.Logf("abatch.Len: %d vs %d", len(kvs), abatch.Len()) + return false + } + if abatch.internalLen != internalLen { + t.Logf("abatch.internalLen: %d vs %d", internalLen, abatch.internalLen) + return false + } + if err := abatch.replayInternal(testBatch); err != nil { + t.Logf("abatch.replayInternal: %v", err) + return false + } + + nbatch := new(Batch) + if err := nbatch.Load(batch.Dump()); err != nil { + t.Logf("nbatch.Load: %v", err) + return false + } + if nbatch.Len() != len(kvs) { + t.Logf("nbatch.Len: %d vs %d", len(kvs), nbatch.Len()) + return false + } + if nbatch.internalLen != internalLen { + t.Logf("nbatch.internalLen: %d vs %d", internalLen, nbatch.internalLen) + return false + } + if err := nbatch.replayInternal(testBatch); err != nil { + t.Logf("nbatch.replayInternal: %v", err) + return false } } - } -} - -func TestBatch_EncodeDecode(t *testing.T) { - b1 := new(Batch) - b1.seq = 10009 - b1.Put([]byte("key1"), []byte("value1")) - b1.Put([]byte("key2"), []byte("value2")) - b1.Delete([]byte("key1")) - b1.Put([]byte("k"), []byte("")) - b1.Put([]byte("zzzzzzzzzzz"), []byte("zzzzzzzzzzzzzzzzzzzzzzzz")) - b1.Delete([]byte("key10000")) - b1.Delete([]byte("k")) - buf := b1.encode() - b2 := new(Batch) - err := b2.decode(0, buf) - if err != nil { - t.Error("error when decoding batch: ", err) - } - compareBatch(t, b1, 
b2) -} - -func TestBatch_Append(t *testing.T) { - b1 := new(Batch) - b1.seq = 10009 - b1.Put([]byte("key1"), []byte("value1")) - b1.Put([]byte("key2"), []byte("value2")) - b1.Delete([]byte("key1")) - b1.Put([]byte("foo"), []byte("foovalue")) - b1.Put([]byte("bar"), []byte("barvalue")) - b2a := new(Batch) - b2a.seq = 10009 - b2a.Put([]byte("key1"), []byte("value1")) - b2a.Put([]byte("key2"), []byte("value2")) - b2a.Delete([]byte("key1")) - b2b := new(Batch) - b2b.Put([]byte("foo"), []byte("foovalue")) - b2b.Put([]byte("bar"), []byte("barvalue")) - b2a.append(b2b) - compareBatch(t, b1, b2a) - if b1.size() != b2a.size() { - t.Fatalf("invalid batch size want %d, got %d", b1.size(), b2a.size()) - } -} - -func TestBatch_Size(t *testing.T) { - b := new(Batch) - for i := 0; i < 2; i++ { - b.Put([]byte("key1"), []byte("value1")) - b.Put([]byte("key2"), []byte("value2")) - b.Delete([]byte("key1")) - b.Put([]byte("foo"), []byte("foovalue")) - b.Put([]byte("bar"), []byte("barvalue")) - mem := memdb.New(&iComparer{comparer.DefaultComparer}, 0) - b.memReplay(mem) - if b.size() != mem.Size() { - t.Errorf("invalid batch size calculation, want=%d got=%d", mem.Size(), b.size()) + if len(kvs)%10000 == 0 { + nbatch := new(Batch) + if err := batch.Replay(nbatch); err != nil { + t.Logf("batch.Replay: %v", err) + return false + } + if nbatch.Len() != len(kvs) { + t.Logf("nbatch.Len: %d vs %d", len(kvs), nbatch.Len()) + return false + } + if nbatch.internalLen != internalLen { + t.Logf("nbatch.internalLen: %d vs %d", internalLen, nbatch.internalLen) + return false + } + if err := nbatch.replayInternal(testBatch); err != nil { + t.Logf("nbatch.replayInternal: %v", err) + return false + } } - b.Reset() + return true } + config := &quick.Config{ + MaxCount: 40000, + Rand: testutil.NewRand(), + } + if err := quick.Check(f, config); err != nil { + t.Error(err) + } + t.Logf("length=%d internalLen=%d", len(kvs), internalLen) } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go index 12a849621..435250c71 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go @@ -104,7 +104,6 @@ func openDBBench(b *testing.B, noCompress bool) *dbBench { b.Fatal("cannot open db: ", err) } - runtime.GOMAXPROCS(runtime.NumCPU()) return p } @@ -260,7 +259,6 @@ func (p *dbBench) close() { p.keys = nil p.values = nil runtime.GC() - runtime.GOMAXPROCS(1) } func BenchmarkDBWrite(b *testing.B) { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go index f28e1bdea..c5940b232 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -16,7 +16,7 @@ import ( ) // Cacher provides interface to implements a caching functionality. -// An implementation must be goroutine-safe. +// An implementation must be safe for concurrent use. type Cacher interface { // Capacity returns cache capacity. 
Capacity() int diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go index b7b163845..6b017bd0c 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go @@ -50,14 +50,24 @@ func set(c *Cache, ns, key uint64, value Value, charge int, relf func()) *Handle }) } +type cacheMapTestParams struct { + nobjects, nhandles, concurrent, repeat int +} + func TestCacheMap(t *testing.T) { runtime.GOMAXPROCS(runtime.NumCPU()) - nsx := []struct { - nobjects, nhandles, concurrent, repeat int - }{ - {10000, 400, 50, 3}, - {100000, 1000, 100, 10}, + var params []cacheMapTestParams + if testing.Short() { + params = []cacheMapTestParams{ + {1000, 100, 20, 3}, + {10000, 300, 50, 10}, + } + } else { + params = []cacheMapTestParams{ + {10000, 400, 50, 3}, + {100000, 1000, 100, 10}, + } } var ( @@ -65,7 +75,7 @@ func TestCacheMap(t *testing.T) { handles [][]unsafe.Pointer ) - for _, x := range nsx { + for _, x := range params { objects = append(objects, make([]int32o, x.nobjects)) handles = append(handles, make([]unsafe.Pointer, x.nhandles)) } @@ -75,7 +85,7 @@ func TestCacheMap(t *testing.T) { wg := new(sync.WaitGroup) var done int32 - for ns, x := range nsx { + for ns, x := range params { for i := 0; i < x.concurrent; i++ { wg.Add(1) go func(ns, i, repeat int, objects []int32o, handles []unsafe.Pointer) { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go b/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go index 248bf7c21..448402b82 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go @@ -6,7 +6,9 @@ package leveldb -import "github.com/syndtr/goleveldb/leveldb/comparer" +import ( + "github.com/syndtr/goleveldb/leveldb/comparer" +) type iComparer struct { ucmp comparer.Comparer @@ -33,12 +35,12 @@ func (icmp *iComparer) Name() string { } func (icmp *iComparer) Compare(a, b []byte) int { - x := icmp.ucmp.Compare(internalKey(a).ukey(), internalKey(b).ukey()) + x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey()) if x == 0 { if m, n := internalKey(a).num(), internalKey(b).num(); m > n { - x = -1 + return -1 } else if m < n { - x = 1 + return 1 } } return x @@ -46,30 +48,20 @@ func (icmp *iComparer) Compare(a, b []byte) int { func (icmp *iComparer) Separator(dst, a, b []byte) []byte { ua, ub := internalKey(a).ukey(), internalKey(b).ukey() - dst = icmp.ucmp.Separator(dst, ua, ub) - if dst == nil { - return nil + dst = icmp.uSeparator(dst, ua, ub) + if dst != nil && len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 { + // Append earliest possible number. + return append(dst, keyMaxNumBytes...) } - if len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 { - dst = append(dst, keyMaxNumBytes...) - } else { - // Did not close possibilities that n maybe longer than len(ub). - dst = append(dst, a[len(a)-8:]...) - } - return dst + return nil } func (icmp *iComparer) Successor(dst, b []byte) []byte { ub := internalKey(b).ukey() - dst = icmp.ucmp.Successor(dst, ub) - if dst == nil { - return nil + dst = icmp.uSuccessor(dst, ub) + if dst != nil && len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 { + // Append earliest possible number. + return append(dst, keyMaxNumBytes...) } - if len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 { - dst = append(dst, keyMaxNumBytes...) - } else { - // Did not close possibilities that n maybe longer than len(ub). 
- dst = append(dst, b[len(b)-8:]...) - } - return dst + return nil } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go index 1aed20943..a02cb2c50 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go @@ -53,14 +53,13 @@ type DB struct { aliveSnaps, aliveIters int32 // Write. - writeC chan *Batch + batchPool sync.Pool + writeMergeC chan writeMerge writeMergedC chan bool writeLockC chan struct{} writeAckC chan error writeDelay time.Duration writeDelayN int - journalC chan *Batch - journalAckC chan error tr *Transaction // Compaction. @@ -94,12 +93,11 @@ func openDB(s *session) (*DB, error) { // Snapshot snapsList: list.New(), // Write - writeC: make(chan *Batch), + batchPool: sync.Pool{New: newBatch}, + writeMergeC: make(chan writeMerge), writeMergedC: make(chan bool), writeLockC: make(chan struct{}, 1), writeAckC: make(chan error), - journalC: make(chan *Batch), - journalAckC: make(chan error), // Compaction tcompCmdC: make(chan cCmd), tcompPauseC: make(chan chan<- struct{}), @@ -144,10 +142,10 @@ func openDB(s *session) (*DB, error) { if readOnly { db.SetReadOnly() } else { - db.closeW.Add(3) + db.closeW.Add(2) go db.tCompaction() go db.mCompaction() - go db.jWriter() + // go db.jWriter() } s.logf("db@open done T·%v", time.Since(start)) @@ -162,10 +160,10 @@ func openDB(s *session) (*DB, error) { // os.ErrExist error. // // Open will return an error with type of ErrCorrupted if corruption -// detected in the DB. Corrupted DB can be recovered with Recover -// function. +// detected in the DB. Use errors.IsCorrupted to test whether an error is +// due to corruption. Corrupted DB can be recovered with Recover function. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -202,13 +200,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { // os.ErrExist error. // // OpenFile uses standard file-system backed storage implementation as -// desribed in the leveldb/storage package. +// described in the leveldb/storage package. // // OpenFile will return an error with type of ErrCorrupted if corruption -// detected in the DB. Corrupted DB can be recovered with Recover -// function. +// detected in the DB. Use errors.IsCorrupted to test whether an error is +// due to corruption. Corrupted DB can be recovered with Recover function. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func OpenFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, o.GetReadOnly()) @@ -229,7 +227,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -255,10 +253,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. 
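As the updated Open/OpenFile doc comments above now say, corruption should be detected with `errors.IsCorrupted` rather than by inspecting error types directly, and a corrupted DB can then be recovered. A minimal sketch of that open-then-recover flow, using only exported goleveldb APIs (the path is illustrative):

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
)

func main() {
	db, err := leveldb.OpenFile("/tmp/demo.db", nil)
	if errors.IsCorrupted(err) {
		// Per the updated doc comment: test with errors.IsCorrupted,
		// then attempt recovery of the corrupted DB.
		db, err = leveldb.RecoverFile("/tmp/demo.db", nil)
	}
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```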
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// RecoverFile uses standard file-system backed storage implementation as desribed +// RecoverFile uses standard file-system backed storage implementation as described // in the leveldb/storage package. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func RecoverFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, false) @@ -504,10 +502,11 @@ func (db *DB) recoverJournal() error { checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) writeBuffer = db.s.o.GetWriteBuffer() - jr *journal.Reader - mdb = memdb.New(db.s.icmp, writeBuffer) - buf = &util.Buffer{} - batch = &Batch{} + jr *journal.Reader + mdb = memdb.New(db.s.icmp, writeBuffer) + buf = &util.Buffer{} + batchSeq uint64 + batchLen int ) for _, fd := range fds { @@ -526,7 +525,7 @@ func (db *DB) recoverJournal() error { } // Flush memdb and remove obsolete journal file. - if !ofd.Nil() { + if !ofd.Zero() { if mdb.Len() > 0 { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { fr.Close() @@ -569,7 +568,8 @@ func (db *DB) recoverJournal() error { fr.Close() return errors.SetFd(err, fd) } - if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) + if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error { } // Save sequence number. - db.seq = batch.seq + uint64(batch.Len()) + db.seq = batchSeq + uint64(batchLen) // Flush it if large enough. if mdb.Size() >= writeBuffer { @@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error { } // Remove the last obsolete journal file. - if !ofd.Nil() { + if !ofd.Zero() { db.s.stor.Remove(ofd) } @@ -661,9 +661,10 @@ func (db *DB) recoverJournalRO() error { db.logf("journal@recovery RO·Mode F·%d", len(fds)) var ( - jr *journal.Reader - buf = &util.Buffer{} - batch = &Batch{} + jr *journal.Reader + buf = &util.Buffer{} + batchSeq uint64 + batchLen int ) for _, fd := range fds { @@ -703,7 +704,8 @@ func (db *DB) recoverJournalRO() error { fr.Close() return errors.SetFd(err, fd) } - if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) + if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -715,7 +717,7 @@ func (db *DB) recoverJournalRO() error { } // Save sequence number. - db.seq = batch.seq + uint64(batch.Len()) + db.seq = batchSeq + uint64(batchLen) } fr.Close() @@ -856,7 +858,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { // NewIterator returns an iterator for the latest snapshot of the // underlying DB. -// The returned iterator is not goroutine-safe, but it is safe to use +// The returned iterator is not safe for concurrent use, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. 
The resultant key/value pairs are guaranteed to be diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 659f00dc6..2d0ad0753 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -96,7 +96,7 @@ noerr: default: goto haserr } - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -113,7 +113,7 @@ haserr: goto hasperr default: } - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -126,7 +126,7 @@ hasperr: case db.writeLockC <- struct{}{}: // Hold write lock, so that write won't pass-through. db.compWriteLocking = true - case _, _ = <-db.closeC: + case <-db.closeC: if db.compWriteLocking { // We should release the lock or Close will hang. <-db.writeLockC @@ -195,7 +195,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { db.logf("%s exiting (persistent error %q)", name, perr) db.compactionExitTransact() } - case _, _ = <-db.closeC: + case <-db.closeC: db.logf("%s exiting", name) db.compactionExitTransact() } @@ -224,7 +224,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { } select { case <-backoffT.C: - case _, _ = <-db.closeC: + case <-db.closeC: db.logf("%s exiting", name) db.compactionExitTransact() } @@ -288,7 +288,7 @@ func (db *DB) memCompaction() { case <-db.compPerErrC: close(resumeC) resumeC = nil - case _, _ = <-db.closeC: + case <-db.closeC: return } @@ -337,7 +337,7 @@ func (db *DB) memCompaction() { select { case <-resumeC: close(resumeC) - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -378,7 +378,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error { select { case ch := <-b.db.tcompPauseC: b.db.pauseCompaction(ch) - case _, _ = <-b.db.closeC: + case <-b.db.closeC: b.db.compactionExitTransact() default: } @@ -643,7 +643,7 @@ func (db *DB) tableNeedCompaction() bool { func (db *DB) pauseCompaction(ch chan<- struct{}) { select { case ch <- struct{}{}: - case _, _ = <-db.closeC: + case <-db.closeC: db.compactionExitTransact() } } @@ -697,14 +697,14 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { case compC <- cAuto{ch}: case err = <-db.compErrC: return - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } // Wait cmd. select { case err = <-ch: case err = <-db.compErrC: - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } return err @@ -719,14 +719,14 @@ func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (e case compC <- cRange{level, min, max, ch}: case err := <-db.compErrC: return err - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } // Wait cmd. 
select { case err = <-ch: case err = <-db.compErrC: - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } return err @@ -758,7 +758,7 @@ func (db *DB) mCompaction() { default: panic("leveldb: unknown command") } - case _, _ = <-db.closeC: + case <-db.closeC: return } } @@ -791,7 +791,7 @@ func (db *DB) tCompaction() { case ch := <-db.tcompPauseC: db.pauseCompaction(ch) continue - case _, _ = <-db.closeC: + case <-db.closeC: return default: } @@ -806,7 +806,7 @@ func (db *DB) tCompaction() { case ch := <-db.tcompPauseC: db.pauseCompaction(ch) continue - case _, _ = <-db.closeC: + case <-db.closeC: return } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index 977f65ba5..2c69d2e53 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -59,7 +59,7 @@ func (db *DB) releaseSnapshot(se *snapshotElement) { } } -// Gets minimum sequence that not being snapshoted. +// Gets minimum sequence that not being snapshotted. func (db *DB) minSeq() uint64 { db.snapsMu.Lock() defer db.snapsMu.Unlock() @@ -131,7 +131,7 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) } // NewIterator returns an iterator for the snapshot of the underlying DB. -// The returned iterator is not goroutine-safe, but it is safe to use +// The returned iterator is not safe for concurrent use, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. The resultant key/value pairs are guaranteed to be diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go index b7de09e15..85b02d24b 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -99,7 +99,7 @@ func (db *DB) mpoolDrain() { case <-db.memPool: default: } - case _, _ = <-db.closeC: + case <-db.closeC: ticker.Stop() // Make sure the pool is drained. select { @@ -164,7 +164,7 @@ func (db *DB) getMems() (e, f *memDB) { return db.mem, db.frozenMem } -// Get frozen memdb. +// Get effective memdb. func (db *DB) getEffectiveMem() *memDB { db.memMu.RLock() defer db.memMu.RUnlock() diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go index 6eaf0d558..602376b05 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -1877,7 +1877,7 @@ func TestDB_ConcurrentIterator(t *testing.T) { } func TestDB_ConcurrentWrite(t *testing.T) { - const n, niter = 10, 10000 + const n, bk, niter = 10, 3, 10000 h := newDbHarness(t) defer h.close() @@ -1889,7 +1889,7 @@ func TestDB_ConcurrentWrite(t *testing.T) { go func(i int) { defer wg.Done() for k := 0; k < niter; k++ { - kstr := fmt.Sprintf("%d.%d", i, k) + kstr := fmt.Sprintf("put-%d.%d", i, k) vstr := fmt.Sprintf("v%d", k) h.put(kstr, vstr) // Key should immediately available after put returns. 
@@ -1897,6 +1897,24 @@
 			}
 		}(i)
 	}
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		batch := &Batch{}
+		go func(i int) {
+			defer wg.Done()
+			for k := 0; k < niter; k++ {
+				batch.Reset()
+				for j := 0; j < bk; j++ {
+					batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
+				}
+				h.write(batch)
+				// Keys should be immediately available after write returns.
+				for j := 0; j < bk; j++ {
+					h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
+				}
+			}
+		}(i)
+	}
 	wg.Wait()
 }
 
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
index fca88037b..b8f7e7d21 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
@@ -59,8 +59,8 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
 }
 
 // NewIterator returns an iterator for the latest snapshot of the transaction.
-// The returned iterator is not goroutine-safe, but it is safe to use multiple
-// iterators concurrently, with each in a dedicated goroutine.
+// The returned iterator is not safe for concurrent use, but it is safe to use
+// multiple iterators concurrently, with each in a dedicated goroutine.
 // It is also safe to use an iterator concurrently while writes to the
 // transaction. The resultant key/value pairs are guaranteed to be consistent.
 //
@@ -167,8 +167,8 @@ func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
 	if tr.closed {
 		return errTransactionDone
 	}
-	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
-		return tr.put(kt, key, value)
+	return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
+		return tr.put(kt, k, v)
 	})
 }
 
@@ -179,7 +179,8 @@ func (tr *Transaction) setDone() {
 	<-tr.db.writeLockC
 }
 
-// Commit commits the transaction.
+// Commit commits the transaction. If error is not nil, then the transaction is
+// not committed; it can then either be retried or discarded.
 //
 // Other methods should not be called after transaction has been committed.
 func (tr *Transaction) Commit() error {
@@ -192,24 +193,27 @@ func (tr *Transaction) Commit() error {
 	if tr.closed {
 		return errTransactionDone
 	}
-	defer tr.setDone()
 	if err := tr.flush(); err != nil {
-		tr.discard()
+		// Return the error and let the user decide whether to retry or
+		// discard the transaction.
 		return err
 	}
 	if len(tr.tables) != 0 {
 		// Committing transaction.
 		tr.rec.setSeqNum(tr.seq)
 		tr.db.compCommitLk.Lock()
-		defer tr.db.compCommitLk.Unlock()
+		tr.stats.startTimer()
+		var cerr error
 		for retry := 0; retry < 3; retry++ {
-			if err := tr.db.s.commit(&tr.rec); err != nil {
-				tr.db.logf("transaction@commit error R·%d %q", retry, err)
+			cerr = tr.db.s.commit(&tr.rec)
+			if cerr != nil {
+				tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
 				select {
 				case <-time.After(time.Second):
-				case _, _ = <-tr.db.closeC:
+				case <-tr.db.closeC:
 					tr.db.logf("transaction@commit exiting")
-					return err
+					tr.db.compCommitLk.Unlock()
+					return cerr
 				}
 			} else {
 				// Success. Set db.seq.
@@ -217,9 +221,26 @@
 				break
 			}
 		}
+		tr.stats.stopTimer()
+		if cerr != nil {
+			// Return the error and let the user decide whether to retry or
+			// discard the transaction.
+			return cerr
+		}
+
+		// Update compaction stats. This is safe as long as we hold compCommitLk.
+		tr.db.compStats.addStat(0, &tr.stats)
+
+		// Trigger table auto-compaction.
 		tr.db.compTrigger(tr.db.tcompCmdC)
+		tr.db.compCommitLk.Unlock()
+
+		// Additionally, wait for compaction when a certain threshold is
+		// reached. The error is ignored; an error is returned only when the
+		// transaction can't be committed.
+		tr.db.waitCompaction()
 	}
+	// Only mark as done if transaction committed successfully.
+	tr.setDone()
 	return nil
 }
 
@@ -245,10 +266,20 @@ func (tr *Transaction) Discard() {
 	tr.lk.Unlock()
 }
 
+func (db *DB) waitCompaction() error {
+	if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
+		return db.compTriggerWait(db.tcompCmdC)
+	}
+	return nil
+}
+
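With `defer tr.setDone()` removed above, a failed `Commit` now leaves the transaction open, so the caller explicitly chooses between retrying and discarding. A usage sketch of that contract; the helper name and retry count are ours:

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
)

// commitWithRetry demonstrates the new Commit contract: a failed Commit
// does not finish the transaction, so it may be retried or discarded.
func commitWithRetry(db *leveldb.DB) error {
	tr, err := db.OpenTransaction()
	if err != nil {
		return err
	}
	if err := tr.Put([]byte("k"), []byte("v"), nil); err != nil {
		tr.Discard()
		return err
	}
	for i := 0; i < 3; i++ {
		if err = tr.Commit(); err == nil {
			return nil
		}
		log.Printf("commit attempt %d: %v", i+1, err)
	}
	// Give up: the transaction is still open, so discard it.
	tr.Discard()
	return err
}
```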
 // OpenTransaction opens an atomic DB transaction. Only one transaction can be
-// opened at a time. Write will be blocked until the transaction is committed or
-// discarded.
-// The returned transaction handle is goroutine-safe.
+// opened at a time; subsequent calls to Write and OpenTransaction will block
+// until the in-flight transaction is committed or discarded.
+// The returned transaction handle is safe for concurrent use.
+//
+// Transactions are expensive and can overwhelm compaction, especially if the
+// transaction size is small. Use with caution.
 //
 // The transaction must be closed once done, either by committing or discarding
 // the transaction.
@@ -263,7 +294,7 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
 	case db.writeLockC <- struct{}{}:
 	case err := <-db.compPerErrC:
 		return nil, err
-	case _, _ = <-db.closeC:
+	case <-db.closeC:
 		return nil, ErrClosed
 	}
 
@@ -278,6 +309,11 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
 		}
 	}
 
+	// Wait for compaction when a certain threshold is reached.
+	if err := db.waitCompaction(); err != nil {
+		return nil, err
+	}
+
 	tr := &Transaction{
 		db:  db,
 		seq: db.seq,
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go
index 7fd386ca4..7ecd960d2 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go
@@ -62,7 +62,7 @@ func (db *DB) checkAndCleanFiles() error {
 		case storage.TypeManifest:
 			keep = fd.Num >= db.s.manifestFd.Num
 		case storage.TypeJournal:
-			if !db.frozenJournalFd.Nil() {
+			if !db.frozenJournalFd.Zero() {
 				keep = fd.Num >= db.frozenJournalFd.Num
 			} else {
 				keep = fd.Num >= db.journalFd.Num
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go
index 37acb3f3a..cc428b695 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -14,37 +14,23 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
-func (db *DB) writeJournal(b *Batch) error {
-	w, err := db.journal.Next()
+func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
+	wr, err := db.journal.Next()
 	if err != nil {
 		return err
 	}
-	if _, err := w.Write(b.encode()); err != nil {
+	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
 		return err
 	}
 	if err := db.journal.Flush(); err != nil {
 		return err
 	}
-	if b.sync {
+	if sync {
 		return db.journalWriter.Sync()
 	}
 	return nil
 }
 
-func (db *DB) jWriter() {
-	defer db.closeW.Done()
-	for {
-		select {
-		case b := <-db.journalC:
-			if b != nil {
-				db.journalAckC <- db.writeJournal(b)
-			}
-		case _, _ = <-db.closeC:
-			return
-		}
-	}
-}
-
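`writeJournal` above now emits one journal record per write group: a 12-byte header (an 8-byte little-endian sequence number and a 4-byte little-endian total record count, per `encodeBatchHeader`) followed by the concatenated batch bodies, which is exactly what `writeBatchesWithHeader` produces. A sketch of the resulting byte layout; `encodeHeader` is an illustrative stand-in for the unexported helper:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeHeader mirrors encodeBatchHeader: 8-byte little-endian sequence
// number followed by a 4-byte little-endian record count.
func encodeHeader(seq uint64, n int) []byte {
	h := make([]byte, 12)
	binary.LittleEndian.PutUint64(h, seq)
	binary.LittleEndian.PutUint32(h[8:], uint32(n))
	return h
}

func main() {
	// One journal record for two merged batch bodies, as
	// writeBatchesWithHeader produces it.
	var rec bytes.Buffer
	bodies := [][]byte{{1, 1, 'a', 1, 'x'}, {0, 1, 'b'}} // put a=x; delete b
	rec.Write(encodeHeader(100, 2))                      // count is total records across batches
	for _, b := range bodies {
		rec.Write(b)
	}
	fmt.Printf("% x\n", rec.Bytes())
}
```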
 func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
 	// Wait for pending memdb compaction.
 	err = db.compTriggerWait(db.mcompCmdC)
@@ -69,9 +55,9 @@ func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
 
 func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
 	delayed := false
+	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
+	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
 	flush := func() (retry bool) {
-		v := db.s.version()
-		defer v.release()
 		mdb = db.getEffectiveMem()
 		if mdb == nil {
 			err = ErrClosed
@@ -83,14 +69,15 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
 				mdb = nil
 			}
 		}()
+		tLen := db.s.tLen(0)
 		mdbFree = mdb.Free()
 		switch {
-		case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
+		case tLen >= slowdownTrigger && !delayed:
 			delayed = true
 			time.Sleep(time.Millisecond)
 		case mdbFree >= n:
 			return false
-		case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
+		case tLen >= pauseTrigger:
 			delayed = true
 			err = db.compTriggerWait(db.tcompCmdC)
 			if err != nil {
@@ -127,159 +114,250 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
 	return
 }
 
-// Write apply the given batch to the DB. The batch will be applied
-// sequentially.
-//
-// It is safe to modify the contents of the arguments after Write returns.
-func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
-	err = db.ok()
-	if err != nil || b == nil || b.Len() == 0 {
-		return
+type writeMerge struct {
+	sync       bool
+	batch      *Batch
+	keyType    keyType
+	key, value []byte
+}
+
+func (db *DB) unlockWrite(overflow bool, merged int, err error) {
+	for i := 0; i < merged; i++ {
+		db.writeAckC <- err
+	}
+	if overflow {
+		// Pass lock to the next write (that failed to merge).
+		db.writeMergedC <- false
+	} else {
+		// Release lock.
+		<-db.writeLockC
+	}
+}
+
+// ourBatch, if defined, should be the same batch as batch.
+func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
+	// Try to flush memdb. This method will also try to throttle writes
+	// if they come in too fast and compaction cannot catch up.
+	mdb, mdbFree, err := db.flush(batch.internalLen)
+	if err != nil {
+		db.unlockWrite(false, 0, err)
+		return err
+	}
+	defer mdb.decref()
+
+	var (
+		overflow bool
+		merged   int
+		batches  = []*Batch{batch}
+	)
+
+	if merge {
+		// Merge limit.
+		var mergeLimit int
+		if batch.internalLen > 128<<10 {
+			mergeLimit = (1 << 20) - batch.internalLen
+		} else {
+			mergeLimit = 128 << 10
+		}
+		mergeCap := mdbFree - batch.internalLen
+		if mergeLimit > mergeCap {
+			mergeLimit = mergeCap
+		}
+
+	merge:
+		for mergeLimit > 0 {
+			select {
+			case incoming := <-db.writeMergeC:
+				if incoming.batch != nil {
+					// Merge batch.
+					if incoming.batch.internalLen > mergeLimit {
+						overflow = true
+						break merge
+					}
+					batches = append(batches, incoming.batch)
+					mergeLimit -= incoming.batch.internalLen
+				} else {
+					// Merge put.
+					internalLen := len(incoming.key) + len(incoming.value) + 8
+					if internalLen > mergeLimit {
+						overflow = true
+						break merge
+					}
+					if ourBatch == nil {
+						ourBatch = db.batchPool.Get().(*Batch)
+						ourBatch.Reset()
+						batches = append(batches, ourBatch)
+					}
+					// We can use the same batch here, since concurrent
+					// writes don't guarantee write order.
+					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
+					mergeLimit -= internalLen
+				}
+				sync = sync || incoming.sync
+				merged++
+				db.writeMergedC <- true
+
+			default:
+				break merge
+			}
+		}
+	}
 
-	b.init(wo.GetSync() && !db.s.o.GetNoSync())
+	// Seq number.
+	seq := db.seq + 1
 
-	if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
-		// Writes using transaction.
-		tr, err1 := db.OpenTransaction()
-		if err1 != nil {
-			return err1
+	// Write journal.
+	if err := db.writeJournal(batches, seq, sync); err != nil {
+		db.unlockWrite(overflow, merged, err)
+		return err
+	}
+
+	// Put batches.
+	for _, batch := range batches {
+		if err := batch.putMem(seq, mdb.DB); err != nil {
+			panic(err)
 		}
-		if err1 := tr.Write(b, wo); err1 != nil {
+		seq += uint64(batch.Len())
+	}
+
+	// Increment seq number.
+	db.addSeq(uint64(batchesLen(batches)))
+
+	// Rotate memdb if it has reached the threshold.
+	if batch.internalLen >= mdbFree {
+		db.rotateMem(0, false)
+	}
+
+	db.unlockWrite(overflow, merged, nil)
+	return nil
+}
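The merge budget in `writeLocked` above can be summarized as a pure function. This sketch mirrors that logic for reference; the function is ours, the constants come from the code above:

```go
package main

import "fmt"

// mergeLimitFor mirrors the merge budget computed in writeLocked: a batch
// of up to 128 KiB may absorb up to 128 KiB of merged writes, a larger
// batch may only grow to 1 MiB in total, and the budget can never exceed
// the free space left in the memdb.
func mergeLimitFor(internalLen, mdbFree int) int {
	limit := 128 << 10
	if internalLen > 128<<10 {
		limit = (1 << 20) - internalLen
	}
	if free := mdbFree - internalLen; limit > free {
		limit = free
	}
	return limit
}

func main() {
	fmt.Println(mergeLimitFor(4<<10, 4<<20))   // small write: 128 KiB budget
	fmt.Println(mergeLimitFor(512<<10, 4<<20)) // large write: up to 1 MiB total
}
```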
+
+// Write applies the given batch to the DB. The batch records will be applied
+// sequentially. Write can be used concurrently; when used concurrently and
+// the batches are small enough, Write will try to merge them. Set the
+// NoWriteMerge option to true to disable write merge.
+//
+// It is safe to modify the contents of the arguments after Write returns but
+// not before. Write will not modify the contents of the batch.
+func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
+	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
+		return err
+	}
+
+	// If the batch size is larger than the write buffer, it may be justified
+	// to write it using a transaction instead. With a transaction the batch
+	// will be written into tables directly, skipping the journaling.
+	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+		tr, err := db.OpenTransaction()
+		if err != nil {
+			return err
+		}
+		if err := tr.Write(batch, wo); err != nil {
 			tr.Discard()
-			return err1
+			return err
 		}
 		return tr.Commit()
 	}
 
-	// The write happen synchronously.
-	select {
-	case db.writeC <- b:
-		if <-db.writeMergedC {
-			return <-db.writeAckC
-		}
-		// Continue, the write lock already acquired by previous writer
-		// and handed out to us.
-	case db.writeLockC <- struct{}{}:
-	case err = <-db.compPerErrC:
-		return
-	case _, _ = <-db.closeC:
-		return ErrClosed
-	}
+	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+	sync := wo.GetSync() && !db.s.o.GetNoSync()
 
-	merged := 0
-	danglingMerge := false
-	defer func() {
-		for i := 0; i < merged; i++ {
-			db.writeAckC <- err
-		}
-		if danglingMerge {
-			// Only one dangling merge at most, so this is safe.
-			db.writeMergedC <- false
-		} else {
-			<-db.writeLockC
-		}
-	}()
-
-	mdb, mdbFree, err := db.flush(b.size())
-	if err != nil {
-		return
-	}
-	defer mdb.decref()
-
-	// Calculate maximum size of the batch.
-	m := 1 << 20
-	if x := b.size(); x <= 128<<10 {
-		m = x + (128 << 10)
-	}
-	m = minInt(m, mdbFree)
-
-	// Merge with other batch.
-drain:
-	for b.size() < m && !b.sync {
+	// Acquire write lock.
+	if merge {
 		select {
-		case nb := <-db.writeC:
-			if b.size()+nb.size() <= m {
-				b.append(nb)
-				db.writeMergedC <- true
-				merged++
-			} else {
-				danglingMerge = true
-				break drain
+		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
+			if <-db.writeMergedC {
+				// Write is merged.
+				return <-db.writeAckC
 			}
-		default:
-			break drain
-		}
-	}
-
-	// Set batch first seq number relative from last seq.
-	b.seq = db.seq + 1
-
-	// Write journal concurrently if it is large enough.
-	if b.size() >= (128 << 10) {
-		// Push the write batch to the journal writer
-		select {
-		case db.journalC <- b:
-			// Write into memdb
-			if berr := b.memReplay(mdb.DB); berr != nil {
-				panic(berr)
-			}
-		case err = <-db.compPerErrC:
-			return
-		case _, _ = <-db.closeC:
-			err = ErrClosed
-			return
-		}
-		// Wait for journal writer
-		select {
-		case err = <-db.journalAckC:
-			if err != nil {
-				// Revert memdb if error detected
-				if berr := b.revertMemReplay(mdb.DB); berr != nil {
-					panic(berr)
-				}
-				return
-			}
-		case _, _ = <-db.closeC:
-			err = ErrClosed
-			return
+			// Write is not merged, the write lock is handed to us. Continue.
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
 		}
 	} else {
-		err = db.writeJournal(b)
-		if err != nil {
-			return
-		}
-		if berr := b.memReplay(mdb.DB); berr != nil {
-			panic(berr)
+		select {
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
 		}
 	}
 
-	// Set last seq number.
-	db.addSeq(uint64(b.Len()))
+	return db.writeLocked(batch, nil, merge, sync)
+}
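`Write` now takes one of two paths: hand the batch to an in-flight writer via `writeMergeC`, or acquire `writeLockC` and become the group leader itself. Callers that cannot tolerate merging can opt out per write. A usage sketch, assuming the `NoWriteMerge` field that accompanies `GetNoWriteMerge` in the options package (that part of the change is not shown in this diff):

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	db, err := leveldb.OpenFile("/tmp/demo.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	batch := new(leveldb.Batch)
	batch.Put([]byte("k1"), []byte("v1"))
	batch.Put([]byte("k2"), []byte("v2"))

	// Opt out of write merging for this write and force an fsync.
	// NoWriteMerge is assumed to be the WriteOptions field backing
	// GetNoWriteMerge in this change.
	wo := &opt.WriteOptions{NoWriteMerge: true, Sync: true}
	if err := db.Write(batch, wo); err != nil {
		log.Fatal(err)
	}
}
```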
-	if b.size() >= mdbFree {
-		db.rotateMem(0, false)
+func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
+	if err := db.ok(); err != nil {
+		return err
 	}
-	return
+
+	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+	sync := wo.GetSync() && !db.s.o.GetNoSync()
+
+	// Acquire write lock.
+	if merge {
+		select {
+		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
+			if <-db.writeMergedC {
+				// Write is merged.
+				return <-db.writeAckC
+			}
+			// Write is not merged, the write lock is handed to us. Continue.
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
+		}
+	} else {
+		select {
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
+		}
+	}
+
+	batch := db.batchPool.Get().(*Batch)
+	batch.Reset()
+	batch.appendRec(kt, key, value)
+	return db.writeLocked(batch, batch, merge, sync)
 }
 
 // Put sets the value for the given key. It overwrites any previous value
-// for that key; a DB is not a multi-map.
+// for that key; a DB is not a multi-map. Write merge also applies for Put, see
+// Write.
 //
-// It is safe to modify the contents of the arguments after Put returns.
+// It is safe to modify the contents of the arguments after Put returns but not
+// before.
 func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
-	b := new(Batch)
-	b.Put(key, value)
-	return db.Write(b, wo)
+	return db.putRec(keyTypeVal, key, value, wo)
 }
 
-// Delete deletes the value for the given key.
+// Delete deletes the value for the given key. Delete will not return an error
+// if the key doesn't exist. Write merge also applies for Delete, see Write.
 //
-// It is safe to modify the contents of the arguments after Delete returns.
+// It is safe to modify the contents of the arguments after Delete returns but
+// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { - b := new(Batch) - b.Delete(key) - return db.Write(b, wo) + return db.putRec(keyTypeDel, key, nil, wo) } func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { @@ -308,7 +386,7 @@ func (db *DB) CompactRange(r util.Range) error { case db.writeLockC <- struct{}{}: case err := <-db.compPerErrC: return err - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } @@ -348,7 +426,7 @@ func (db *DB) SetReadOnly() error { db.compWriteLocking = true case err := <-db.compPerErrC: return err - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } @@ -357,7 +435,7 @@ func (db *DB) SetReadOnly() error { case db.compErrSetC <- ErrReadOnly: case perr := <-db.compPerErrC: return perr - case _, _ = <-db.closeC: + case <-db.closeC: return ErrClosed } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/errors.go b/vendor/github.com/syndtr/goleveldb/leveldb/errors.go index c8bd66a5a..de2649812 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/errors.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/errors.go @@ -10,6 +10,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/errors" ) +// Common errors. var ( ErrNotFound = errors.ErrNotFound ErrReadOnly = errors.New("leveldb: read-only mode") diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go b/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go index 9a0f6e2c1..8d6146b6f 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go @@ -15,6 +15,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +// Common errors. var ( ErrNotFound = New("leveldb: not found") ErrReleased = util.ErrReleased @@ -34,11 +35,10 @@ type ErrCorrupted struct { } func (e *ErrCorrupted) Error() string { - if !e.Fd.Nil() { + if !e.Fd.Zero() { return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) - } else { - return e.Err.Error() } + return e.Err.Error() } // NewErrCorrupted creates new ErrCorrupted error. 
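The `putRec` path above leans on `db.batchPool` so that single-record `Put`/`Delete` calls no longer allocate a fresh batch each time; the pool's `New` is the `newBatch` helper from batch.go. The standalone sketch below shows the same get/reset/reuse pattern; the `batch` type here is illustrative, and in the real code the batch is returned to the pool elsewhere in the write path, outside this section:

```go
package main

import (
	"fmt"
	"sync"
)

type batch struct{ data []byte }

// Reset truncates the buffer while keeping its capacity, exactly what
// Batch.Reset does before a pooled batch is reused.
func (b *batch) Reset() { b.data = b.data[:0] }

var pool = sync.Pool{New: func() interface{} { return new(batch) }}

func main() {
	b := pool.Get().(*batch)
	b.Reset() // pooled objects carry stale state; always reset first
	b.data = append(b.data, "one record"...)
	fmt.Println(len(b.data))
	pool.Put(b) // hand the buffer back for the next writer
}
```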
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/external_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/external_test.go index 4bc3a9104..669d77f36 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/external_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/external_test.go @@ -32,12 +32,12 @@ var _ = testutil.Defer(func() { db := newTestingDB(o, nil, nil) t := testutil.DBTesting{ DB: db, - Deleted: testutil.KeyValue_Generate(nil, 500, 1, 50, 5, 5).Clone(), + Deleted: testutil.KeyValue_Generate(nil, 500, 1, 1, 50, 5, 5).Clone(), } testutil.DoDBTesting(&t) db.TestClose() done <- true - }, 20.0) + }, 80.0) }) Describe("read test", func() { @@ -66,7 +66,7 @@ var _ = testutil.Defer(func() { Expect(err).NotTo(HaveOccurred()) t0 := &testutil.DBTesting{ DB: tr, - Deleted: testutil.KeyValue_Generate(nil, 200, 1, 50, 5, 5).Clone(), + Deleted: testutil.KeyValue_Generate(nil, 200, 1, 1, 50, 5, 5).Clone(), } testutil.DoDBTesting(t0) testutil.TestGet(tr, t0.Present) @@ -111,7 +111,7 @@ var _ = testutil.Defer(func() { db.TestClose() done <- true - }, 30.0) + }, 240.0) }) }) }) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter_test.go index 1ed6d07cb..f16d0144e 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter_test.go @@ -17,7 +17,7 @@ var _ = testutil.Defer(func() { Describe("Array iterator", func() { It("Should iterates and seeks correctly", func() { // Build key/value. - kv := testutil.KeyValue_Generate(nil, 70, 1, 5, 3, 3) + kv := testutil.KeyValue_Generate(nil, 70, 1, 1, 5, 3, 3) // Test the iterator. t := testutil.IteratorTesting{ diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter_test.go index 72a797892..fde80167a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter_test.go @@ -52,7 +52,7 @@ var _ = testutil.Defer(func() { for _, x := range n { sum += x } - kv := testutil.KeyValue_Generate(nil, sum, 1, 10, 4, 4) + kv := testutil.KeyValue_Generate(nil, sum, 1, 1, 10, 4, 4) for i, j := 0, 0; i < len(n); i++ { for x := n[i]; x > 0; x-- { key, value := kv.Index(j) @@ -69,7 +69,7 @@ var _ = testutil.Defer(func() { } testutil.DoIteratorTesting(&t) done <- true - }, 1.5) + }, 15.0) } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go index c2522860b..3b5553274 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go @@ -21,13 +21,13 @@ var ( // IteratorSeeker is the interface that wraps the 'seeks method'. type IteratorSeeker interface { // First moves the iterator to the first key/value pair. If the iterator - // only contains one key/value pair then First and Last whould moves + // only contains one key/value pair then First and Last would moves // to the same key/value pair. // It returns whether such pair exist. First() bool // Last moves the iterator to the last key/value pair. If the iterator - // only contains one key/value pair then First and Last whould moves + // only contains one key/value pair then First and Last would moves // to the same key/value pair. // It returns whether such pair exist. 
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go
index c2522860b..3b5553274 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go
@@ -21,13 +21,13 @@ var (
 // IteratorSeeker is the interface that wraps the 'seeks method'.
 type IteratorSeeker interface {
 	// First moves the iterator to the first key/value pair. If the iterator
-	// only contains one key/value pair then First and Last whould moves
+	// only contains one key/value pair then First and Last would move
 	// to the same key/value pair.
 	// It returns whether such pair exist.
 	First() bool
 
 	// Last moves the iterator to the last key/value pair. If the iterator
-	// only contains one key/value pair then First and Last whould moves
+	// only contains one key/value pair then First and Last would move
 	// to the same key/value pair.
 	// It returns whether such pair exist.
 	Last() bool
@@ -48,7 +48,7 @@ type IteratorSeeker interface {
 	Prev() bool
 }
 
-// CommonIterator is the interface that wraps common interator methods.
+// CommonIterator is the interface that wraps common iterator methods.
 type CommonIterator interface {
 	IteratorSeeker
@@ -71,14 +71,15 @@ type CommonIterator interface {
 // Iterator iterates over a DB's key/value pairs in key order.
 //
-// When encouter an error any 'seeks method' will return false and will
+// When it encounters an error, any 'seeks method' will return false and
 // yield no key/value pairs. The error can be queried by calling the Error
 // method. Calling Release is still necessary.
 //
 // An iterator must be released after use, but it is not necessary to read
 // an iterator until exhaustion.
-// Also, an iterator is not necessarily goroutine-safe, but it is safe to use
-// multiple iterators concurrently, with each in a dedicated goroutine.
+// Also, an iterator is not necessarily safe for concurrent use, but it is
+// safe to use multiple iterators concurrently, with each in a dedicated
+// goroutine.
 type Iterator interface {
 	CommonIterator
@@ -98,7 +99,7 @@ type Iterator interface {
 //
 // ErrorCallbackSetter implemented by indexed and merged iterator.
 type ErrorCallbackSetter interface {
-	// SetErrorCallback allows set an error callback of the coresponding
+	// SetErrorCallback allows setting an error callback for the corresponding
 	// iterator. Use nil to clear the callback.
 	SetErrorCallback(f func(err error))
 }
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter_test.go
index e523b63e4..ee40881ea 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter_test.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter_test.go
@@ -24,7 +24,7 @@ var _ = testutil.Defer(func() {
 
 		// Build key/value.
 		filledKV := make([]testutil.KeyValue, filled)
-		kv := testutil.KeyValue_Generate(nil, 100, 1, 10, 4, 4)
+		kv := testutil.KeyValue_Generate(nil, 100, 1, 1, 10, 4, 4)
 		kv.Iterate(func(i int, key, value []byte) {
 			filledKV[rnd.Intn(filled)].Put(key, value)
 		})
@@ -49,7 +49,7 @@ var _ = testutil.Defer(func() {
 			}
 			testutil.DoIteratorTesting(&t)
 			done <- true
-		}, 1.5)
+		}, 15.0)
 	}
 }
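The doc comments above replace "goroutine-safe" with "safe for concurrent use": a single iterator must not be shared, but separate iterators over the same DB may run in parallel. A minimal sketch of that pattern, assuming an already opened *leveldb.DB (the path in main is a placeholder):

```go
package main

import (
	"log"
	"sync"

	"github.com/syndtr/goleveldb/leveldb"
)

func scanAll(db *leveldb.DB) {
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			it := db.NewIterator(nil, nil) // one iterator per goroutine
			defer it.Release()             // releasing is mandatory
			for it.Next() {
				_, _ = it.Key(), it.Value()
			}
			if err := it.Error(); err != nil { // seeks return false on error
				log.Print(err)
			}
		}()
	}
	wg.Wait()
}

func main() {
	db, err := leveldb.OpenFile("/tmp/example.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	scanAll(db)
}
```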
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go
index 891098bb7..d094c3d0f 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go
@@ -180,34 +180,37 @@ func (r *Reader) nextChunk(first bool) error {
 			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
 			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
 			chunkType := r.buf[r.j+6]
-
+			unprocBlock := r.n - r.j
 			if checksum == 0 && length == 0 && chunkType == 0 {
 				// Drop entire block.
-				m := r.n - r.j
 				r.i = r.n
 				r.j = r.n
-				return r.corrupt(m, "zero header", false)
-			} else {
-				m := r.n - r.j
-				r.i = r.j + headerSize
-				r.j = r.j + headerSize + int(length)
-				if r.j > r.n {
-					// Drop entire block.
-					r.i = r.n
-					r.j = r.n
-					return r.corrupt(m, "chunk length overflows block", false)
-				} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
-					// Drop entire block.
-					r.i = r.n
-					r.j = r.n
-					return r.corrupt(m, "checksum mismatch", false)
-				}
+				return r.corrupt(unprocBlock, "zero header", false)
+			}
+			if chunkType < fullChunkType || chunkType > lastChunkType {
+				// Drop entire block.
+				r.i = r.n
+				r.j = r.n
+				return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
+			}
+			r.i = r.j + headerSize
+			r.j = r.j + headerSize + int(length)
+			if r.j > r.n {
+				// Drop entire block.
+				r.i = r.n
+				r.j = r.n
+				return r.corrupt(unprocBlock, "chunk length overflows block", false)
+			} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
+				// Drop entire block.
+				r.i = r.n
+				r.j = r.n
+				return r.corrupt(unprocBlock, "checksum mismatch", false)
 			}
 			if first && chunkType != fullChunkType && chunkType != firstChunkType {
-				m := r.j - r.i
+				chunkLength := (r.j - r.i) + headerSize
 				r.i = r.j
 				// Report the error, but skip it.
-				return r.corrupt(m+headerSize, "orphan chunk", true)
+				return r.corrupt(chunkLength, "orphan chunk", true)
 			}
 			r.last = chunkType == fullChunkType || chunkType == lastChunkType
 			return nil
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/key.go b/vendor/github.com/syndtr/goleveldb/leveldb/key.go
index d0b80aaf9..ad8f51ec8 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/key.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/key.go
@@ -37,14 +37,14 @@ func (kt keyType) String() string {
 	case keyTypeVal:
 		return "v"
 	}
-	return "x"
+	return fmt.Sprintf("<invalid:%#x>", uint(kt))
 }
 
 // Value types encoded as the last component of internal keys.
 // Don't modify; this value are saved to disk.
 const (
-	keyTypeDel keyType = iota
-	keyTypeVal
+	keyTypeDel = keyType(0)
+	keyTypeVal = keyType(1)
 )
 
 // keyTypeSeek defines the keyType that should be passed when constructing an
@@ -79,11 +79,7 @@ func makeInternalKey(dst, ukey []byte, seq uint64, kt keyType) internalKey {
 		panic("leveldb: invalid type")
 	}
 
-	if n := len(ukey) + 8; cap(dst) < n {
-		dst = make([]byte, n)
-	} else {
-		dst = dst[:n]
-	}
+	dst = ensureBuffer(dst, len(ukey)+8)
 	copy(dst, ukey)
 	binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt))
 	return internalKey(dst)
@@ -143,5 +139,5 @@ func (ik internalKey) String() string {
 	if ukey, seq, kt, err := parseInternalKey(ik); err == nil {
 		return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq)
 	}
-	return "<invalid>"
+	return fmt.Sprintf("<invalid:%#x>", []byte(ik))
 }
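makeInternalKey above shows the internal key layout: the user key followed by an 8-byte little-endian trailer holding `(seq<<8) | keyType`. A standalone sketch of that encoding and its inverse (the helper names are mine, not goleveldb's):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeInternalKey mirrors the ukey+8 layout: trailer = (seq<<8) | keyType.
func encodeInternalKey(ukey []byte, seq uint64, kt uint64) []byte {
	ik := make([]byte, len(ukey)+8)
	copy(ik, ukey)
	binary.LittleEndian.PutUint64(ik[len(ukey):], (seq<<8)|kt)
	return ik
}

func main() {
	ik := encodeInternalKey([]byte("foo"), 42, 1) // 1 = keyTypeVal
	num := binary.LittleEndian.Uint64(ik[len(ik)-8:])
	fmt.Printf("ukey=%q seq=%d type=%d\n", ik[:len(ik)-8], num>>8, num&0xff)
}
```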
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
index 1395bd928..18a19ed42 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
@@ -17,6 +17,7 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
+// Common errors.
 var (
 	ErrNotFound     = errors.ErrNotFound
 	ErrIterReleased = errors.New("leveldb/memdb: iterator released")
@@ -385,7 +386,7 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) {
 }
 
 // NewIterator returns an iterator of the DB.
-// The returned iterator is not goroutine-safe, but it is safe to use
+// The returned iterator is not safe for concurrent use, but it is safe to use
 // multiple iterators concurrently, with each in a dedicated goroutine.
 // It is also safe to use an iterator concurrently with modifying its
 // underlying DB. However, the resultant key/value pairs are not guaranteed
@@ -411,7 +412,7 @@ func (p *DB) Capacity() int {
 }
 
 // Size returns sum of keys and values length. Note that deleted
-// key/value will not be accouted for, but it will still consume
+// key/value will not be accounted for, but it will still consume
 // the buffer, since the buffer is append only.
 func (p *DB) Size() int {
 	p.mu.RLock()
@@ -453,11 +454,14 @@ func (p *DB) Reset() {
 	p.mu.Unlock()
 }
 
-// New creates a new initalized in-memory key/value DB. The capacity
+// New creates a new initialized in-memory key/value DB. The capacity
 // is the initial key/value buffer capacity. The capacity is advisory,
 // not enforced.
 //
-// The returned DB instance is goroutine-safe.
+// This DB is append-only; deleting an entry removes its node but does not
+// reclaim the KV buffer.
+//
+// The returned DB instance is safe for concurrent use.
 func New(cmp comparer.BasicComparer, capacity int) *DB {
 	p := &DB{
 		cmp: cmp,
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb_test.go
index 5dd6dbc7b..3f0a31e4e 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb_test.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb_test.go
@@ -73,7 +73,7 @@ var _ = testutil.Defer(func() {
 			db := New(comparer.DefaultComparer, 0)
 			t := testutil.DBTesting{
 				DB:      db,
-				Deleted: testutil.KeyValue_Generate(nil, 1000, 1, 30, 5, 5).Clone(),
+				Deleted: testutil.KeyValue_Generate(nil, 1000, 1, 1, 30, 5, 5).Clone(),
 				PostFn: func(t *testutil.DBTesting) {
 					Expect(db.Len()).Should(Equal(t.Present.Len()))
 					Expect(db.Size()).Should(Equal(t.Present.Size()))
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
index 3d2bf1c02..44e7d9adc 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
@@ -312,6 +312,11 @@ type Options struct {
 	// The default is false.
 	NoSync bool
 
+	// NoWriteMerge allows disabling write merge.
+	//
+	// The default is false.
+	NoWriteMerge bool
+
 	// OpenFilesCacher provides cache algorithm for open files caching.
 	// Specify NoCacher to disable caching algorithm.
 	//
@@ -543,6 +548,13 @@ func (o *Options) GetNoSync() bool {
 	return o.NoSync
 }
 
+func (o *Options) GetNoWriteMerge() bool {
+	if o == nil {
+		return false
+	}
+	return o.NoWriteMerge
+}
+
 func (o *Options) GetOpenFilesCacher() Cacher {
 	if o == nil || o.OpenFilesCacher == nil {
 		return DefaultOpenFilesCacher
@@ -629,6 +641,11 @@ func (ro *ReadOptions) GetStrict(strict Strict) bool {
 // WriteOptions holds the optional parameters for 'write operation'. The
 // 'write operation' includes Write, Put and Delete.
 type WriteOptions struct {
+	// NoWriteMerge allows disabling write merge.
+	//
+	// The default is false.
+	NoWriteMerge bool
+
 	// Sync is whether to sync underlying writes from the OS buffer cache
 	// through to actual disk, if applicable. Setting Sync can result in
 	// slower writes.
@@ -644,6 +661,13 @@ type WriteOptions struct {
 	Sync bool
 }
 
+func (wo *WriteOptions) GetNoWriteMerge() bool {
+	if wo == nil {
+		return false
+	}
+	return wo.NoWriteMerge
+}
+
 func (wo *WriteOptions) GetSync() bool {
 	if wo == nil {
 		return false
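The new NoWriteMerge knob can be set once for the whole DB via opt.Options or per write via opt.WriteOptions. A hedged usage sketch against this revision of the API (the path and keys are placeholders):

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	// Disable write merging for every write on this DB.
	db, err := leveldb.OpenFile("/tmp/example.db", &opt.Options{NoWriteMerge: true})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Or opt out for a single write; Sync still forces a disk sync as before.
	wo := &opt.WriteOptions{NoWriteMerge: true, Sync: true}
	if err := db.Put([]byte("k"), []byte("v"), wo); err != nil {
		log.Fatal(err)
	}
}
```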
 type ErrManifestCorrupted struct {
 	Field  string
 	Reason string
@@ -42,10 +43,11 @@ type session struct {
 	stSeqNum uint64 // last mem compacted seq; need external synchronization
 
 	stor     storage.Storage
-	storLock storage.Lock
+	storLock storage.Locker
 	o        *cachedOptions
 	icmp     *iComparer
 	tops     *tOps
+	fileRef  map[int64]int
 
 	manifest       *journal.Writer
 	manifestWriter storage.Writer
@@ -68,6 +70,7 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
 	s = &session{
 		stor:     stor,
 		storLock: storLock,
+		fileRef:  make(map[int64]int),
 	}
 	s.setOptions(o)
 	s.tops = newTableOps(s)
@@ -92,7 +95,7 @@ func (s *session) close() {
 
 // Release session lock.
 func (s *session) release() {
-	s.storLock.Release()
+	s.storLock.Unlock()
 }
 
 // Create a new database session; need external synchronization.
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
index 674182fb2..92328933c 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
@@ -39,6 +39,18 @@ func (s *session) newTemp() storage.FileDesc {
 	return storage.FileDesc{storage.TypeTemp, num}
 }
 
+func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
+	ref += s.fileRef[fd.Num]
+	if ref > 0 {
+		s.fileRef[fd.Num] = ref
+	} else if ref == 0 {
+		delete(s.fileRef, fd.Num)
+	} else {
+		panic(fmt.Sprintf("negative ref: %v", fd))
+	}
+	return ref
+}
+
 // Session state.
 
 // Get current version. This will incr version ref, must call
@@ -46,21 +58,28 @@ func (s *session) newTemp() storage.FileDesc {
 func (s *session) version() *version {
 	s.vmu.Lock()
 	defer s.vmu.Unlock()
-	s.stVersion.ref++
+	s.stVersion.incref()
 	return s.stVersion
 }
 
+func (s *session) tLen(level int) int {
+	s.vmu.Lock()
+	defer s.vmu.Unlock()
+	return s.stVersion.tLen(level)
+}
+
 // Set current version to v.
 func (s *session) setVersion(v *version) {
 	s.vmu.Lock()
-	v.ref = 1 // Holds by session.
-	if old := s.stVersion; old != nil {
-		v.ref++ // Holds by old version.
-		old.next = v
-		old.releaseNB()
+	defer s.vmu.Unlock()
+	// Held by the session. It is important to call incref first, before
+	// releasing the current version; otherwise files still in use might
+	// get released.
+	v.incref()
+	if s.stVersion != nil {
+		// Release current version.
+		s.stVersion.releaseNB()
 	}
 	s.stVersion = v
-	s.vmu.Unlock()
 }
 
 // Get current unused file number.
@@ -197,7 +216,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
 	if s.manifestWriter != nil {
 		s.manifestWriter.Close()
 	}
-	if !s.manifestFd.Nil() {
+	if !s.manifestFd.Zero() {
 		s.stor.Remove(s.manifestFd)
 	}
 	s.manifestFd = fd
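addFileRef above gives the session a per-table-file reference count: entries are dropped when the count reaches zero, and a negative count panics. A standalone sketch of the same bookkeeping (the type and method names are mine, not goleveldb's):

```go
package main

import "fmt"

// fileRefs mimics session.fileRef: file number -> reference count.
type fileRefs map[int64]int

// add adjusts the count by ref and returns the new value,
// deleting the entry once it reaches zero.
func (r fileRefs) add(num int64, ref int) int {
	ref += r[num]
	switch {
	case ref > 0:
		r[num] = ref
	case ref == 0:
		delete(r, num)
	default:
		panic(fmt.Sprintf("negative ref: %d", num))
	}
	return ref
}

func main() {
	refs := fileRefs{}
	refs.add(7, 1) // version A holds table 7
	refs.add(7, 1) // version B holds table 7
	refs.add(7, -1)
	if refs.add(7, -1) == 0 {
		fmt.Println("table 7 is unreferenced and can be removed")
	}
}
```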
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
index cbe1dc103..e53434cab 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
@@ -32,7 +32,7 @@ type fileStorageLock struct {
 	fs *fileStorage
 }
 
-func (lock *fileStorageLock) Release() {
+func (lock *fileStorageLock) Unlock() {
 	if lock.fs != nil {
 		lock.fs.mu.Lock()
 		defer lock.fs.mu.Unlock()
@@ -116,7 +116,7 @@ func OpenFile(path string, readOnly bool) (Storage, error) {
 	return fs, nil
 }
 
-func (fs *fileStorage) Lock() (Lock, error) {
+func (fs *fileStorage) Lock() (Locker, error) {
 	fs.mu.Lock()
 	defer fs.mu.Unlock()
 	if fs.open < 0 {
@@ -323,7 +323,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
 		}
 	}
 	// Don't remove any files if there is no valid CURRENT file.
-	if fd.Nil() {
+	if fd.Zero() {
 		if cerr != nil {
 			err = cerr
 		} else {
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_test.go
index 7a77f2891..28a59ecc9 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_test.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_test.go
@@ -126,7 +126,7 @@ func TestFileStorage_Locking(t *testing.T) {
 	} else {
 		t.Logf("storage lock got error: %s (expected)", err)
 	}
-	l.Release()
+	l.Unlock()
 	_, err = p3.Lock()
 	if err != nil {
 		t.Fatal("storage lock failed(2): ", err)
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
index 9b70e1513..9b0421f03 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
@@ -18,7 +18,7 @@ type memStorageLock struct {
 	ms *memStorage
 }
 
-func (lock *memStorageLock) Release() {
+func (lock *memStorageLock) Unlock() {
 	ms := lock.ms
 	ms.mu.Lock()
 	defer ms.mu.Unlock()
@@ -43,7 +43,7 @@ func NewMemStorage() Storage {
 	}
 }
 
-func (ms *memStorage) Lock() (Lock, error) {
+func (ms *memStorage) Lock() (Locker, error) {
 	ms.mu.Lock()
 	defer ms.mu.Unlock()
 	if ms.slock != nil {
@@ -69,7 +69,7 @@ func (ms *memStorage) SetMeta(fd FileDesc) error {
 func (ms *memStorage) GetMeta() (FileDesc, error) {
 	ms.mu.Lock()
 	defer ms.mu.Unlock()
-	if ms.meta.Nil() {
+	if ms.meta.Zero() {
 		return FileDesc{}, os.ErrNotExist
 	}
 	return ms.meta, nil
@@ -78,7 +78,7 @@ func (ms *memStorage) GetMeta() (FileDesc, error) {
 func (ms *memStorage) List(ft FileType) ([]FileDesc, error) {
 	ms.mu.Lock()
 	var fds []FileDesc
-	for x, _ := range ms.files {
+	for x := range ms.files {
 		fd := unpackFile(x)
 		if fd.Type&ft != 0 {
 			fds = append(fds, fd)
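Across the storage implementations above, the lock handle's Release method becomes Unlock, matching the new Locker interface. A minimal usage sketch, assuming an on-disk storage at a placeholder path:

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb/storage"
)

func main() {
	stor, err := storage.OpenFile("/tmp/example-stor", false)
	if err != nil {
		log.Fatal(err)
	}
	defer stor.Close()

	l, err := stor.Lock() // returns (storage.Locker, error)
	if err != nil {
		log.Fatal(err) // e.g. storage.ErrLocked if the lock is already held
	}
	defer l.Unlock() // was l.Release() before this change
}
```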
 		err)
 	}
-	l.Release()
+	l.Unlock()
 	_, err = m.Lock()
 	if err != nil {
 		t.Fatal("storage lock failed(2): ", err)
 	// The file types may be OR'ed together.
 	List(ft FileType) ([]FileDesc, error)
 
-	// Open opens file with the given fd read-only.
+	// Open opens file with the given 'file descriptor' read-only.
 	// Returns os.ErrNotExist error if the file does not exist.
 	// Returns ErrClosed if the underlying storage is closed.
 	Open(fd FileDesc) (Reader, error)
 
-	// Create creates file with the given fd, truncate if already exist and
-	// opens write-only.
+	// Create creates file with the given 'file descriptor', truncate if already
+	// exist and opens write-only.
 	// Returns ErrClosed if the underlying storage is closed.
 	Create(fd FileDesc) (Writer, error)
 
-	// Remove removes file with the given fd.
+	// Remove removes file with the given 'file descriptor'.
 	// Returns ErrClosed if the underlying storage is closed.
 	Remove(fd FileDesc) error
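The renamed FileDesc.Zero and the SetMeta/GetMeta contract above are easy to exercise against the in-memory storage. A minimal sketch of both (all values are illustrative):

```go
package main

import (
	"fmt"
	"os"

	"github.com/syndtr/goleveldb/leveldb/storage"
)

func main() {
	ms := storage.NewMemStorage()
	defer ms.Close()

	// Nothing stored yet: GetMeta returns os.ErrNotExist.
	if _, err := ms.GetMeta(); err == os.ErrNotExist {
		fmt.Println("no meta yet")
	}

	fd := storage.FileDesc{Type: storage.TypeManifest, Num: 1}
	fmt.Println(fd.Zero()) // false; only FileDesc{} is zero

	if err := ms.SetMeta(fd); err != nil {
		fmt.Println(err)
	}
	got, _ := ms.GetMeta()
	fmt.Println(got == fd) // true
}
```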
 	if filtered && r.filter != nil {
 		filterBlock, frel, ferr := r.getFilterBlock(true)
 		if ferr == nil {
@@ -847,30 +853,53 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
 			}
 			frel.Release()
 		} else if !errors.IsCorrupted(ferr) {
-			err = ferr
+			return nil, nil, ferr
+		}
+	}
+
+	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
+	if !data.Seek(key) {
+		data.Release()
+		if err = data.Error(); err != nil {
+			return
+		}
+
+		// The nearest greater-than key is the first key of the next block.
+		if !index.Next() {
+			if err = index.Error(); err == nil {
+				err = ErrNotFound
+			}
+			return
+		}
+
+		dataBH, n = decodeBlockHandle(index.Value())
+		if n == 0 {
+			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
+			return nil, nil, r.err
+		}
+
+		data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
+		if !data.Next() {
+			data.Release()
+			if err = data.Error(); err == nil {
+				err = ErrNotFound
+			}
 			return
 		}
 	}
-	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
-	defer data.Release()
-	if !data.Seek(key) {
-		err = data.Error()
-		if err == nil {
-			err = ErrNotFound
-		}
-		return
-	}
-	// Don't use block buffer, no need to copy the buffer.
+
+	// Key doesn't use block buffer, no need to copy the buffer.
 	rkey = data.Key()
 	if !noValue {
 		if r.bpool == nil {
 			value = data.Value()
 		} else {
-			// Use block buffer, and since the buffer will be recycled, the buffer
-			// need to be copied.
+			// Value does use block buffer, and since the buffer will be
+			// recycled, it needs to be copied.
 			value = append([]byte{}, data.Value()...)
 		}
 	}
+	data.Release()
 	return
 }
@@ -987,7 +1016,7 @@ func (r *Reader) Release() {
 
 // NewReader creates a new initialized table reader for the file.
 // The fi, cache and bpool is optional and can be nil.
 //
-// The returned table reader instance is goroutine-safe.
+// The returned table reader instance is safe for concurrent use.
 func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
 	if f == nil {
 		return nil, errors.New("leveldb/table: nil file")
@@ -1039,9 +1068,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
 		if errors.IsCorrupted(err) {
 			r.err = err
 			return r, nil
-		} else {
-			return nil, err
 		}
+		return nil, err
 	}
 
 	// Set data end.
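The new branch in find handles the case where the index points at a candidate block but the in-block seek fails: the nearest greater-than key must then be the first key of the next block. A schematic sketch of that two-level fallback over a toy sorted-blocks layout; this is an analogy under my own simplified types, not the reader's actual block format:

```go
package main

import (
	"fmt"
	"sort"
)

// block is a sorted run of keys plus an index separator that is >= every
// key in the block (separators may overshoot, as in an sstable index).
type block struct {
	sep  string
	keys []string
}

// findGE returns the first key >= target, mirroring the
// index-seek-then-next-block fallback in Reader.find.
func findGE(blocks []block, target string) (string, bool) {
	// "Index seek": first block whose separator is >= target.
	i := sort.Search(len(blocks), func(i int) bool { return blocks[i].sep >= target })
	if i == len(blocks) {
		return "", false // past every key in the table
	}
	// Seek within the candidate block.
	ks := blocks[i].keys
	if j := sort.SearchStrings(ks, target); j < len(ks) {
		return ks[j], true
	}
	// In-block seek failed (separator overshoot): the nearest greater
	// key is the first key of the next block.
	if i+1 < len(blocks) {
		return blocks[i+1].keys[0], true
	}
	return "", false
}

func main() {
	blocks := []block{
		{sep: "e", keys: []string{"a", "c"}},
		{sep: "i", keys: []string{"f", "h"}},
	}
	fmt.Println(findGE(blocks, "d")) // f true: first key of the next block
}
```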
@@ -1086,9 +1114,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
 		if errors.IsCorrupted(err) {
 			r.err = err
 			return r, nil
-		} else {
-			return nil, err
 		}
+		return nil, err
 	}
 	if r.filter != nil {
 		r.filterBlock, err = r.readFilterBlock(r.filterBH)
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/table_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/table_test.go
index 1bc73ed82..232efcdfc 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/table/table_test.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/table_test.go
@@ -111,7 +111,7 @@ var _ = testutil.Defer(func() {
 		}
 
 		testutil.AllKeyValueTesting(nil, Build, nil, nil)
-		Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 10, 512, 512), func(r *Reader) {
+		Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 1, 10, 512, 512), func(r *Reader) {
 			It("should have correct blocks number", func() {
 				indexBlock, err := r.readBlock(r.indexBH, true)
 				Expect(err).To(BeNil())
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go
index 274dee6da..b96b271d8 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go
@@ -349,7 +349,7 @@ func (w *Writer) Close() error {
 
 // NewWriter creates a new initialized table writer for the file.
 //
-// Table writer is not goroutine-safe.
+// Table writer is not safe for concurrent use.
 func NewWriter(f io.Writer, o *opt.Options) *Writer {
 	w := &Writer{
 		writer: f,
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/testutil/kv.go b/vendor/github.com/syndtr/goleveldb/leveldb/testutil/kv.go
index 471d5708c..608cbf372 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/testutil/kv.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/testutil/kv.go
@@ -292,7 +292,7 @@ func KeyValue_MultipleKeyValue() *KeyValue {
 
 var keymap = []byte("012345678ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy")
 
-func KeyValue_Generate(rnd *rand.Rand, n, minlen, maxlen, vminlen, vmaxlen int) *KeyValue {
+func KeyValue_Generate(rnd *rand.Rand, n, incr, minlen, maxlen, vminlen, vmaxlen int) *KeyValue {
 	if rnd == nil {
 		rnd = NewRand()
 	}
@@ -308,7 +308,7 @@ func KeyValue_Generate(rnd *rand.Rand, n, minlen, maxlen, vminlen, vmaxlen int)
 	}
 
 	kv := &KeyValue{}
-	endC := byte(len(keymap) - 1)
+	endC := byte(len(keymap) - incr)
 	gen := make([]byte, 0, maxlen)
 	for i := 0; i < n; i++ {
 		m := rrand(minlen, maxlen)
@@ -325,8 +325,8 @@ func KeyValue_Generate(rnd *rand.Rand, n, minlen, maxlen, vminlen, vmaxlen int)
 			if c == endC {
 				continue
 			}
-			gen[j] = c + 1
-			for j += 1; j < m; j++ {
+			gen[j] = c + byte(incr)
+			for j++; j < m; j++ {
 				gen[j] = 0
 			}
 			goto ok
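KeyValue_Generate's new incr parameter generalizes the deduplication step: on a key collision, the last character that can still grow is bumped by incr (previously always 1) and the tail is zeroed, with endC shrinking so the bump stays inside keymap. A simplified standalone sketch of that bump, using my own short keymap rather than testutil's:

```go
package main

import "fmt"

var keymap = []byte("0123456789")

// bump increments the last character that can still grow by incr and
// zeroes everything after it, mirroring KeyValue_Generate's dedup step.
// Simplified: it guards with >= endC where the original compares == endC.
func bump(gen []byte, incr int) bool {
	endC := byte(len(keymap) - incr) // highest index that incr may not cross
	for j := len(gen) - 1; j >= 0; j-- {
		if gen[j] >= endC {
			continue // would overflow keymap; try an earlier position
		}
		gen[j] += byte(incr)
		for j++; j < len(gen); j++ {
			gen[j] = 0
		}
		return true
	}
	return false
}

func main() {
	gen := []byte{0, 3, 9} // indexes into keymap, i.e. key "039"
	bump(gen, 2)
	fmt.Println(gen) // [0 5 0], i.e. key "050": 3+2, tail zeroed
}
```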
 		rkey, rvalue, err := db.TestFind(key)
-		Expect(err).ShouldNot(HaveOccurred(), "Error for key %q", key)
+		Expect(err).ShouldNot(HaveOccurred(), "Error for exact key %q", key)
 		Expect(rkey).Should(Equal(key), "Key")
-		Expect(rvalue).Should(Equal(value), "Value for key %q", key)
+		Expect(rvalue).Should(Equal(value), "Value for exact key %q", key)
 
 		// Using inexact key.
 		rkey, rvalue, err = db.TestFind(key_)
-		Expect(err).ShouldNot(HaveOccurred(), "Error for key %q (%q)", key_, key)
-		Expect(rkey).Should(Equal(key))
-		Expect(rvalue).Should(Equal(value), "Value for key %q (%q)", key_, key)
+		Expect(err).ShouldNot(HaveOccurred(), "Error for inexact key %q (%q)", key_, key)
+		Expect(rkey).Should(Equal(key), "Key for inexact key %q (%q)", key_, key)
+		Expect(rvalue).Should(Equal(value), "Value for inexact key %q (%q)", key_, key)
 	})
 }
 
@@ -140,7 +140,7 @@ func KeyValueTesting(rnd *rand.Rand, kv KeyValue, p DB, setup func(KeyValue) DB,
 			TestIter(db, nil, kv.Clone())
 		}
 		done <- true
-	}, 3.0)
+	}, 30.0)
 
 	It("Should iterates and seeks slice correctly", func(done Done) {
 		if db, ok := p.(NewIterator); ok {
@@ -162,7 +162,7 @@ func KeyValueTesting(rnd *rand.Rand, kv KeyValue, p DB, setup func(KeyValue) DB,
 			})
 		}
 		done <- true
-	}, 50.0)
+	}, 200.0)
 
 	It("Should iterates and seeks slice correctly", func(done Done) {
 		if db, ok := p.(NewIterator); ok {
@@ -174,7 +174,7 @@ func KeyValueTesting(rnd *rand.Rand, kv KeyValue, p DB, setup func(KeyValue) DB,
 			})
 		}
 		done <- true
-	}, 50.0)
+	}, 200.0)
 }
 
 func AllKeyValueTesting(rnd *rand.Rand, body, setup func(KeyValue) DB, teardown func(DB)) {
@@ -207,5 +207,6 @@ func AllKeyValueTesting(rnd *rand.Rand, body, setup func(KeyValue) DB, teardown
 	Describe("with big value", Test(KeyValue_BigValue()))
 	Describe("with special key", Test(KeyValue_SpecialKey()))
 	Describe("with multiple key/value", Test(KeyValue_MultipleKeyValue()))
-	Describe("with generated key/value", Test(KeyValue_Generate(nil, 120, 1, 50, 10, 120)))
+	Describe("with generated key/value 2-incr", Test(KeyValue_Generate(nil, 120, 2, 1, 50, 10, 120)))
+	Describe("with generated key/value 3-incr", Test(KeyValue_Generate(nil, 120, 3, 1, 50, 10, 120)))
 }
"github.com/onsi/gomega" "github.com/syndtr/goleveldb/leveldb/storage" - "github.com/syndtr/goleveldb/leveldb/util" ) var ( @@ -161,11 +160,11 @@ func (err emulatedError) Error() string { type storageLock struct { s *Storage - r util.Releaser + l storage.Locker } -func (l storageLock) Release() { - l.r.Release() +func (l storageLock) Unlock() { + l.l.Unlock() l.s.logI("storage lock released") } @@ -332,7 +331,7 @@ func (s *Storage) Log(str string) { s.Storage.Log(str) } -func (s *Storage) Lock() (l storage.Lock, err error) { +func (s *Storage) Lock() (l storage.Locker, err error) { l, err = s.Storage.Lock() if err != nil { s.logI("storage locking failed, err=%v", err) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/util.go b/vendor/github.com/syndtr/goleveldb/leveldb/util.go index 3b663d1cc..e572a329e 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/util.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/util.go @@ -89,3 +89,10 @@ func (p fdSorter) Swap(i, j int) { func sortFds(fds []storage.FileDesc) { sort.Sort(fdSorter(fds)) } + +func ensureBuffer(b []byte, n int) []byte { + if cap(b) < n { + return make([]byte, n) + } + return b[:n] +} diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go index c60f12c20..73f272af5 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go @@ -34,44 +34,48 @@ type version struct { cSeek unsafe.Pointer - closing bool - ref int - // Succeeding version. - next *version + closing bool + ref int + released bool } func newVersion(s *session) *version { return &version{s: s} } +func (v *version) incref() { + if v.released { + panic("already released") + } + + v.ref++ + if v.ref == 1 { + // Incr file ref. + for _, tt := range v.levels { + for _, t := range tt { + v.s.addFileRef(t.fd, 1) + } + } + } +} + func (v *version) releaseNB() { v.ref-- if v.ref > 0 { return - } - if v.ref < 0 { + } else if v.ref < 0 { panic("negative version ref") } - nextTables := make(map[int64]bool) - for _, tt := range v.next.levels { - for _, t := range tt { - num := t.fd.Num - nextTables[num] = true - } - } - for _, tt := range v.levels { for _, t := range tt { - num := t.fd.Num - if _, ok := nextTables[num]; !ok { + if v.s.addFileRef(t.fd, -1) == 0 { v.s.tops.remove(t) } } } - v.next.releaseNB() - v.next = nil + v.released = true } func (v *version) release() { diff --git a/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/main.go b/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/main.go index cf8466e9c..2886475ff 100644 --- a/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/main.go +++ b/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/main.go @@ -331,7 +331,7 @@ func main() { log.Printf("FATAL: "+format, v...) 
 		if err != nil && errors.IsCorrupted(err) {
 			cerr := err.(*errors.ErrCorrupted)
-			if !cerr.Fd.Nil() && cerr.Fd.Type == storage.TypeTable {
+			if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
 				log.Print("FATAL: corruption detected, scanning...")
 				if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
 					log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
diff --git a/vendor/manifest b/vendor/manifest
index 5ccb5b696..ca62695bd 100644
--- a/vendor/manifest
+++ b/vendor/manifest
@@ -261,8 +261,8 @@
 	{
 		"importpath": "github.com/syndtr/goleveldb",
 		"repository": "https://github.com/syndtr/goleveldb",
-		"vcs": "",
-		"revision": "6ae1797c0b42b9323fc27ff7dcf568df88f2f33d",
+		"vcs": "git",
+		"revision": "23851d93a2292dcc56e71a18ec9e0624d84a0f65",
 		"branch": "master"
 	},
 	{
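The dbstress hunk shows the intended corruption-handling pattern under the renamed Zero: gate on errors.IsCorrupted, assert the concrete *errors.ErrCorrupted, then inspect the embedded 'file descriptor'. A minimal sketch of that flow (the handling body and the fabricated error in main are illustrative only):

```go
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/storage"
)

func handle(err error) {
	if err != nil && errors.IsCorrupted(err) {
		cerr := err.(*errors.ErrCorrupted)
		// Zero() reports whether the descriptor is the zero FileDesc,
		// i.e. the corruption is not attributed to a specific file.
		if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
			fmt.Printf("corrupted table file: %v\n", cerr.Fd)
		}
	}
}

func main() {
	handle(errors.NewErrCorrupted(
		storage.FileDesc{Type: storage.TypeTable, Num: 5},
		fmt.Errorf("checksum mismatch"),
	))
}
```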