GC入口RunValueLogGC,供用户显式调用,唯一参数discardRatio,表示vlog文件中可丢弃数据所占比例的阈值:只有达到该比例的文件才会被选中重写,取值必须在(0, 1)区间内,否则返回ErrInvalidRequest
1
2
3
4
5
6
7
8
9
10
11
| func (db *DB) RunValueLogGC(discardRatio float64) error {
if db.opt.InMemory {
return ErrGCInMemoryMode
}
if discardRatio >= 1.0 || discardRatio <= 0.0 {
return ErrInvalidRequest
}
// Pick a log file and run GC
return db.vlog.runGC(discardRatio)
}
|
主要两步:
1.选择vlog文件
2.执行GC,vlog重写rewrite
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| func (vlog *valueLog) runGC(discardRatio float64) error {
select {
case vlog.garbageCh <- struct{}{}:
// Pick a log file for GC.
defer func() {
<-vlog.garbageCh
}()
//pick 需要进行GC的文件
lf := vlog.pickLog(discardRatio)
if lf == nil {
return ErrNoRewrite
}
//GC rewrite
return vlog.doRunGC(lf)
default:
return ErrRejected
}
}
|
pickLog的选取依据是discarded >= discardRatio(discarded < discardRatio的文件会被跳过),与较老的版本相比已经去掉了随机选取文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
LOOP:
// Pick a candidate that contains the largest amount of discardable data
fid, discard := vlog.discardStats.MaxDiscard()
// MaxDiscard will return fid=0 if it doesn't have any discard data. The
// vlog files start from 1.
if fid == 0 {
for fid = vlog.nextGCFid; fid < vlog.maxFid; fid++ {
lf := vlog.filesMap[fid]
if lf == nil {
continue
}
//计算文件的discarded,这里返回的是个百分比
discarded, err := vlog.calculateDiscardStat(lf)
if err != nil || discarded < discardRatio {
continue
}
vlog.nextGCFid = fid + 1
return lf
}
// reset the counter so next time we will start from the start
vlog.nextGCFid = 0
return nil
}
lf, ok := vlog.filesMap[fid]
// This file was deleted but it's discard stats increased because of compactions. The file
// doesn't exist so we don't need to do anything. Skip it and retry.
if !ok {
vlog.discardStats.Update(fid, -1)
goto LOOP
}
// We have a valid file.
fi, err := lf.Fd.Stat()
if err != nil {
vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
return nil
}
//判断依据discardRatio*Size>=discard
if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
discard, thr, fi.Name())
return nil
}
maxFid := atomic.LoadUint32(&vlog.maxFid)
if fid < maxFid {
vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
lf, ok := vlog.filesMap[fid]
y.AssertTrue(ok)
return lf
}
// Don't randomly pick any value log file.
return nil
}
|
discarded的计算calculateDiscardStat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| func (vlog *valueLog) calculateDiscardStat(f *logFile) (discardedRatio float64, err error) {
vlog.filesLock.RLock()
for _, fid := range vlog.filesToBeDeleted {
if fid == f.fid {
vlog.filesLock.RUnlock()
return 0, errors.Errorf("value log file already marked for deletion fid: %d", fid)
}
}
maxFid := vlog.maxFid
y.AssertTruef(uint32(f.fid) < maxFid, "fid to calculateDiscardStat: %d. Current max fid: %d", f.fid, maxFid)
vlog.filesLock.RUnlock()
y.AssertTrue(vlog.db != nil)
var discarded, count int
fe := func(e Entry) error {
count++
ts := vlog.db.orc.readTs()
key := y.ParseKey(e.Key)
vs, err := vlog.db.get(y.KeyWithTs(key, ts))
if err != nil {
return err
}
//判断是否discard
if discardEntry(e, vs, vlog.db) {
discarded++
}
return nil
}
//遍历计算discarded和count
_, err = f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
return fe(e)
})
if err != nil {
return 0, err
}
//最后计算百分比
return float64(discarded) / float64(count), nil
}
|
discard判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| /*
* 1.已删除或者过期
* 2.value是vptr
* 3.tnx entry
*/
func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
if vs.Version != y.ParseTs(e.Key) {
// Version not found. Discard.
return true
}
if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
return true
}
if (vs.Meta & bitValuePointer) == 0 {
// Key also stores the value in LSM. Discard.
return true
}
if (vs.Meta & bitFinTxn) > 0 {
// Just a txn finish entry. Discard.
return true
}
return false
}
|
何时更新呢
levelsController.doCompact()–>levelsController.runCompactDef()–>levelsController.compactBuildTables–>levelsController.subcompact()
1
2
| //levelsController.subcompact()
s.kv.vlog.updateDiscardStats(discardStats)
|
1
2
3
4
5
6
7
8
| func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
if vlog.opt.InMemory {
return
}
for fid, discard := range stats {
vlog.discardStats.Update(fid, discard)
}
}
|