Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ func (fb *filebased) filename() string {
return filepath.Join(fb.dir, sequenceToFilename(fb.fileSequence))
}

func (fb *filebased) loadLastFile() (bool, error) {
files, err := ioutil.ReadDir(fb.dir)
if err != nil {
return false, err
}
numFiles := len(files)
if numFiles == 0 {
return false, nil
}
fi := files[len(files)-1]
if fi.Size() > maxSegmentSize {
return false, nil
}
fb.fileSequence = filenameToSequence(fi.Name())
fb.position = fi.Size()
return true, nil
}

// WAL provides a simple write-ahead log backed by a single file on disk. It is
// safe to write to a single WAL from multiple goroutines.
type WAL struct {
Expand All @@ -97,7 +115,18 @@ func Open(dir string, syncInterval time.Duration) (*WAL, error) {
log: golog.LoggerFor("wal"),
},
}
err := wal.advance()
loaded, err := wal.loadLastFile()
if err != nil {
return nil, fmt.Errorf("unable to load last file: %s", err)
}
if !loaded {
err = wal.advance()
} else {
err = wal.openFile()
if err == nil {
wal.writer = bufio.NewWriterSize(wal.file, defaultFileBuffer)
}
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,7 +172,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) {
_, err := io.ReadFull(r, headBuf)
if err != nil {
// upon encountering a read error, break, as we've found the end of the latest segment
return false, nil
break
}

length := int64(encoding.Uint32(headBuf))
Expand All @@ -154,6 +183,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) {
// upon encountering a read error, break, as we've found the end of the latest segment
break
}

h.Reset()
h.Write(b)
if h.Sum32() != checksum {
Expand All @@ -166,7 +196,7 @@ func (wal *WAL) Latest() ([]byte, Offset, error) {
}

if position > 0 {
// We found a valid entry in the current file, return
// An entry was read in the current file, set offset and stop iterating
offset = newOffset(fileSequence, position)
return false, nil
}
Expand Down Expand Up @@ -229,16 +259,13 @@ func (wal *WAL) Write(bufs ...[]byte) (int, error) {
wal.position += int64(n)
}

if wal.syncImmediate {
wal.doSync()
}

if wal.position >= maxSegmentSize {
// Write sentinel length to mark end of file
_, err = wal.writer.Write(sentinelBytes)
sn, err := wal.writer.Write(sentinelBytes)
if err != nil {
return 0, err
}
wal.position += int64(sn)
err = wal.writer.Flush()
if err != nil {
return 0, err
Expand All @@ -249,6 +276,10 @@ func (wal *WAL) Write(bufs ...[]byte) (int, error) {
}
}

if wal.syncImmediate {
wal.doSync()
}

return n, nil
}

Expand Down Expand Up @@ -582,7 +613,7 @@ top:
n, err := r.reader.Read(headBuf[read:])
read += n
r.position += int64(n)
if err != nil && err.Error() == "EOF" && read < 4 {
if err != nil && err == io.EOF && read < 4 {
if r.wal.hasMovedBeyond(r.fileSequence) {
if read > 0 {
r.log.Errorf("Out of data to read after reading %d, and WAL has moved beyond %d. Assuming WAL at %v corrupted. Advancing and continuing.", r.position, r.fileSequence, r.filename())
Expand Down