Skip to content

Commit 01200fd

Browse files
authored
Merge pull request #1200 from PapaPiya/fix_lag
[PDR-16094][fix(lag)]: dirx/dir模式下lag显示不正确
2 parents 6976061 + 698c81b commit 01200fd

File tree

3 files changed

+50
-40
lines changed

3 files changed

+50
-40
lines changed

reader/dirx/dir_reader.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,18 @@ func (drs *dirReaders) SyncMeta() ([]byte, error) {
465465
return data, nil
466466
}
467467

468+
func (drs *dirReaders) Lag() (rl *LagInfo, err error) {
469+
rl = &LagInfo{Size: 0, SizeUnit: "bytes"}
470+
for _, dr := range drs.getReaders() {
471+
drLag, err := dr.br.Lag()
472+
if err != nil {
473+
return nil, err
474+
}
475+
rl.Size += drLag.Size
476+
}
477+
return rl, nil
478+
}
479+
468480
func (drs *dirReaders) Close() {
469481
var wg sync.WaitGroup
470482
for _, dr := range drs.getReaders() {

reader/dirx/dirx.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,10 @@ func (r *Reader) ReadLine() (string, error) {
422422
}
423423
}
424424

425+
func (r *Reader) Lag() (rl *LagInfo, err error) {
426+
return r.dirReaders.Lag()
427+
}
428+
425429
func (r *Reader) Status() StatsInfo {
426430
r.statsLock.RLock()
427431
defer r.statsLock.RUnlock()

reader/seqfile/seqfile.go

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
// SeqFile 按最终修改时间依次读取文件的Reader类型
2727
type SeqFile struct {
2828
meta *reader.Meta
29-
mux sync.Mutex
29+
mux *sync.RWMutex
3030

3131
name string
3232
dir string // 文件目录
@@ -109,7 +109,7 @@ func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine boo
109109
ignoreFileSuffix: suffixes,
110110
ignoreHidden: ignoreHidden,
111111
validFilePattern: validFileRegex,
112-
mux: sync.Mutex{},
112+
mux: new(sync.RWMutex),
113113
newFileAsNewLine: newFileNewLine,
114114
meta: meta,
115115
inodeOffset: make(map[string]int64),
@@ -499,13 +499,7 @@ func (sf *SeqFile) getNextFileCondition() (condition func(os.FileInfo) bool, err
499499
if len(sf.inodeOffset) < 1 {
500500
return true
501501
}
502-
var key string
503-
if sf.inodeSensitive {
504-
key = reader.JoinFileInode(f.Name(), strconv.FormatUint(inode, 10))
505-
} else {
506-
key = filepath.Base(f.Name())
507-
}
508-
offset, ok := sf.inodeOffset[key]
502+
offset, ok := sf.inodeOffset[getInodeKey(f.Name(), inode, sf.inodeSensitive)]
509503
return !ok || (sf.ReadSameInode && offset != -1 && f.Size() != offset)
510504
}
511505

@@ -624,13 +618,7 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
624618
if sf.inodeOffset == nil {
625619
sf.inodeOffset = make(map[string]int64)
626620
}
627-
var key string
628-
if sf.inodeSensitive {
629-
key = reader.JoinFileInode(doneFile, strconv.FormatUint(doneFileInode, 10))
630-
} else {
631-
key = filepath.Base(doneFile)
632-
}
633-
sf.inodeOffset[key] = doneFileOffset
621+
sf.inodeOffset[getInodeKey(doneFile, doneFileInode, sf.inodeSensitive)] = doneFileOffset
634622
tryTime := 0
635623
for {
636624
err := sf.meta.SyncDoneFileInode(sf.inodeOffset)
@@ -682,19 +670,8 @@ func (sf *SeqFile) SyncMeta() (err error) {
682670

683671
func (sf *SeqFile) Lag() (rl *LagInfo, err error) {
684672
sf.mux.Lock()
685-
rl = &LagInfo{Size: -sf.offset, SizeUnit: "bytes"}
686-
logReading := filepath.Base(sf.currFile)
687-
sf.mux.Unlock()
688-
689-
inode, err := utilsos.GetIdentifyIDByPath(sf.currFile)
690-
if os.IsNotExist(err) || (inode != 0 && inode != sf.inode) {
691-
rl.Size = 0
692-
err = nil
693-
}
694-
if err != nil {
695-
rl.Size = 0
696-
return rl, err
697-
}
673+
defer sf.mux.Unlock()
674+
rl = &LagInfo{Size: 0, SizeUnit: "bytes"}
698675

699676
logs, err := ReadDirByTime(sf.dir)
700677
if err != nil {
@@ -705,13 +682,28 @@ func (sf *SeqFile) Lag() (rl *LagInfo, err error) {
705682
if l.IsDir() {
706683
continue
707684
}
708-
if condition == nil || !condition(l) {
685+
686+
inode, err := utilsos.GetIdentifyIDByPath(filepath.Join(sf.dir, l.Name()))
687+
if os.IsNotExist(err) || inode == 0 {
709688
continue
710689
}
711-
rl.Size += l.Size()
712-
if l.Name() == logReading {
713-
break
690+
if err != nil {
691+
rl.Size = 0
692+
return rl, err
693+
}
694+
695+
if filepath.Base(sf.currFile) == l.Name() {
696+
if sf.inodeSensitive && sf.inode != inode {
697+
rl.Size += l.Size()
698+
} else {
699+
rl.Size += l.Size() - sf.offset
700+
}
701+
continue
714702
}
703+
if condition == nil || !condition(l) {
704+
continue
705+
}
706+
rl.Size += l.Size() - sf.inodeOffset[getInodeKey(l.Name(), inode, sf.inodeSensitive)]
715707
}
716708

717709
return rl, nil
@@ -750,13 +742,7 @@ func (sf *SeqFile) getOffset(f *os.File, offset int64, seek bool) int64 {
750742
log.Errorf("Runner[%s] NewSeqFile get file %s inode error %v, ignore...", sf.meta.RunnerName, fileName, err)
751743
return offset
752744
}
753-
var key string
754-
if sf.inodeSensitive {
755-
key = reader.JoinFileInode(fileName, strconv.FormatUint(inode, 10))
756-
} else {
757-
key = filepath.Base(fileName)
758-
}
759-
offset = sf.inodeOffset[key]
745+
offset = sf.inodeOffset[getInodeKey(fileName, inode, sf.inodeSensitive)]
760746
if fileInfo.Size() < offset {
761747
offset = 0
762748
}
@@ -769,6 +755,14 @@ func (sf *SeqFile) getOffset(f *os.File, offset int64, seek bool) int64 {
769755
return offset
770756
}
771757

758+
func getInodeKey(name string, inode uint64, inodeSensitive bool) string {
759+
if inodeSensitive {
760+
return reader.JoinFileInode(name, strconv.FormatUint(inode, 10))
761+
} else {
762+
return filepath.Base(name)
763+
}
764+
}
765+
772766
var (
773767
_ LineSkipper = new(SeqFile)
774768
_ reader.NewSourceRecorder = new(SeqFile)

0 commit comments

Comments
 (0)