diff --git a/cmd/pavosql/cmd/serve/serve.go b/cmd/pavosql/cmd/serve/serve.go index 9232af3..e3f76c4 100644 --- a/cmd/pavosql/cmd/serve/serve.go +++ b/cmd/pavosql/cmd/serve/serve.go @@ -5,7 +5,8 @@ import ( ) var ( - port uint16 + port uint16 + filePath string ) func Command() *cobra.Command { @@ -18,6 +19,7 @@ func Command() *cobra.Command { }, } + serveCmd.Flags().StringVarP(&filePath, "file", "f", "/var/lib/pavosql/pavosql.db", "") serveCmd.Flags().Uint16VarP(&port, "port", "p", 6677, "") return serveCmd diff --git a/internal/db/db.go b/internal/db/db.go new file mode 100644 index 0000000..b4ad054 --- /dev/null +++ b/internal/db/db.go @@ -0,0 +1,3 @@ +package db + +type DB struct{} diff --git a/internal/pager/pager.go b/internal/pager/pager.go new file mode 100644 index 0000000..99e3fd5 --- /dev/null +++ b/internal/pager/pager.go @@ -0,0 +1,56 @@ +package pager + +import ( + "sync" + + "github.com/gkits/pavosql/pkg/atomic" +) + +type Pager struct { + rw atomic.ReadWriterAt + freeList int64 + mu sync.RWMutex + end int64 +} + +type ( + readFn = func(int64) ([]byte, error) + commitFn = func(map[int64][]byte) error + + set[T comparable] = map[T]struct{} +) + +func (p *Pager) NewReader() (*Reader, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + reader := newReader(p.read) + + return reader, nil +} + +func (p *Pager) NewWriter() (*Writer, error) { + p.mu.Lock() + defer p.mu.Unlock() + + writer := newWriter(newReader(p.read), make(set[int64]), p.end, p.commit) + + return writer, nil +} + +func (p *Pager) read(off int64) ([]byte, error) { + page := make([]byte, 99) + if _, err := p.rw.ReadAt(page, int64(off)); err != nil { + return nil, err + } + return page, nil +} + +func (p *Pager) commit(changes map[int64][]byte) error { + for off, d := range changes { + if _, err := p.rw.WriteAt(d, off); err != nil { + return err + } + } + return p.rw.Commit() +} diff --git a/internal/pager/reader.go b/internal/pager/reader.go new file mode 100644 index 0000000..7a13a3f --- /dev/null +++ b/internal/pager/reader.go @@ -0,0 +1,23 @@ +package pager + +type Reader struct { + pages map[int64][]byte + read readFn +} + +func newReader(callbackRead readFn) *Reader { + return &Reader{make(map[int64][]byte), callbackRead} +} + +func (r *Reader) Read(off int64) ([]byte, error) { + if page, ok := r.pages[off]; ok { + return page, nil + } + + page, err := r.read(off) + if err != nil { + return nil, err + } + r.pages[off] = page + return page, nil +} diff --git a/internal/pager/writer.go b/internal/pager/writer.go new file mode 100644 index 0000000..c4bfd16 --- /dev/null +++ b/internal/pager/writer.go @@ -0,0 +1,60 @@ +package pager + +type Writer struct { + freelist set[int64] + freed set[int64] + new map[int64][]byte + nextPage int64 + pageSize int64 + commit commitFn + *Reader +} + +func newWriter(r *Reader, freelist set[int64], nextPage int64, commitCallback commitFn) *Writer { + return &Writer{ + Reader: r, + + freelist: freelist, + freed: make(set[int64]), + new: make(map[int64][]byte), + + commit: commitCallback, + nextPage: nextPage, + } +} + +func (w *Writer) Alloc(d []byte) int64 { + off := w.nextPage + switch { + case len(w.freed) > 0: + for off := range w.freed { + delete(w.freed, off) + break + } + case len(w.freelist) > 0: + for off := range w.freelist { + delete(w.freelist, off) + break + } + default: + w.nextPage += w.pageSize + } + w.new[off] = d + return off +} + +func (w *Writer) Free(off int64) { + if _, ok := w.freelist[off]; ok { + return + } + if _, ok := w.freed[off]; ok { + return + } + w.freed[off] = struct{}{} +} + +func (w *Writer) Commit() error { + return w.commit(w.new) +} + +func (w *Writer) Abort() {} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..abb4e43 --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1 @@ +package server diff --git a/internal/tree/node.go b/internal/tree/node.go index 52b83f4..9ba92fa 100755 --- a/internal/tree/node.go +++ b/internal/tree/node.go @@ -97,13 +97,13 @@ func (n *node) Val(i uint16) []byte { return n[off+4+kLen : off+4+kLen+vLen] } -func (n *node) Pointer(i uint16) uint64 { +func (n *node) Pointer(i uint16) int64 { if !n.indexInBounds(i) { panic(ErrIndexOutOfBounds) } off := n.offset(i) kLen := binary.LittleEndian.Uint16(n[off:]) - return binary.LittleEndian.Uint64(n[off+2+kLen : off+2+kLen+8]) + return int64(binary.LittleEndian.Uint64(n[off+2+kLen : off+2+kLen+8])) } // Binary searches the target key inside n and returns its position and weither it exists. diff --git a/internal/tree/tree.go b/internal/tree/tree.go index 6adc9fd..35dce51 100755 --- a/internal/tree/tree.go +++ b/internal/tree/tree.go @@ -3,17 +3,21 @@ package tree import ( "errors" "fmt" + "slices" ) type pager interface { - ReadPage(off uint64) ([PageSize]byte, error) + ReadPage(int64) ([PageSize]byte, error) + Alloc([PageSize]byte) (int64, error) + Free(int64) error Commit() error - Rollback() error + Abort() error } type Tree struct { - root uint64 - pager pager + root int64 + pager pager + readOnly bool } func New() *Tree { @@ -50,9 +54,71 @@ func (t *Tree) Get(k []byte) ([]byte, error) { } func (t *Tree) Set(k []byte, v []byte) error { + if t.readOnly { + return errors.New("tree: cannot write onto read only tree") + } + page, err := t.pager.ReadPage(t.root) + if err != nil { + return fmt.Errorf("tree: failed to read root page: %w", err) + } + cur := node(page) + + visited := []node{cur} + for { + i, exists := cur.Search(k) + + switch cur.Type() { + case PointerPage: + ptr := cur.Pointer(i) + page, err = t.pager.ReadPage(ptr) + if err != nil { + return fmt.Errorf("tree: failed to read page: %w", err) + } + cur = node(page) + visited = append(visited, cur) + continue + + case LeafPage: + if !exists { + return errors.New("key does not exists on leaf node") + } + + if cur.CanSet(k, v) { + newNode := cur.Set(i, k, v) + ptr, err := t.pager.Alloc(newNode) + if err != nil { + return fmt.Errorf("tree: failed to allocate page: %w", err) + } + // TODO: pass ptr to parent node + _ = ptr + break + } + + // TODO: handle splitting + left, right := cur.Split() + _, _ = left, right + + default: + return errors.New("invalid page type") + } + break + } + + for _, n := range slices.Backward(visited) { + ptr, err := t.pager.Alloc(n) + if err != nil { + return err + } + // TODO: pass ptr to parent nodes + _ = ptr + } + return nil } func (t *Tree) Delete(k []byte) error { + if t.readOnly { + return errors.New("tree: cannot write onto read only tree") + } return nil } diff --git a/internal/tree/tree_test.go b/internal/tree/tree_test.go new file mode 100644 index 0000000..db8083d --- /dev/null +++ b/internal/tree/tree_test.go @@ -0,0 +1,91 @@ +package tree_test + +import ( + "testing" + + "github.com/gkits/pavosql/internal/tree" +) + +func TestTree_Get(t *testing.T) { + tests := []struct { + name string // description of this test case + // Named input parameters for target function. + k []byte + want []byte + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr := tree.New() + got, gotErr := tr.Get(tt.k) + if gotErr != nil { + if !tt.wantErr { + t.Errorf("Get() failed: %v", gotErr) + } + return + } + if tt.wantErr { + t.Fatal("Get() succeeded unexpectedly") + } + // TODO: update the condition below to compare got with tt.want. + if true { + t.Errorf("Get() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTree_Set(t *testing.T) { + tests := []struct { + name string // description of this test case + // Named input parameters for target function. + k []byte + v []byte + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr := tree.New() + gotErr := tr.Set(tt.k, tt.v) + if gotErr != nil { + if !tt.wantErr { + t.Errorf("Set() failed: %v", gotErr) + } + return + } + if tt.wantErr { + t.Fatal("Set() succeeded unexpectedly") + } + }) + } +} + +func TestTree_Delete(t *testing.T) { + tests := []struct { + name string // description of this test case + // Named input parameters for target function. + k []byte + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr := tree.New() + gotErr := tr.Delete(tt.k) + if gotErr != nil { + if !tt.wantErr { + t.Errorf("Delete() failed: %v", gotErr) + } + return + } + if tt.wantErr { + t.Fatal("Delete() succeeded unexpectedly") + } + }) + } +} diff --git a/pkg/atomic/file.go b/pkg/atomic/file.go new file mode 100644 index 0000000..6915fd2 --- /dev/null +++ b/pkg/atomic/file.go @@ -0,0 +1,86 @@ +package atomic + +import ( + "fmt" + "io" + "os" +) + +type ReadWriterAt interface { + io.ReaderAt + io.WriterAt + Commit() error + Abort() error +} + +type File struct { + original string + info os.FileInfo + tmp *os.File +} + +func OpenFile(name string) (*File, error) { + tmp, err := os.CreateTemp("/tmp", "") + if err != nil { + return nil, fmt.Errorf("atomic: failed to create temporary file: %w", err) + } + + og, err := os.Open(name) + if err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("atomic: failed to open target file: %w", err) + } + defer og.Close() + + if _, err := io.Copy(tmp, og); err != nil { + return nil, fmt.Errorf("atomic: failed to copy data into temporary file: %w", err) + } + + info, err := og.Stat() + if err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("atomic: failed to obtain file info of target file: %w", err) + } + + // tmp.Chmod + + return &File{ + original: name, + info: info, + tmp: tmp, + }, nil +} + +func (f *File) ReadAt(b []byte, off int64) (n int, err error) { + return f.tmp.ReadAt(b, off) +} + +func (f *File) WriteAt(b []byte, off int64) (n int, err error) { + return f.tmp.WriteAt(b, off) +} + +func (f *File) Commit() error { + defer os.Remove(f.tmp.Name()) + defer f.tmp.Close() + + if err := f.tmp.Sync(); err != nil { + return fmt.Errorf("atomic: failed to flush temporary file: %w", err) + } + if err := f.tmp.Close(); err != nil { + return fmt.Errorf("atomic: failed to close temporary file: %w", err) + } + + if f.info != nil { + if err := f.tmp.Chmod(f.info.Mode()); err != nil { + return fmt.Errorf("atomic: failed to set filemode of temporary file: %w", err) + } + } + + if err := replaceFile("", f.original); err != nil { + return fmt.Errorf("atomic: failed to replace target file: %w", err) + } + return nil +} + +func (f *File) Abort() error { + defer os.Remove(f.tmp.Name()) + return f.tmp.Close() +} diff --git a/pkg/atomic/file_unix.go b/pkg/atomic/file_unix.go new file mode 100644 index 0000000..5b9d1b7 --- /dev/null +++ b/pkg/atomic/file_unix.go @@ -0,0 +1,14 @@ +//go:build !windows + +package atomic + +import ( + "os" +) + +// replaceFile atomically replaces the destination file or directory with the +// source. It is guaranteed to either replaceFile the target file entirely, or not +// change either file. +func replaceFile(source, destination string) error { + return os.Rename(source, destination) +} diff --git a/pkg/atomic/file_win.go b/pkg/atomic/file_win.go new file mode 100644 index 0000000..ccd2d90 --- /dev/null +++ b/pkg/atomic/file_win.go @@ -0,0 +1,35 @@ +//go:build windows + +package atomic + +import ( + "os" + "syscall" +) + +const ( + movefile_replace_existing = 0x1 + movefile_write_through = 0x8 +) + +//sys moveFileEx(lpExistingFileName *uint16, lpNewFileName *uint16, dwFlags uint32) (err error) = MoveFileExW + +// replaceFile atomically replaces the destination file or directory with the +// source. It is guaranteed to either replaceFile the target file entirely, or not +// change either file. +func replaceFile(source, destination string) error { + src, err := syscall.UTF16PtrFromString(source) + if err != nil { + return &os.LinkError{Op: "replace", Old: source, New: destination, Err: err} + } + dest, err := syscall.UTF16PtrFromString(destination) + if err != nil { + return &os.LinkError{Op: "replace", Old: source, New: destination, Err: err} + } + + // see http://msdn.microsoft.com/en-us/library/windows/desktop/aa365240(v=vs.85).aspx + if err := moveFileEx(src, dest, movefile_replace_existing|movefile_write_through); err != nil { + return &os.LinkError{Op: "replace", Old: source, New: destination, Err: err} + } + return nil +} diff --git a/pkg/atomic/zfile_win.go b/pkg/atomic/zfile_win.go new file mode 100644 index 0000000..54db04c --- /dev/null +++ b/pkg/atomic/zfile_win.go @@ -0,0 +1,32 @@ +//go:build windows + +package atomic + +import ( + "syscall" + "unsafe" +) + +var ( + modkernel32 = syscall.NewLazyDLL("kernel32.dll") + + procMoveFileExW = modkernel32.NewProc("MoveFileExW") +) + +func moveFileEx(lpExistingFileName *uint16, lpNewFileName *uint16, dwFlags uint32) (err error) { + r1, _, e1 := syscall.SyscallN( + procMoveFileExW.Addr(), + 3, + uintptr(unsafe.Pointer(lpExistingFileName)), + uintptr(unsafe.Pointer(lpNewFileName)), + uintptr(dwFlags), + ) + if r1 == 0 { + if e1 != 0 { + err = error(e1) + } else { + err = syscall.EINVAL + } + } + return +}