diff --git a/wal.go b/wal.go index f37d3fb..a1aab71 100644 --- a/wal.go +++ b/wal.go @@ -76,6 +76,24 @@ func (fb *filebased) filename() string { return filepath.Join(fb.dir, sequenceToFilename(fb.fileSequence)) } +func (fb *filebased) loadLastFile() (bool, error) { + files, err := ioutil.ReadDir(fb.dir) + if err != nil { + return false, err + } + numFiles := len(files) + if numFiles == 0 { + return false, nil + } + fi := files[len(files)-1] + if fi.Size() > maxSegmentSize { + return false, nil + } + fb.fileSequence = filenameToSequence(fi.Name()) + fb.position = fi.Size() + return true, nil +} + // WAL provides a simple write-ahead log backed by a single file on disk. It is // safe to write to a single WAL from multiple goroutines. type WAL struct { @@ -97,7 +115,18 @@ func Open(dir string, syncInterval time.Duration) (*WAL, error) { log: golog.LoggerFor("wal"), }, } - err := wal.advance() + loaded, err := wal.loadLastFile() + if err != nil { + return nil, fmt.Errorf("unable to load last file: %s", err) + } + if !loaded { + err = wal.advance() + } else { + err = wal.openFile() + if err == nil { + wal.writer = bufio.NewWriterSize(wal.file, defaultFileBuffer) + } + } if err != nil { return nil, err } @@ -143,7 +172,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { _, err := io.ReadFull(r, headBuf) if err != nil { // upon encountering a read error, break, as we've found the end of the latest segment - return false, nil + break } length := int64(encoding.Uint32(headBuf)) @@ -154,6 +183,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { // upon encountering a read error, break, as we've found the end of the latest segment break } + h.Reset() h.Write(b) if h.Sum32() != checksum { @@ -166,7 +196,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) { } if position > 0 { - // We found a valid entry in the current file, return + // An entry was read in the current file, set offset and stop iterating offset = newOffset(fileSequence, position) return false, nil } @@ -229,16 +259,13 @@ func (wal *WAL) Write(bufs ...[]byte) (int, error) { wal.position += int64(n) } - if wal.syncImmediate { - wal.doSync() - } - if wal.position >= maxSegmentSize { // Write sentinel length to mark end of file - _, err = wal.writer.Write(sentinelBytes) + sn, err := wal.writer.Write(sentinelBytes) if err != nil { return 0, err } + wal.position += int64(sn) err = wal.writer.Flush() if err != nil { return 0, err @@ -249,6 +276,10 @@ func (wal *WAL) Write(bufs ...[]byte) (int, error) { } } + if wal.syncImmediate { + wal.doSync() + } + return n, nil } @@ -582,7 +613,7 @@ top: n, err := r.reader.Read(headBuf[read:]) read += n r.position += int64(n) - if err != nil && err.Error() == "EOF" && read < 4 { + if err != nil && err == io.EOF && read < 4 { if r.wal.hasMovedBeyond(r.fileSequence) { if read > 0 { r.log.Errorf("Out of data to read after reading %d, and WAL has moved beyond %d. Assuming WAL at %v corrupted. Advancing and continuing.", r.position, r.fileSequence, r.filename())