入口
写入的入口是db.Update,在事务回调里通过key/value对写入值
1
2
3
4
// Write one key/value pair inside a read-write transaction; the error from
// txn.Set is returned straight to db.Update.
err := db.Update(func(txn *badger.Txn) error {
	return txn.Set([]byte("answer"), []byte("42"))
})
也可以先new一个Entry,再通过txn.SetEntry写入
1
2
3
4
5
// Same write as above, but the Entry is built explicitly with NewEntry and
// passed to txn.SetEntry.
err := db.Update(func(txn *badger.Txn) error {
	return txn.SetEntry(badger.NewEntry([]byte("answer"), []byte("42")))
})
这两种写法其实是一样的
1
2
3
4
5
6
7
// Set stores val under key in this transaction. It wraps the pair in a new
// Entry via NewEntry and delegates to SetEntry, returning SetEntry's error.
func (txn *Txn) Set(key, val []byte) error {
	return txn.SetEntry(NewEntry(key, val))
}
// SetEntry passes e to txn.modify and returns whatever error modify reports.
func (txn *Txn) SetEntry(e *Entry) error {
	return txn.modify(e)
}
注意看db.Update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Update runs fn inside a new read-write transaction and commits it.
//
// It returns ErrDBClosed when the DB is closed and panics when the DB was
// opened with managed transactions. If fn returns an error, that error is
// returned and Commit is not called. The transaction is always discarded
// when Update returns.
func (db *DB) Update(fn func(txn *Txn) error) error {
	switch {
	case db.IsClosed():
		return ErrDBClosed
	case db.opt.managedTxns:
		panic("Update can only be used with managedDB=false.")
	}

	tx := db.NewTransaction(true)
	defer tx.Discard()

	// fn is the caller's closure: typically Set/SetEntry calls, but it may
	// perform other operations such as Txn.Delete().
	if fnErr := fn(tx); fnErr != nil {
		return fnErr
	}
	return tx.Commit()
}
txn.modify(e) 主要是一堆校验,校验通过后直接把entry丢给pendingWrites缓存,并不会落盘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// modify validates e and, when every check passes, buffers it in
// txn.pendingWrites. The checks run in the order written below and the first
// failing one determines the returned error.
func (txn *Txn) modify(e *Entry) error {
	if !txn.update {
		return ErrReadOnlyTxn
	}
	if txn.discarded {
		return ErrDiscardedTxn
	}
	if len(e.Key) == 0 {
		return ErrEmptyKey
	}
	if bytes.HasPrefix(e.Key, badgerPrefix) {
		return ErrInvalidKey
	}
	if len(e.Key) > maxKeySize {
		// The key length must fit in a uint16 (see table::header). To leave
		// room for badger's move prefix and the timestamp suffix, the limit
		// is 65000 rather than 65536.
		return exceedsSize("Key", maxKeySize, e.Key)
	}
	if int64(len(e.Value)) > txn.db.opt.ValueLogFileSize {
		return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
	}
	if txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold() {
		return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
	}

	if err := txn.db.isBanned(e.Key); err != nil {
		return err
	}
	if err := txn.checkSize(e); err != nil {
		return err
	}

	// conflictKeys feeds conflict detection; when detection is disabled
	// there is no reason to record key hashes. Hashing avoids keeping byte
	// slices as map keys.
	if txn.db.opt.DetectConflicts {
		txn.conflictKeys[z.MemHash(e.Key)] = struct{}{}
	}

	// A key written twice with different versions (possible in managed
	// mode) keeps its earlier entry in duplicateWrites. With equal
	// versions the earlier entry is simply overwritten.
	key := string(e.Key)
	if prev, found := txn.pendingWrites[key]; found && prev.version != e.version {
		txn.duplicateWrites = append(txn.duplicateWrites, prev)
	}
	txn.pendingWrites[key] = e // Buffered only; nothing is written here.
	return nil
}
所以真正的变更应该发生在txn.Commit()里边
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Commit submits the transaction's buffered writes.
//
// With no pending writes it returns nil immediately. Otherwise it runs
// commitPrecheck, hands the writes to commitAndSend, and then blocks on the
// callback commitAndSend returned. Once the precheck has passed, the
// transaction is discarded when Commit returns.
func (txn *Txn) Commit() error {
	// conflictKeys may be empty when conflict detection is off, so
	// pendingWrites is the reliable signal for "nothing to do".
	if len(txn.pendingWrites) == 0 {
		return nil
	}

	// The precheck must run before the deferred Discard is registered.
	if precheckErr := txn.commitPrecheck(); precheckErr != nil {
		return precheckErr
	}
	defer txn.Discard()

	// commitAndSend queues the writes and returns a callback to wait on.
	waitForWrite, sendErr := txn.commitAndSend()
	if sendErr != nil {
		return sendErr
	}

	// If batchSet failed, the LSM was never updated, so there is nothing to
	// roll back.
	// TODO: What if some of the txns successfully make it to value log, but
	// others fail. Nothing gets updated to LSM, until a restart happens.
	return waitForWrite()
}
跟进commitAndSend会发现,这里也没有直接写文件的操作,而是典型的go channel处理方式:这个函数主要负责冲突检测、设置版本号以及entry元数据等,最后把entries发送出去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// commitAndSend assigns a commit timestamp to the transaction, stamps every
// buffered entry with its version, and pushes the resulting batch to the
// write channel via db.sendToWriteCh.
//
// It returns ErrConflict when orc.newCommitTs reports a conflict, or the
// error from sendToWriteCh. On success it returns a callback that waits for
// the queued request to finish and then marks commitTs as done; the caller
// must invoke it to learn the outcome of the write.
func (txn *Txn) commitAndSend() (func() error, error) {
	orc := txn.db.orc
	// Ensure that the order in which we get the commit timestamp is the same as
	// the order in which we push these updates to the write channel. So, we
	// acquire a writeChLock before getting a commit timestamp, and only release
	// it after pushing the entries to it.
	orc.writeChLock.Lock()
	defer orc.writeChLock.Unlock()
	// Conflict detection.
	commitTs, conflict := orc.newCommitTs(txn)
	if conflict {
		return nil, ErrConflict
	}
	// keepTogether stays true only while every entry takes its version from
	// commitTs; an entry that already carries a version clears it.
	keepTogether := true
	setVersion := func(e *Entry) {
		if e.version == 0 {
			e.version = commitTs
		} else {
			keepTogether = false
		}
	}
	// Set the version on every pending entry.
	for _, e := range txn.pendingWrites {
		setVersion(e)
	}
	// The duplicateWrites slice will be non-empty only if there are duplicate
	// entries with different versions.
	for _, e := range txn.duplicateWrites {
		setVersion(e)
	}
	// The +1 leaves room for the transaction-end marker appended below.
	entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)
	// Assemble the entry data.
	processEntry := func(e *Entry) {
		// Suffix the keys with commit ts, so the key versions are sorted in
		// descending order of commit timestamp.
		e.Key = y.KeyWithTs(e.Key, e.version)
		// Add bitTxn only if these entries are part of a transaction. We
		// support SetEntryAt(..) in managed mode which means a single
		// transaction can have entries with different timestamps. If entries
		// in a single transaction have different timestamps, we don't add the
		// transaction markers.
		if keepTogether {
			e.meta |= bitTxn
		}
		entries = append(entries, e)
	}
	// The following debug information is what led to determining the cause of
	// bank txn violation bug, and it took a whole bunch of effort to narrow it
	// down to here. So, keep this around for at least a couple of months.
	// var b strings.Builder
	// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
	// txn.readTs, commitTs, txn.reads, txn.conflictKeys)
	for _, e := range txn.pendingWrites {
		processEntry(e)
	}
	for _, e := range txn.duplicateWrites {
		processEntry(e)
	}
	if keepTogether {
		// CommitTs should not be zero if we're inserting transaction markers.
		y.AssertTrue(commitTs != 0)
		// Append the entry that marks the end of this transaction.
		e := &Entry{
			Key: y.KeyWithTs(txnKey, commitTs),
			Value: []byte(strconv.FormatUint(commitTs, 10)),
			meta: bitFinTxn,
		}
		entries = append(entries, e)
	}
	req, err := txn.db.sendToWriteCh(entries) // Send the batch to writeCh.
	if err != nil {
		// The batch was never queued, so mark commitTs as done right away.
		orc.doneCommit(commitTs)
		return nil, err
	}
	ret := func() error {
		err := req.Wait()
		// Wait before marking commitTs as done.
		// We can't defer doneCommit above, because it is being called from a
		// callback here.
		orc.doneCommit(commitTs)
		return err
	}
	return ret, nil
}
最后塞给db.writeCh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// sendToWriteCh wraps entries in a single pooled request and queues it on
// db.writeCh, where doWrites picks it up.
//
// It returns ErrBlockedWrites while db.blockWrites is set, and ErrTxnTooBig
// when the entry count or the summed estimated size reaches the configured
// batch limits. On success the queued request is returned for the caller to
// wait on.
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
	if atomic.LoadInt32(&db.blockWrites) == 1 {
		return nil, ErrBlockedWrites
	}

	// Sum the estimated size of every entry; the estimator is also handed
	// the current value threshold for each entry.
	var totalSize int64
	for i := range entries {
		totalSize += entries[i].estimateSizeAndSetThreshold(db.valueThreshold())
	}
	numEntries := int64(len(entries))

	// Batch size check: reaching either limit rejects the whole batch.
	if numEntries >= db.opt.maxBatchCount || totalSize >= db.opt.maxBatchSize {
		return nil, ErrTxnTooBig
	}

	// A txn must be stored in one contiguous section, without interleaving
	// with other txns or rewrites, so all entries travel in one request.
	req := requestPool.Get().(*request)
	req.reset()
	req.Entries = entries
	req.Wg.Add(1)
	req.IncrRef() // Reference held for the db write.

	db.writeCh <- req // Consumed by doWrites.
	y.NumPutsAdd(db.opt.MetricsEnabled, numEntries)
	return req, nil
}
下半部分,注意db.open的时候会起几个Goroutine
1
2
// Both goroutines are started with the same closer, db.closers.writes.
// doWrites is the consumer of the requests queued on db.writeCh.
go db.doWrites(db.closers.writes)
go db.handleHandovers(db.closers.writes)
doWrites负责select writeCh,并处理写请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// doWrites is the consumer loop for db.writeCh. It collects incoming
// requests into a batch and hands each batch to db.writeRequests on a
// separate goroutine. It calls lc.Done when it returns.
//
// pendingCh has capacity 1 and works as a token: the loop must push into it
// before starting a write, and the writer pops it when finished, so a new
// batch is only dispatched after the previous write has completed. While a
// write is in flight the loop keeps appending requests to the current batch,
// up to 3*kvWriteChCapacity, after which it blocks on pendingCh.
//
// When lc signals closure, the requests still buffered in writeCh are
// drained, written synchronously, and the function returns.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	pendingCh := make(chan struct{}, 1)
	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil { // Main write-request handling.
			db.opt.Errorf("writeRequests: %v", err)
		}
		// Release the token so the next batch can be dispatched.
		<-pendingCh
	}
	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)
	reqs := make([]*request, 0, 10)
	for {
		var r *request
		// Block until the first request of a new batch arrives or the
		// closer fires.
		select {
		case r = <-db.writeCh: // The writeCh that sendToWriteCh pushes into.
		case <-lc.HasBeenClosed():
			goto closedCase
		}
		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}
			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase // Start the write; writeCase resets reqs and runs go writeRequests(reqs).
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}
	closedCase:
		// All the pending request are drained.
		// Don't close the writeCh, because it has been used in several places.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{} // Push to pending before doing a write.
				writeRequests(reqs)
				return
			}
		}
	writeCase:
		// Hand the batch to a writer goroutine and start a fresh batch.
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
db.writeRequests(reqs)这里才会实际写,包括vlog,LSM等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// writeRequests is called serially by only one goroutine.
//
// It persists a batch of requests in two stages: first the whole batch goes
// to the value log, then each non-empty request is written to the LSM. In
// between, the updates are published to subscribers. Whatever the outcome,
// every request in the batch gets its Err set and its WaitGroup released
// exactly once; on failure the same error is recorded on all of them.
func (db *DB) writeRequests(reqs []*request) error {
	if len(reqs) == 0 {
		return nil
	}

	// finish records the result on every request and wakes its waiter.
	finish := func(result error) {
		for _, req := range reqs {
			req.Err = result
			req.Wg.Done()
		}
	}

	db.opt.Debugf("writeRequests called. Writing to value log")
	// Stage 1: value log.
	if err := db.vlog.write(reqs); err != nil {
		finish(err)
		return err
	}

	db.opt.Debugf("Sending updates to subscribers")
	db.pub.sendUpdates(reqs)

	db.opt.Debugf("Writing to memtable")
	written := 0
	for _, req := range reqs {
		if len(req.Entries) == 0 {
			continue
		}
		written += len(req.Entries)

		// Poll until there is room. Polling is needed because both
		// hasRoomForWrite and the flusher need access to s.imm: blocking on
		// a full flushChan while the flusher tries to update s.imm would
		// deadlock.
		var roomErr error
		for attempt := uint64(1); ; attempt++ {
			roomErr = db.ensureRoomForWrite()
			if roomErr != errNoRoom {
				break
			}
			if attempt%100 == 0 {
				db.opt.Debugf("Making room for writes")
			}
			time.Sleep(10 * time.Millisecond)
		}
		if roomErr != nil {
			finish(roomErr)
			return y.Wrap(roomErr, "writeRequests")
		}

		// Stage 2: LSM.
		if lsmErr := db.writeToLSM(req); lsmErr != nil {
			finish(lsmErr)
			return y.Wrap(lsmErr, "writeRequests")
		}
	}

	finish(nil)
	db.opt.Debugf("%d entries written", written)
	return nil
}