diff --git a/context/context.go b/context/context.go
index 5df5bbfd95..3eca5060d4 100644
--- a/context/context.go
+++ b/context/context.go
@@ -5,11 +5,13 @@ import (
 	gocontext "context"
 	"fmt"
 	"math/rand"
+	"net/url"
 	"os"
 	"os/signal"
 	"path/filepath"
 	"runtime"
 	"runtime/pprof"
+	"strconv"
 	"strings"
 	"sync"
 	"syscall"
@@ -21,6 +23,7 @@ import (
 	"github.com/aptly-dev/aptly/database"
 	"github.com/aptly-dev/aptly/database/etcddb"
 	"github.com/aptly-dev/aptly/database/goleveldb"
+	"github.com/aptly-dev/aptly/database/ssdb"
 	"github.com/aptly-dev/aptly/deb"
 	"github.com/aptly-dev/aptly/files"
 	"github.com/aptly-dev/aptly/http"
@@ -29,6 +32,7 @@
 	"github.com/aptly-dev/aptly/swift"
 	"github.com/aptly-dev/aptly/task"
 	"github.com/aptly-dev/aptly/utils"
+	"github.com/seefan/gossdb/v2/conf"
 	"github.com/smira/commander"
 	"github.com/smira/flag"
 )
@@ -301,6 +305,21 @@ func (context *AptlyContext) _database() (database.Storage, error) {
 		context.database, err = goleveldb.NewDB(dbPath)
 	case "etcd":
 		context.database, err = etcddb.NewDB(context.config().DatabaseBackend.URL)
+	case "ssdb":
+		var cfg conf.Config
+		u, e := url.Parse(context.config().DatabaseBackend.URL)
+		if e != nil {
+			return nil, e
+		}
+		// Hostname() strips the port and IPv6 brackets, unlike splitting Host on ":".
+		cfg.Host = u.Hostname()
+		cfg.Port, e = strconv.Atoi(u.Port())
+		if e != nil {
+			return nil, e
+		}
+		password, _ := u.User.Password()
+		cfg.Password = password
+		context.database, err = ssdb.NewOpenDB(&cfg)
 	default:
 		context.database, err = goleveldb.NewDB(context.dbPath())
 	}
diff --git a/database/ssdb/batch.go b/database/ssdb/batch.go
new file mode 100644
index 0000000000..a854092944
--- /dev/null
+++ b/database/ssdb/batch.go
@@ -0,0 +1,129 @@
+package ssdb
+
+import (
+	"fmt"
+
+	"github.com/aptly-dev/aptly/database"
+	"github.com/seefan/gossdb/v2/conf"
+	"github.com/seefan/gossdb/v2/pool"
+)
+
+const (
+	delOpt = "del"
+)
+
+type bWriteData struct {
+	key   []byte
+	value []byte
+	opts  string
+	err   error
+}
+
+type Batch struct {
+	cfg *conf.Config
+	// key-value chan
+ w chan bWriteData + p map[string]interface{} + d []string + db *pool.Client +} + +// func internalOpenBatch... +func internalOpenBatch(_ database.Storage) *Batch { + b := &Batch{ + w: make(chan bWriteData), + p: make(map[string]interface{}), + } + b.run() + + return b +} + +func (b *Batch) run() { + go func() { + for { + select { + case w, ok := <-b.w: + { + if !ok { + ssdbLog("ssdb batch write chan closed") + return + } + + if w.opts == "write" { + ssdbLog("ssdb batch write") + var err error + if len(b.p) > 0 && len(b.d) == 0 { + err = b.db.MultiSet(b.p) + ssdbLog("ssdb batch set errinfo: ", err) + } else if len(b.d) > 0 && len(b.p) == 0 { + err = b.db.MultiDel(b.d...) + ssdbLog("ssdb batch del errinfo: ", err) + } else if len(b.p) == 0 && len(b.d) == 0 { + err = nil + } else { + err = fmt.Errorf("ssdb batch does not support both put and delete operations") + } + ssdbLog("ssdb batch write errinfo: ", err) + b.w <- bWriteData{ + err: err, + } + ssdbLog("ssdb batch write end") + } else { + ssdbLog("ssdb batch", w.opts) + if w.opts == "put" { + b.p[string(w.key)] = w.value + } else if w.opts == delOpt { + b.d = append(b.d, string(w.key)) + } + } + } + } + } + }() +} + +func (b *Batch) stop() { + ssdbLog("ssdb batch stop") + close(b.w) +} + +func (b *Batch) Put(key, value []byte) (err error) { + // err = b.db.Set(string(key), string(value)) + w := bWriteData{ + key: key, + value: value, + opts: "put", + } + + b.w <- w + return nil +} + +func (b *Batch) Delete(key []byte) (err error) { + /* err = b.db.Del(string(key)) + return */ + w := bWriteData{ + key: key, + opts: delOpt, + } + + b.w <- w + return nil +} + +func (b *Batch) Write() (err error) { + defer b.stop() + w := bWriteData{ + opts: "write", + } + + b.w <- w + result := <-b.w + return result.err +} + +// batch should implement database.Batch +var ( + _ database.Batch = &Batch{} +) diff --git a/database/ssdb/database.go b/database/ssdb/database.go new file mode 100644 index 0000000000..1710333f2d --- 
/dev/null
+++ b/database/ssdb/database.go
@@ -0,0 +1,62 @@
+package ssdb
+
+import (
+	"os"
+	"strconv"
+
+	"github.com/aptly-dev/aptly/database"
+	"github.com/seefan/gossdb/v2"
+	"github.com/seefan/gossdb/v2/conf"
+	"github.com/seefan/gossdb/v2/pool"
+)
+
+var defaultBufSize = 102400
+var defaultPoolSize = 1
+
+func internalOpen(cfg *conf.Config) (*pool.Client, error) {
+	ssdbLog("internalOpen")
+
+	cfg.ReadBufferSize = defaultBufSize
+	cfg.WriteBufferSize = defaultBufSize
+	cfg.MaxPoolSize = defaultPoolSize
+	cfg.PoolSize = defaultPoolSize
+	cfg.MinPoolSize = defaultPoolSize
+	cfg.MaxWaitSize = 100 * defaultPoolSize
+	cfg.RetryEnabled = true
+
+	// Environment overrides: apply only when the variable parses as an integer.
+	if os.Getenv("SSDB_READBUFFERSIZE") != "" {
+		readBufSize, err := strconv.Atoi(os.Getenv("SSDB_READBUFFERSIZE"))
+		if err == nil {
+			cfg.ReadBufferSize = readBufSize
+		}
+	}
+
+	if os.Getenv("SSDB_WRITEBUFFERSIZE") != "" {
+		writeBufSize, err := strconv.Atoi(os.Getenv("SSDB_WRITEBUFFERSIZE"))
+		if err == nil {
+			cfg.WriteBufferSize = writeBufSize
+		}
+	}
+
+	var cfgs = []*conf.Config{cfg}
+	err := gossdb.Start(cfgs...)
+	if err != nil {
+		return nil, err
+	}
+
+	return gossdb.NewClient()
+}
+
+func NewDB(cfg *conf.Config) (database.Storage, error) {
+	return &Storage{cfg: cfg}, nil
+}
+
+func NewOpenDB(cfg *conf.Config) (database.Storage, error) {
+	db, err := NewDB(cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	return db, db.Open()
+}
diff --git a/database/ssdb/database_test.go b/database/ssdb/database_test.go
new file mode 100644
index 0000000000..139432bfd9
--- /dev/null
+++ b/database/ssdb/database_test.go
@@ -0,0 +1,274 @@
+package ssdb_test
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"testing"
+
+	"github.com/aptly-dev/aptly/database"
+	"github.com/aptly-dev/aptly/database/ssdb"
+	"github.com/seefan/gossdb/v2/conf"
+	. "gopkg.in/check.v1"
+)
+
+// Launch gocheck tests
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+func setUpSsdb() error {
+	setUpStr := `
+	#!/bin/bash
+	if [ !
-e /tmp/ssdb-master/ssdb-master ]; then + mkdir -p /tmp/ssdb-master + wget --no-check-certificate https://github.com/ideawu/ssdb/archive/master.zip -O /tmp/ssdb-master/master.zip + cd /tmp/ssdb-master && unzip master && cd ssdb-master && make all + fi + cd /tmp/ssdb-master/ssdb-master && ./ssdb-server -d ssdb.conf -s restart + sleep 2` + + tmpShell, err := ioutil.TempFile("/tmp", "ssdbSetup") + if err != nil { + return err + } + defer os.Remove(tmpShell.Name()) + + _, err = tmpShell.WriteString(setUpStr) + if err != nil { + return err + } + + cmd := exec.Command("/bin/bash", tmpShell.Name()) + fmt.Println(cmd.String()) + output, err := cmd.Output() + fmt.Println(string(output)) + if err != nil { + return err + } + + return nil +} + +func TestMain(m *testing.M) { + setUpSsdb() + m.Run() +} + +type SSDBSuite struct { + cfg *conf.Config + db database.Storage +} + +var _ = Suite(&SSDBSuite{cfg: &conf.Config{ + Host: "127.0.0.1", + Port: 8888, +}}) + +func (s *SSDBSuite) SetUpTest(c *C) { + var err error + s.db, err = ssdb.NewOpenDB(s.cfg) + c.Assert(err, IsNil) +} + +func (s *SSDBSuite) TestSetUpTest(c *C) { + var err error + s.db, err = ssdb.NewOpenDB(s.cfg) + c.Assert(err, IsNil) +} + +func (s *SSDBSuite) TestGetPut(c *C) { + var ( + key = []byte("key") + value = []byte("value") + ) + var err error + + err = s.db.Put(key, value) + c.Assert(err, IsNil) + + result, err := s.db.Get(key) + c.Assert(err, IsNil) + c.Assert(result, DeepEquals, value) +} + +func (s *SSDBSuite) TestTemporaryDelete(c *C) { + fmt.Println("TestTemporaryDelete") + var ( + key = []byte("key") + value = []byte("value") + ) + + temp, err := s.db.CreateTemporary() + c.Assert(err, IsNil) + + c.Check(temp.HasPrefix([]byte(nil)), Equals, false) + + err = temp.Put(key, value) + c.Assert(err, IsNil) + c.Check(temp.HasPrefix([]byte(nil)), Equals, true) + + c.Assert(temp.Close(), IsNil) + c.Assert(temp.Drop(), IsNil) +} + +func (s *SSDBSuite) TestDelete(c *C) { + var ( + key = []byte("key") + value = 
[]byte("value") + ) + + err := s.db.Put(key, value) + c.Assert(err, IsNil) + + _, err = s.db.Get(key) + c.Assert(err, IsNil) + + err = s.db.Delete(key) + c.Assert(err, IsNil) + +} + +func (s *SSDBSuite) TestByPrefix(c *C) { + //c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{}) + + s.db.Put([]byte{0x80, 0x01}, []byte{0x01}) + s.db.Put([]byte{0x80, 0x03}, []byte{0x03}) + s.db.Put([]byte{0x80, 0x02}, []byte{0x02}) + c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + s.db.Put([]byte{0x90, 0x01}, []byte{0x04}) + c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + s.db.Put([]byte{0x00, 0x01}, []byte{0x05}) + c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + keys := [][]byte{} + values := [][]byte{} + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + keys = append(keys, append([]byte(nil), k...)) + values = append(values, append([]byte(nil), v...)) + return nil + }), IsNil) + + c.Check(len(values), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(keys), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + return database.ErrNotFound + }), Equals, database.ErrNotFound) + + c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error { + return database.ErrNotFound + }), IsNil) + + c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) + c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) +} + +func (s 
*SSDBSuite) TestHasPrefix(c *C) { + s.db.Put([]byte{0x80, 0x01}, []byte{0x01}) + + //c.Check(s.db.HasPrefix([]byte("")), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x80}), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x79}), Equals, false) +} + +func (s *SSDBSuite) TestTransactionCommit(c *C) { + var ( + key = []byte("key") + key2 = []byte("key2") + value = []byte("value") + value2 = []byte("value2") + ) + s.db.Delete(key) + s.db.Delete(key2) + transaction, err := s.db.OpenTransaction() + c.Assert(err, IsNil) + defer transaction.Discard() + + err = s.db.Put(key, value) + c.Assert(err, IsNil) + + v, err := s.db.Get(key) + c.Assert(err, IsNil) + c.Check(v, DeepEquals, value) + + err = transaction.Put(key2, value2) + c.Assert(err, IsNil) + v, err = transaction.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + _, err = s.db.Get(key2) + c.Assert(err, ErrorMatches, "key not found") + + err = transaction.Delete(key) + c.Assert(err, IsNil) + _, err = transaction.Get(key) + c.Assert(err, ErrorMatches, "key not found") + v, err = s.db.Get(key) + c.Assert(err, IsNil) + c.Check(v, DeepEquals, value) + + err = transaction.Commit() + c.Check(err, IsNil) + + v, err = s.db.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + + _, err = s.db.Get(key) + c.Assert(err, ErrorMatches, "key not found") +} + +func (s *SSDBSuite) TestBatch(c *C) { + var ( + key = []byte("bkey") + key2 = []byte("bkey2") + value = []byte("bvalue") + value2 = []byte("bvalue2") + ) + + err := s.db.Put(key, value) + c.Check(err, IsNil) + + batch := s.db.CreateBatch() + batch.Put(key2, value2) + v, err := s.db.Get(key) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value) + _, err = s.db.Get(key2) + c.Check(err, ErrorMatches, "key not found") + + err = batch.Write() + c.Check(err, IsNil) + + v, err = s.db.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + + batch = s.db.CreateBatch() + batch.Delete(key) + batch.Delete(key2) + c.Check(err, IsNil) + v, err = 
s.db.Get(key) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value) + c.Check(err, IsNil) + v, err = s.db.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + + err = batch.Write() + c.Check(err, IsNil) + + _, err = s.db.Get(key2) + c.Check(err, ErrorMatches, "key not found") + _, err = s.db.Get(key) + c.Check(err, ErrorMatches, "key not found") +} diff --git a/database/ssdb/log.go b/database/ssdb/log.go new file mode 100644 index 0000000000..ded044275f --- /dev/null +++ b/database/ssdb/log.go @@ -0,0 +1,12 @@ +package ssdb + +import ( + "fmt" + "os" +) + +func ssdbLog(a ...interface{}) { + if os.Getenv("SSDB_DEBUG") != "" { + fmt.Println(a...) + } +} diff --git a/database/ssdb/storage.go b/database/ssdb/storage.go new file mode 100644 index 0000000000..76b8fb3a63 --- /dev/null +++ b/database/ssdb/storage.go @@ -0,0 +1,183 @@ +package ssdb + +import ( + "os" + + "github.com/aptly-dev/aptly/database" + "github.com/aptly-dev/aptly/database/goleveldb" + "github.com/seefan/gossdb/v2" + "github.com/seefan/gossdb/v2/conf" + "github.com/seefan/gossdb/v2/pool" +) + +type Storage struct { + cfg *conf.Config + db *pool.Client +} + +// CreateTemporary creates new DB of the same type in temp dir +func (s *Storage) CreateTemporary() (database.Storage, error) { + // use leveldb as temp db + tmpPath := os.Getenv("SSDB_TMPDB_PATH") + if tmpPath == "" { + tmpPath = "/tmp/ssdb_tmpdb_path" + } + gdb, err := goleveldb.NewDB(tmpPath) + if err != nil { + return nil, err + } + + return gdb.CreateTemporary() +} + +// Get key value from ssdb +func (s *Storage) Get(key []byte) (value []byte, err error) { + // ssdbLog("ssdb origin db get key:", string(key)) + getResp, err := s.db.Get(string(key)) + if err != nil { + return + } + + value = getResp.Bytes() + + if len(value) == 0 { + err = database.ErrNotFound + return + } + return +} + +// Put saves key to ssdb, if key has the same value in DB already, it is not saved +func (s *Storage) Put(key []byte, value []byte) (err error) { 
+ //ssdbLog("ssdb origin db put key:", string(key), " value: ", string(value)) + err = s.db.Set(string(key), value) + if err != nil { + return + } + return +} + +// Delete removes key from ssdb +func (s *Storage) Delete(key []byte) (err error) { + //ssdbLog("ssdb origin db del key:", string(key)) + err = s.db.Del(string(key)) + if err != nil { + return + } + return +} + +// KeysByPrefix returns all keys that start with prefix +func (s *Storage) KeysByPrefix(prefix []byte) [][]byte { + result := make([][]byte, 0) + getResp, err := s.db.Keys(string(prefix), string(prefix)+"}", -1) + if err != nil { + return nil + } + for _, ev := range getResp { + key := []byte(ev) + keyc := make([]byte, len(key)) + copy(keyc, key) + result = append(result, key) + } + return result +} + +// FetchByPrefix returns all values with keys that start with prefix +func (s *Storage) FetchByPrefix(prefix []byte) [][]byte { + result := make([][]byte, 0) + getResp, err := s.db.Scan(string(prefix), string(prefix)+"}", -1) + if err != nil { + return nil + } + for _, ev := range getResp { + value := ev.Bytes() + valuec := make([]byte, len(value)) + copy(valuec, value) + result = append(result, valuec) + } + return result +} + +// HasPrefix checks whether it can find any key with given prefix and returns true if one exists +func (s *Storage) HasPrefix(prefix []byte) bool { + //ssdbLog("HasPrefix", string(prefix), string(prefix)+"}") + getResp, err := s.db.Keys(string(prefix), string(prefix)+"}", -1) + if err != nil { + return false + } + //ssdbLog("HasPrefix", len(getResp)) + if len(getResp) > 0 { + return true + } + return false +} + +// ProcessByPrefix iterates through all entries where key starts with prefix and calls +// StorageProcessor on key value pair +func (s *Storage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error { + getResp, err := s.db.Scan(string(prefix), string(prefix)+"}", -1) + if err != nil { + return err + } + + for k, v := range getResp { + err := 
proc([]byte(k), v.Bytes()) + if err != nil { + return err + } + } + return nil +} + +// Close finishes ssdb connect +func (s *Storage) Close() error { + ssdbLog("ssdb close") + if s.db != nil { + s.db.Close() + s.db = nil + } + gossdb.Shutdown() + return nil +} + +// Reopen tries to open (re-open) the database +func (s *Storage) Open() error { + ssdbLog("ssdb open") + if s.db != nil && s.db.IsOpen() { + ssdbLog("ssdb opened") + return nil + } + + var err error + s.db, err = internalOpen(s.cfg) + return err +} + +// CreateBatch creates a Batch object +func (s *Storage) CreateBatch() database.Batch { + Batch := internalOpenBatch(s) + Batch.cfg = s.cfg + Batch.db = s.db + return Batch +} + +// OpenTransaction creates new transaction. +func (s *Storage) OpenTransaction() (database.Transaction, error) { + return internalOpenTransaction(s) +} + +// CompactDB compacts database by merging layers +func (s *Storage) CompactDB() error { + return nil +} + +// Drop removes all the ssdb files (DANGEROUS!) +func (s *Storage) Drop() error { + return nil +} + +// Check interface +var ( + _ database.Storage = &Storage{} +) diff --git a/database/ssdb/transaction.go b/database/ssdb/transaction.go new file mode 100644 index 0000000000..a6518f3861 --- /dev/null +++ b/database/ssdb/transaction.go @@ -0,0 +1,188 @@ +package ssdb + +import ( + "fmt" + + "github.com/aptly-dev/aptly/database" +) + +type trWriteData struct { + key []byte + value []byte + opts string + err error +} + +type trReadData struct { + kv []byte + err error +} + +type transaction struct { + // for key-value-operation chan + w chan trWriteData + // key read chan + r chan trReadData + q map[string]trWriteData + t database.Storage +} + +// func internalOpenTransaction... +func internalOpenTransaction(t database.Storage) (*transaction, error) { + tr := &transaction{ + w: make(chan trWriteData), + r: make(chan trReadData), + q: make(map[string]trWriteData), + t: t, + } + + return tr, tr.run() +} + +// func run... 
+func (t *transaction) run() error { + go func() { + for { + select { + case w, ok := <-t.w: + { + if !ok { + ssdbLog("ssdb transaction write chan closed") + return + } + + if w.opts == "commit" { + ssdbLog("ssdb transaction commit") + var errs []error + for _, vo := range t.q { + if vo.opts == "put" { + err := t.t.Put(vo.key, vo.value) + if err != nil { + //ssdbLog(err) + errs = append(errs, err) + } + } + + if vo.opts == delOpt { + err := t.t.Delete(vo.key) + if err != nil { + errs = append(errs, err) + } + } + } + if len(errs) == 0 { + t.w <- trWriteData{ + err: nil, + } + } else { + t.w <- trWriteData{ + err: fmt.Errorf("ssdb transaction write errs: %v", errs), + } + } + ssdbLog("ssdb transaction commit end") + } else { + ssdbLog("ssdb transaction", w.opts) + //ssdbLog("ssdb r transaction", w.opts, "key: ", string(w.key), "value: ", string(w.value)) + t.q[string(w.key)] = w + } + } + case r, ok := <-t.r: + { + if !ok { + ssdbLog("ssdb transaction read chan closed") + return + } + + if rData, ok := t.q[string(r.kv)]; ok { + if rData.opts == delOpt { + // del return not found error + t.r <- trReadData{ + kv: nil, + err: database.ErrNotFound, + } + } else { + t.r <- trReadData{ + kv: rData.value, + err: nil, + } + } + } else { + v, err := t.t.Get(r.kv) + t.r <- trReadData{ + kv: v, + err: err, + } + } + } + } + } + }() + + return nil +} + +// Get implements database.Reader interface. +func (t *transaction) Get(key []byte) ([]byte, error) { + keyc := make([]byte, len(key)) + copy(keyc, key) + r := trReadData{ + kv: keyc, + err: nil, + } + t.r <- r + result := <-t.r + return result.kv, result.err +} + +// Put implements database.Writer interface. 
+func (t *transaction) Put(key, value []byte) error { + //ssdbLog("golf*********************ssdb put") + //ssdbLog("ssdb transaction db put key:", string(key), " value: ", string(value)) + keyc := make([]byte, len(key)) + copy(keyc, key) + valuec := make([]byte, len(value)) + copy(valuec, value) + w := trWriteData{ + key: keyc, + value: valuec, + opts: "put", + } + + t.w <- w + return nil +} + +// Delete implements database.Writer interface. +func (t *transaction) Delete(key []byte) error { + //return t.t.Delete(key) + //ssdbLog("golf*********************ssdb del") + keyc := make([]byte, len(key)) + copy(keyc, key) + w := trWriteData{ + key: keyc, + opts: delOpt, + } + + t.w <- w + return nil +} + +func (t *transaction) Commit() error { + w := trWriteData{ + opts: "commit", + } + + t.w <- w + result := <-t.w + return result.err +} + +// Discard is safe to call after Commit(), it would be no-op +func (t *transaction) Discard() { + ssdbLog("ssdb transaction stop") + close(t.r) + close(t.w) +} + +// transaction should implement database.Transaction +var _ database.Transaction = &transaction{}