写入流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//etcdserver/apply.go
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
trace = traceutil.New("put",
a.s.getLogger(),
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
)
}
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound
}
}
//获得一个
txn = a.s.KV().Write(trace)
//End内部如果是写,会判断是否需要commit
defer txn.End()
}
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.StepWithFunction(func() {
rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
}, "get previous kv pair")
if err != nil {
return nil, nil, err
}
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, nil, ErrKeyNotFound
}
}
if p.IgnoreValue {
val = rr.KVs[0].Value
}
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
}
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, trace, nil
}
a.s.KV().Write(trace)的实现有两个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//mvcc/kvstore_tx.go
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
//mvcc/watchable_store_txn.go
type watchableStoreTxnWrite struct {
TxnWrite
s *watchableStore
}
func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
return &watchableStoreTxnWrite{s.store.Write(trace), s}
}
再来看下End()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//mvcc/kvstore_txn.go
func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
tw.s.saveIndex(tw.tx)
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
}
//调用写事务BatchTx的Unlock,这里需要注意:Unlock原本是ReadTx定义的接口,但是嵌套在BatchTx
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}
//mvcc/backend/batch_tx.go
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
}
t.batchTx.Unlock()
}
//
func (t *batchTx) Unlock() {
//判断是否达到事务提交阈值
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
t.Mutex.Unlock()
}
在创建backend的时候会创建负责执行读写事务的初始化readTx,batchTx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func newBackend(bcfg BackendConfig) *backend {
if bcfg.Logger == nil {
bcfg.Logger = zap.NewNop()
}
bopts := &bolt.Options{}
if boltOpenOptions != nil {
*bopts = *boltOpenOptions
}
bopts.InitialMmapSize = bcfg.mmapSize()
bopts.FreelistType = bcfg.BackendFreelistType
bopts.NoSync = bcfg.UnsafeNoFsync
bopts.NoGrowSync = bcfg.UnsafeNoFsync
//打开bolt db
db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil {
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
}
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend{
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
//初始化readTx,执行读事务操作
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
lg: bcfg.Logger,
}
//初始化batchTx,执行写事务操作;这里会执行一次commit操作
b.batchTx = newBatchTxBuffered(b)
//启动goroutine,异步执行commit操作
go b.run()
return b
}
//mvcc/backend/batch_tx.go batchTxBuffered持有batchTx
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
tx := &batchTxBuffered{
batchTx: batchTx{backend: backend},
buf: txWriteBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
seq: true,
},
}
tx.Commit()
return tx
}
ReadTx和BatchTx定义,其中BatchTx内部又嵌套了ReadTx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type ReadTx interface {
Lock()
Unlock()
RLock()
RUnlock()
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}
type BatchTx interface {
ReadTx
UnsafeCreateBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
}
readTx被baseReadTx持有,batchTxBuffered持有batchTx
1
2
3
4
5
6
7
type readTx struct {
baseReadTx
}
type batchTxBuffered struct {
batchTx
buf txWriteBuffer
}
数据持久化
1