-
Notifications
You must be signed in to change notification settings - Fork 8
Open
Description
This repo has some good design. However the quality is poor.
Here's the fix for unit test:
diff --git a/wal.go b/wal.go
index f37d3fb..aad0fa8 100644
--- a/wal.go
+++ b/wal.go
@@ -16,7 +16,7 @@ import (
"sync/atomic"
"time"
- "github.com/dustin/go-humanize"
+ humanize "github.com/dustin/go-humanize"
"github.com/getlantern/golog"
"github.com/golang/snappy"
)
@@ -67,7 +67,7 @@ func (fb *filebased) openFile() error {
filename := fb.filename()
seq := filenameToSequence(filename)
ts := sequenceToTime(seq)
- fb.log.Debugf("Opened %v (%v)", filename, ts)
+ fb.log.Debugf("Opened(%v) %v (%v)", fb.fileFlags, filename, ts)
}
return err
}
@@ -82,6 +82,7 @@ type WAL struct {
filebased
syncImmediate bool
writer *bufio.Writer
+ closed bool
mx sync.RWMutex
}
@@ -137,13 +138,14 @@ func (wal *WAL) Latest() ([]byte, Offset, error) {
}
h := newHash()
+ lastPos := int64(0)
position := int64(0)
for {
headBuf := make([]byte, 8)
_, err := io.ReadFull(r, headBuf)
if err != nil {
// upon encountering a read error, break, as we've found the end of the latest segment
- return false, nil
+ break
}
length := int64(encoding.Uint32(headBuf))
@@ -162,12 +164,13 @@ func (wal *WAL) Latest() ([]byte, Offset, error) {
}
data = b
+ lastPos = position
position += 8 + length
}
if position > 0 {
// We found a valid entry in the current file, return
- offset = newOffset(fileSequence, position)
+ offset = newOffset(fileSequence, lastPos)
return false, nil
}
@@ -253,23 +256,15 @@ func (wal *WAL) Write(bufs ...[]byte) (int, error) {
}
// TruncateBefore removes all data prior to the given offset from disk.
+// The last segment will always be kept.
func (wal *WAL) TruncateBefore(o Offset) error {
cutoff := sequenceToFilename(o.FileSequence())
- _, latestOffset, err := wal.Latest()
- if err != nil {
- return fmt.Errorf("Unable to determine latest offset: %v", err)
- }
- latestSequence := latestOffset.FileSequence()
return wal.forEachSegment(func(file os.FileInfo, first bool, last bool) (bool, error) {
if last || file.Name() >= cutoff {
// Files are sorted by name, if we've gotten past the cutoff or
// encountered the last (active) file, don't bother continuing.
return false, nil
}
- if filenameToSequence(file.Name()) == latestSequence {
- // Don't delete the file containing the latest valid entry
- return true, nil
- }
rmErr := os.Remove(filepath.Join(wal.dir, file.Name()))
if rmErr != nil {
return false, rmErr
@@ -285,21 +280,23 @@ func (wal *WAL) TruncateBeforeTime(ts time.Time) error {
}
// TruncateToSize caps the size of the WAL to the given number of bytes
-func (wal *WAL) TruncateToSize(limit int64) error {
- seen := int64(0)
- return wal.forEachSegmentInReverse(func(file os.FileInfo, first bool, last bool) (bool, error) {
+func (wal *WAL) TruncateToSize(limit int64) (int64, error) {
+ remained := int64(0)
+ err := wal.forEachSegmentInReverse(func(file os.FileInfo, first bool, last bool) (bool, error) {
next := file.Size()
- seen += next
- if seen > limit {
+ if remained+next > limit {
fullname := filepath.Join(wal.dir, file.Name())
rmErr := os.Remove(fullname)
if rmErr != nil {
return false, rmErr
}
wal.log.Debugf("Removed WAL file %v", fullname)
+ } else {
+ remained += next
}
return true, nil
})
+ return remained, err
}
// CompressBefore compresses all data prior to the given offset on disk.
@@ -417,6 +414,7 @@ func (wal *WAL) Close() error {
wal.mx.Lock()
flushErr := wal.writer.Flush()
syncErr := wal.file.Sync()
+ wal.closed = true
wal.mx.Unlock()
closeErr := wal.file.Close()
if flushErr != nil {
@@ -537,11 +535,7 @@ func (wal *WAL) NewReader(name string, offset Offset, bufferSource func() []byte
// Read reads the next chunk from the WAL, blocking until one is available.
func (r *Reader) Read() ([]byte, error) {
for {
- length, err := r.readHeader()
- if err != nil {
- return nil, err
- }
- checksum, err := r.readHeader()
+ length, checksum, err := r.readHeader()
if err != nil {
return nil, err
}
@@ -553,12 +547,12 @@ func (r *Reader) Read() ([]byte, error) {
}
return nil, discardErr
}
- data, err := r.readData(length)
+ data, err := r.readData(int(length))
if data != nil || err != nil {
if data != nil {
r.h.Reset()
r.h.Write(data)
- if checksum != int(r.h.Sum32()) {
+ if checksum != r.h.Sum32() {
r.log.Errorf("Checksum mismatch, skipping entry")
continue
}
@@ -568,30 +562,17 @@ func (r *Reader) Read() ([]byte, error) {
}
}
-func (r *Reader) readHeader() (int, error) {
- headBuf := make([]byte, 4)
-top:
+func (r *Reader) readHeader() (length uint32, checksum uint32, err error) {
+ headBuf := make([]byte, 8)
for {
- length := 0
- read := 0
-
for {
if atomic.LoadInt32(&r.closed) == 1 {
- return 0, io.ErrUnexpectedEOF
+ return 0, 0, io.ErrUnexpectedEOF
}
- n, err := r.reader.Read(headBuf[read:])
- read += n
- r.position += int64(n)
- if err != nil && err.Error() == "EOF" && read < 4 {
- if r.wal.hasMovedBeyond(r.fileSequence) {
- if read > 0 {
- r.log.Errorf("Out of data to read after reading %d, and WAL has moved beyond %d. Assuming WAL at %v corrupted. Advancing and continuing.", r.position, r.fileSequence, r.filename())
- }
- advanceErr := r.advance()
- if advanceErr != nil {
- return 0, advanceErr
- }
- continue top
+ _, err = io.ReadFull(r.reader, headBuf)
+ if err != nil && err.Error() == "EOF" {
+ if r.wal.closed || r.wal.hasMovedBeyond(r.fileSequence) {
+ break
}
// No newer log files, continue trying to read from this one
time.Sleep(50 * time.Millisecond)
@@ -601,19 +582,14 @@ top:
r.log.Errorf("Unexpected error reading header from WAL file %v: %v", r.filename(), err)
break
}
- if read == 4 {
- length = int(encoding.Uint32(headBuf))
- break
- }
- }
-
- if length > sentinel {
- return length, nil
+ length = encoding.Uint32(headBuf[0:4])
+ checksum = encoding.Uint32(headBuf[4:8])
+ return
}
err := r.advance()
if err != nil {
- return 0, err
+ return 0, 0, err
}
}
}
diff --git a/wal_test.go b/wal_test.go
index 3728281..4c0ec04 100644
--- a/wal_test.go
+++ b/wal_test.go
@@ -1,6 +1,7 @@
package wal
import (
+ "fmt"
"io/ioutil"
"os"
"path/filepath"
@@ -75,7 +76,9 @@ func TestWAL(t *testing.T) {
}
wal.log.Debug(2)
+ wal.log.Debug(fmt.Sprintf("r.Offset() %v", r.Offset()))
b, readErr := r.Read()
+ wal.log.Debug(fmt.Sprintf("r.Offset() %v", r.Offset()))
if !assert.NoError(t, readErr) {
return false
}
@@ -108,7 +111,7 @@ func TestWAL(t *testing.T) {
if !assert.NoError(t, err) {
return
}
- assert.EqualValues(t, 9, lc.Position())
+ assert.EqualValues(t, 0, lc.Position())
assert.Equal(t, "2", string(latest))
r2, err := wal.NewReader("test", r.Offset(), bufferPool.Get)
@@ -117,6 +120,9 @@ func TestWAL(t *testing.T) {
}
defer r2.Close()
+ wal.log.Debug("Problem is here")
+ wal.log.Debug(fmt.Sprintf("r.Offset() %v", r.Offset()))
+ wal.log.Debug(fmt.Sprintf("r2.Offset() %v", r2.Offset()))
// Problem is here
if !testReadWrite("3") {
return
@@ -164,14 +170,12 @@ func TestWAL(t *testing.T) {
}
w.Flush()
file.Write([]byte("garbage"))
- } else {
- file.Seek(-1, 2)
- file.Write([]byte{0})
+ wal.log.Debug(fmt.Sprintf("corrupted file %v", name))
}
file.Close()
}
- assertWALContents([]string{"3"})
+ assertWALContents([]string{"2", "3"})
// Reader opened at prior offset should only get "3"
b, readErr := r2.Read()
@@ -196,8 +200,10 @@ func TestWAL(t *testing.T) {
testTruncate(t, wal, truncateErr, 1)
// Truncate to size 1, which should remove remaining log segment
- truncateErr = wal.TruncateToSize(1)
- testTruncate(t, wal, truncateErr, 0)
+ var remained int64
+ remained, truncateErr = wal.TruncateToSize(1)
+ assert.NoError(t, truncateErr, "Should be able to truncate")
+ assert.Equal(t, int64(0), remained)
}
func testTruncate(t *testing.T, wal *WAL, err error, expectedSegments int) {
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels