From 7cb086d9655aec0d66fa886d33dbeb8dc4d219ee Mon Sep 17 00:00:00 2001 From: Arturo Vergara Date: Wed, 11 Jul 2018 15:08:19 -0500 Subject: [PATCH 1/4] Fixed WAL file management behavior Latest() now returns the latest data and offset as expected. Additionally, if a previous file exists, and the file is smaller than the maximum segment size, then that file is loaded instead of advancing the WAL and creating a new one. This fixes a problem that made tests hang. --- wal.go | 58 ++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/wal.go b/wal.go index f37d3fb..b1cbb6a 100644 --- a/wal.go +++ b/wal.go @@ -76,6 +76,24 @@ func (fb *filebased) filename() string { return filepath.Join(fb.dir, sequenceToFilename(fb.fileSequence)) } +func (fb *filebased) loadLastFile() (bool, error) { + files, err := ioutil.ReadDir(fb.dir) + if err != nil { + return false, err + } + numFiles := len(files) + if numFiles == 0 { + return false, nil + } + fi := files[len(files)-1] + if fi.Size() > maxSegmentSize { + return false, nil + } + fb.fileSequence = filenameToSequence(fi.Name()) + fb.position = fi.Size() + return true, nil +} + // WAL provides a simple write-ahead log backed by a single file on disk. It is // safe to write to a single WAL from multiple goroutines. type WAL struct { @@ -97,7 +115,18 @@ func Open(dir string, syncInterval time.Duration) (*WAL, error) { log: golog.LoggerFor("wal"), }, } - err := wal.advance() + loaded, err := wal.loadLastFile() + if err != nil { + return nil, fmt.Errorf("unable to load last file: %s", err) + } + if !loaded { + err = wal.advance() + } else { + err = wal.openFile() + if err == nil { + wal.writer = bufio.NewWriterSize(wal.file, defaultFileBuffer) + } + } if err != nil { return nil, err } @@ -117,7 +146,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { var offset Offset lastSeq := int64(0) - err := wal.forEachSegmentInReverse(func(file os.FileInfo, first bool, last bool) (bool, error) { + err := wal.forEachSegment(func(file os.FileInfo, first bool, last bool) (bool, error) { filename := file.Name() fileSequence := filenameToSequence(filename) if fileSequence == lastSeq { @@ -142,8 +171,8 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { headBuf := make([]byte, 8) _, err := io.ReadFull(r, headBuf) if err != nil { - // upon encountering a read error, break, as we've found the end of the latest segment - return false, nil + // upon encountering EOF, break, as we've found the end of the latest segment + break } length := int64(encoding.Uint32(headBuf)) @@ -151,9 +180,10 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { b := make([]byte, length) _, err = io.ReadFull(r, b) if err != nil { - // upon encountering a read error, break, as we've found the end of the latest segment + // upon encountering EOF, break, as we've found the end of the latest segment break } + h.Reset() h.Write(b) if h.Sum32() != checksum { @@ -166,14 +196,13 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { } if position > 0 { - // We found a valid entry in the current file, return offset = newOffset(fileSequence, position) - return false, nil + return true, nil } lastSeq = fileSequence - return true, nil + return false, nil }) // No files found with a valid entry, return nil data and offset @@ -229,16 +258,13 @@ func (wal *WAL) Write(bufs ...[]byte) (int, error) { wal.position += int64(n) } - if wal.syncImmediate { - wal.doSync() - } - if wal.position >= maxSegmentSize { // Write sentinel length to mark end of file - _, err = wal.writer.Write(sentinelBytes) + sn, err := wal.writer.Write(sentinelBytes) if err != nil { return 0, err } + wal.position += int64(sn) err = wal.writer.Flush() if err != nil { return 0, err @@ -249,6 +275,10 @@ func (wal *WAL) Write(bufs ...[]byte) (int, error) { } } + if wal.syncImmediate { + wal.doSync() + } + return n, nil } @@ -582,7 +612,7 @@ top: n, err := r.reader.Read(headBuf[read:]) read += n r.position += int64(n) - if err != nil && err.Error() == "EOF" && read < 4 { + if err != nil && err == io.EOF && read < 4 { if r.wal.hasMovedBeyond(r.fileSequence) { if read > 0 { r.log.Errorf("Out of data to read after reading %d, and WAL has moved beyond %d. Assuming WAL at %v corrupted. Advancing and continuing.", r.position, r.fileSequence, r.filename()) From 101d5eb12b49d378da9fcd0656478a5a336f01d2 Mon Sep 17 00:00:00 2001 From: Arturo Vergara Date: Wed, 11 Jul 2018 15:22:51 -0500 Subject: [PATCH 2/4] Revert read error explanation --- wal.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wal.go b/wal.go index b1cbb6a..97ccb34 100644 --- a/wal.go +++ b/wal.go @@ -171,7 +171,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { headBuf := make([]byte, 8) _, err := io.ReadFull(r, headBuf) if err != nil { - // upon encountering EOF, break, as we've found the end of the latest segment + // upon encountering a read error, break, as we've found the end of the latest segment break } @@ -180,7 +180,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { b := make([]byte, length) _, err = io.ReadFull(r, b) if err != nil { - // upon encountering EOF, break, as we've found the end of the latest segment + // upon encountering a read error, break, as we've found the end of the latest segment break } From bf54988a0d7f7220172085747179fc51306f507c Mon Sep 17 00:00:00 2001 From: Arturo Vergara Date: Wed, 11 Jul 2018 16:35:01 -0500 Subject: [PATCH 3/4] Fix traversal order --- wal.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wal.go b/wal.go index 97ccb34..1224011 100644 --- a/wal.go +++ b/wal.go @@ -146,7 +146,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { var offset Offset lastSeq := int64(0) - err := wal.forEachSegment(func(file os.FileInfo, first bool, last bool) (bool, error) { + err := wal.forEachSegmentInReverse(func(file os.FileInfo, first bool, last bool) (bool, error) { filename := file.Name() fileSequence := filenameToSequence(filename) if fileSequence == lastSeq { @@ -197,7 +197,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { if position > 0 { offset = newOffset(fileSequence, position) - return true, nil + return false, nil } lastSeq = fileSequence From 78bc72ae3f13a680af31ed5155aaf818dfa21a97 Mon Sep 17 00:00:00 2001 From: Arturo Vergara Date: Wed, 11 Jul 2018 16:41:31 -0500 Subject: [PATCH 4/4] Adjusted for reverse iteration --- wal.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wal.go b/wal.go index 1224011..a1aab71 100644 --- a/wal.go +++ b/wal.go @@ -196,13 +196,14 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { } if position > 0 { + // An entry was read in the current file, set offset and stop iterating offset = newOffset(fileSequence, position) return false, nil } lastSeq = fileSequence - return false, nil + return true, nil }) // No files found with a valid entry, return nil data and offset