diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index b977042d..3c8ae4f6 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -15,7 +15,7 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v2 with: - version: v1.51.2 + version: v1.55.2 args: --enable gofmt,gocyclo,goimports,dupl,gosec --timeout 5m --skip-dirs=examples,test --skip-files=.*_test.go$ static-checks: runs-on: ubuntu-latest diff --git a/.github/workflows/local_storage.yml b/.github/workflows/local_storage.yml new file mode 100644 index 00000000..e1907149 --- /dev/null +++ b/.github/workflows/local_storage.yml @@ -0,0 +1,24 @@ +name: Merge check for local storage +on: [push, pull_request] +jobs: + etcd-with-localstorage: + runs-on: ubuntu-latest + steps: + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.18 + id: go + - name: Check out code into the Go module directory + uses: actions/checkout@v1 + - name: UT for etcd with local storage + run: | + time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 + while ! nc -z 127.0.0.1 2379; do + sleep 1 + done + export TEST_DB_KIND=etcd_with_localstorage + export TEST_DB_URI=127.0.0.1:2379 + export TEST_KVS_ROOT_PATH=/data/kvs + sudo rm -rf /data/kvs + sudo time go test $(go list ./... | grep -v mongo | grep -v third_party | grep -v examples) \ No newline at end of file diff --git a/cmd/kieserver/main.go b/cmd/kieserver/main.go index f9936436..b5045cc3 100644 --- a/cmd/kieserver/main.go +++ b/cmd/kieserver/main.go @@ -35,6 +35,7 @@ import ( //storage _ "github.com/apache/servicecomb-kie/server/datasource/etcd" + _ "github.com/apache/servicecomb-kie/server/datasource/local" _ "github.com/apache/servicecomb-kie/server/datasource/mongo" //quota management diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml index 11b91067..56aff3a3 100644 --- a/examples/dev/kie-conf.yaml +++ b/examples/dev/kie-conf.yaml @@ -1,6 +1,9 @@ db: - # kind can be mongo, etcd, embedded_etcd + # kind can be mongo, etcd, embedded_etcd, embedded_etcd_with_localstorage, etcd_with_localstorage kind: embedded_etcd + +# localFilePath is the root path to store local kv files +# uri: http://127.0.0.1:2379 # uri is the db endpoints list # kind=mongo, then is the mongodb cluster's uri, e.g. mongodb://127.0.0.1:27017/kie # kind=etcd, then is the remote etcd server's advertise-client-urls, e.g. 
http://127.0.0.1:2379 diff --git a/scripts/start.sh b/scripts/start.sh index 8e0b662f..c25f7835 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -73,6 +73,7 @@ EOM db: kind: ${db_type} uri: ${uri} + localFilePath: ${KVS_ROOT_PATH} EOM } diff --git a/server/config/struct.go b/server/config/struct.go index 83e91f08..3047ded6 100644 --- a/server/config/struct.go +++ b/server/config/struct.go @@ -41,11 +41,12 @@ type TLS struct { // DB is yaml file struct to set persistent config type DB struct { - TLS `yaml:",inline" json:",inline"` - URI string `yaml:"uri" json:"uri,omitempty"` - Kind string `yaml:"kind" json:"kind,omitempty"` - PoolSize int `yaml:"poolSize" json:"pool_size,omitempty"` - Timeout string `yaml:"timeout" json:"timeout,omitempty"` + TLS `yaml:",inline" json:",inline"` + URI string `yaml:"uri" json:"uri,omitempty"` + Kind string `yaml:"kind" json:"kind,omitempty"` + LocalFilePath string `yaml:"localFilePath" json:"local_file_path,omitempty"` + PoolSize int `yaml:"poolSize" json:"pool_size,omitempty"` + Timeout string `yaml:"timeout" json:"timeout,omitempty"` } // RBAC is rbac config diff --git a/server/datasource/dao.go b/server/datasource/dao.go index bce146b1..a4cc01eb 100644 --- a/server/datasource/dao.go +++ b/server/datasource/dao.go @@ -23,10 +23,9 @@ import ( "errors" "fmt" + "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/server/datasource/rbac" "github.com/go-chassis/openlog" - - "github.com/apache/servicecomb-kie/pkg/model" ) var ( @@ -120,9 +119,12 @@ type ViewDao interface { func Init(kind string) error { var err error f, ok := plugins[kind] + if !ok { + openlog.Info(fmt.Sprintf("do not support '%s'", kind)) return fmt.Errorf("do not support '%s'", kind) } + dbc := &Config{} if b, err = f(dbc); err != nil { return err diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go index 61d017b2..7cf65297 100644 --- a/server/datasource/etcd/kv/kv_cache.go +++ b/server/datasource/etcd/kv/kv_cache.go @@ -9,15 +9,16 @@ import ( "sync" "time" - "github.com/apache/servicecomb-kie/pkg/model" - "github.com/apache/servicecomb-kie/pkg/stringutil" - "github.com/apache/servicecomb-kie/server/datasource" - "github.com/apache/servicecomb-kie/server/datasource/etcd/key" "github.com/go-chassis/foundation/backoff" "github.com/go-chassis/openlog" "github.com/little-cui/etcdadpt" goCache "github.com/patrickmn/go-cache" "go.etcd.io/etcd/api/v3/mvccpb" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/pkg/stringutil" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/etcd/key" ) func Init() { @@ -35,8 +36,6 @@ const ( backOffMinInterval = 5 * time.Second ) -type IDSet map[string]struct{} - type Cache struct { timeOut time.Duration client etcdadpt.Client @@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) { cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) if !ok { - kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}}) + z := &sync.Map{} + z.Store(kvDoc.ID, struct{}{}) + kc.StoreKvIDSet(cacheKey, z) openlog.Info("cacheKey " + cacheKey + "not exists") continue } - m[kvDoc.ID] = struct{}{} + m.Store(kvDoc.ID, struct{}{}) } } @@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) { openlog.Error("cacheKey " + cacheKey + "not exists") continue } - delete(m, kvDoc.ID) + m.Delete(kvDoc.ID) } } -func (kc *Cache) LoadKvIDSet(cacheKey string) 
(IDSet, bool) { +func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) { val, ok := kc.kvIDCache.Load(cacheKey) if !ok { return nil, false } - kvIds, ok := val.(IDSet) + kvIds, ok := val.(*sync.Map) if !ok { return nil, false } return kvIds, true } -func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) { +func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) { kc.kvIDCache.Store(cacheKey, kvIds) } @@ -220,9 +221,9 @@ func (kc *Cache) DeleteKvDoc(kvID string) { kc.kvDocCache.Delete(kvID) } -func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) { +func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { if !req.Opts.ExactLabels { - return nil, false + return nil, false, nil } openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) @@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) kvIds, ok := kvCache.LoadKvIDSet(cacheKey) if !ok { - kvCache.StoreKvIDSet(cacheKey, IDSet{}) - return result, true + kvCache.StoreKvIDSet(cacheKey, &sync.Map{}) + return result, true, nil } var docs []*model.KVDoc var kvIdsLeft []string - for kvID := range kvIds { - if doc, ok := kvCache.LoadKvDoc(kvID); ok { + kvIds.Range(func(kvID, value any) bool { + if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok { docs = append(docs, doc) - continue + } else { + kvIdsLeft = append(kvIdsLeft, kvID.(string)) } - kvIdsLeft = append(kvIdsLeft, kvID) + return true + }) + tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) + if err != nil { + return nil, true, err } - - tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) docs = append(docs, tpData...) 
for _, doc := range docs { @@ -257,17 +261,18 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) } } result.Total = len(result.Data) - return result, true + return result, true, nil } -func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc { +func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) { if len(kvIdsLeft) == 0 { - return nil + return nil, nil } openlog.Debug("get kv from etcd by kvId") wg := sync.WaitGroup{} docs := make([]*model.KVDoc, len(kvIdsLeft)) + var getKvErr error for i, kvID := range kvIdsLeft { wg.Add(1) go func(kvID string, cnt int) { @@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe kv, err := etcdadpt.Get(ctx, docKey) if err != nil { openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err)) + getKvErr = err return } doc, err := kc.GetKvDoc(kv) if err != nil { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) + getKvErr = err return } @@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe }(kvID, i) } wg.Wait() - return docs + if getKvErr != nil { + return nil, getKvErr + } + return docs, nil } func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go index 2332b586..d7260dfd 100644 --- a/server/datasource/etcd/kv/kv_dao.go +++ b/server/datasource/etcd/kv/kv_dao.go @@ -524,15 +524,18 @@ func (s *Dao) listData(ctx context.Context, project, domain string, options ...d } if Enabled() { - result, useCache := Search(ctx, &CacheSearchReq{ + result, useCache, err := Search(ctx, &CacheSearchReq{ Domain: domain, Project: project, Opts: &opts, Regex: regex, }) - if useCache { + if useCache && err == nil { return result, opts, nil } + if useCache && err != nil { + openlog.Error("using cache to search kv failed: " + err.Error()) + } } result, err := matchLabelsSearch(ctx, domain, project, regex, opts) diff --git a/server/datasource/kv_dao_test.go b/server/datasource/kv_dao_test.go index 977944db..a473c5f1 100644 --- a/server/datasource/kv_dao_test.go +++ b/server/datasource/kv_dao_test.go @@ -150,7 +150,7 @@ func TestWithSync(t *testing.T) { Project: "sync-create", ResourceType: datasource.ConfigResource, } - tombstones, tempErr := tombstone.List(ctx, &tbListReq) + tombstones, _ := tombstone.List(ctx, &tbListReq) assert.Equal(t, 1, len(tombstones)) tempErr = tombstone.Delete(ctx, tombstones...) assert.Nil(t, tempErr) @@ -222,7 +222,7 @@ func TestWithSync(t *testing.T) { Project: "sync-update", ResourceType: datasource.ConfigResource, } - tombstones, tempErr := tombstone.List(ctx, &tbListReq) + tombstones, _ := tombstone.List(ctx, &tbListReq) assert.Equal(t, 2, len(tombstones)) tempErr = tombstone.Delete(ctx, tombstones...) assert.Nil(t, tempErr) diff --git a/server/datasource/local/counter/revision.go b/server/datasource/local/counter/revision.go new file mode 100644 index 00000000..756b79f3 --- /dev/null +++ b/server/datasource/local/counter/revision.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package counter + +import ( + "context" + "os" + "path" + "strconv" + + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao is the implementation +type Dao struct { +} + +// GetRevision return current revision number +func (s *Dao) GetRevision(ctx context.Context, domain string) (int64, error) { + revisionPath := path.Join(file.FileRootPath, domain, "revision") + + revisionByte, err := file.ReadFile(revisionPath) + + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + openlog.Error("get error: " + err.Error()) + return 0, err + } + if revisionByte == nil || string(revisionByte) == "" { + return 0, nil + } + + revisionNum, err := strconv.Atoi(string(revisionByte)) + if err != nil { + return 0, err + } + return int64(revisionNum), nil +} + +// ApplyRevision increase revision number and return modified value +func (s *Dao) ApplyRevision(ctx context.Context, domain string) (int64, error) { + currentRevisionNum, err := s.GetRevision(ctx, domain) + if err != nil { + return 0, err + } + err = file.CreateOrUpdateFile(path.Join(file.FileRootPath, domain, "revision"), []byte(strconv.Itoa(int(currentRevisionNum+1))), &[]file.FileDoRecord{}, false) + if err != nil { + return 0, err + } + return currentRevisionNum + 1, nil +} diff --git a/server/datasource/local/file/fileprocess.go b/server/datasource/local/file/fileprocess.go new file mode 100644 index 00000000..30fa486e --- /dev/null +++ b/server/datasource/local/file/fileprocess.go @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package file + +import ( + "io/fs" + "os" + "path" + "path/filepath" + "strings" + "sync" + + log "github.com/go-chassis/openlog" +) + +var FileRootPath = "/data/kvs" + +var NewstKVFile = "newest_version.json" + +var MutexMap = make(map[string]*sync.RWMutex) +var mutexMapLock = &sync.Mutex{} +var rollbackMutexLock = &sync.Mutex{} +var createDirMutexLock = &sync.Mutex{} + +type SchemaDAO struct{} + +type FileDoRecord struct { + filepath string + content []byte +} + +func GetOrCreateMutex(path string) *sync.RWMutex { + mutexMapLock.Lock() + mutex, ok := MutexMap[path] + if !ok { + mutex = &sync.RWMutex{} + MutexMap[path] = mutex + } + mutexMapLock.Unlock() + + return mutex +} + +func ExistDir(path string) error { + _, err := os.ReadDir(path) + if err != nil { + // create the dir if not exist + if os.IsNotExist(err) { + createDirMutexLock.Lock() + defer createDirMutexLock.Unlock() + err = os.MkdirAll(path, fs.ModePerm) + if err != nil { + log.Error("failed to make dir: " + path + " " + err.Error()) + + return err + } + return nil + } + log.Error("failed to read dir: " + path + " " + err.Error()) + return err + } + + return nil +} + +func MoveDir(srcDir string, dstDir string) (err error) { + srcMutex := GetOrCreateMutex(srcDir) + dstMutex := GetOrCreateMutex(dstDir) + srcMutex.Lock() + dstMutex.Lock() + defer srcMutex.Unlock() + defer dstMutex.Unlock() + + var movedFiles []string + files, err := os.ReadDir(srcDir) + if err != nil { + log.Error("move schema files failed " + err.Error()) + return err + } + for _, file := range files { + err = ExistDir(dstDir) + if err != nil { + log.Error("move schema files failed " + err.Error()) + return err + } + srcFile := filepath.Join(srcDir, file.Name()) + dstFile := filepath.Join(dstDir, file.Name()) + err = os.Rename(srcFile, dstFile) + if err != nil { + log.Error("move schema files failed " + err.Error()) + break + } + movedFiles = append(movedFiles, file.Name()) + } + + if err != nil { + log.Error("error occurred when moving schema files, begin rollback... 
" + err.Error()) + for _, fileName := range movedFiles { + srcFile := filepath.Join(srcDir, fileName) + dstFile := filepath.Join(dstDir, fileName) + err = os.Rename(dstFile, srcFile) + if err != nil { + log.Error("rollback move schema files failed and continue " + err.Error()) + } + } + } + return err +} + +func CreateOrUpdateFile(filepath string, content []byte, rollbackOperations *[]FileDoRecord, isRollback bool) error { + err := ExistDir(path.Dir(filepath)) + + if !isRollback { + mutex := GetOrCreateMutex(path.Dir(filepath)) + mutex.Lock() + defer mutex.Unlock() + } + + if err != nil { + log.Error("failed to build new schema file dir " + filepath + ", " + err.Error()) + return err + } + + fileExist := true + _, err = os.Stat(filepath) + if err != nil { + fileExist = false + } + + if fileExist { + oldcontent, err := os.ReadFile(filepath) + if err != nil { + log.Error("failed to read content from file " + filepath + ", " + err.Error()) + return err + } + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent}) + } else { + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: nil}) + } + + err = os.WriteFile(filepath, content, 0600) + if err != nil { + log.Error("failed to create file " + filepath + ", " + err.Error()) + return err + } + return nil +} + +func DeleteFile(filepath string, rollbackOperations *[]FileDoRecord) error { + _, err := os.Stat(filepath) + if err != nil { + log.Error("file does not exist when deleting file " + filepath + ", " + err.Error()) + return nil + } + + oldcontent, err := os.ReadFile(filepath) + if err != nil { + log.Error("failed to read content from file " + filepath + ", " + err.Error()) + return err + } + + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent}) + + err = os.Remove(filepath) + if err != nil { + log.Error("failed to delete file " + filepath + ", " + err.Error()) + return err + } + return nil +} + +func CleanDir(dir string) error { + mutex := GetOrCreateMutex(dir) + mutex.Lock() + defer func() { mutexMapLock.Lock(); delete(MutexMap, dir); mutexMapLock.Unlock() }() + defer mutex.Unlock() + + rollbackOperations := []FileDoRecord{} + _, err := os.Stat(dir) + if err != nil { + return nil + } + + files, err := os.ReadDir(dir) + if err != nil { + return nil + } + + for _, file := range files { + if file.IsDir() { + continue + } + filepath := filepath.Join(dir, file.Name()) + err = DeleteFile(filepath, &rollbackOperations) + if err != nil { + break + } + } + + if err != nil { + log.Error("error occurred when cleaning schema files, begin rollback... " + err.Error()) + Rollback(rollbackOperations) + return err + } + + err = os.Remove(dir) + if err != nil { + log.Error("error occurred when removing schema dir, begin rollback... 
" + err.Error()) + Rollback(rollbackOperations) + return err + } + + return nil +} + +func ReadFile(filepath string) ([]byte, error) { + // take a read lock on the file's directory before reading + mutex := GetOrCreateMutex(path.Dir(filepath)) + mutex.RLock() + defer mutex.RUnlock() + + content, err := os.ReadFile(filepath) + if err != nil { + log.Error("failed to read content from file " + filepath + ", " + err.Error()) + return nil, err + } + return content, nil +} + +func CountInDomain(dir string) (int, error) { + mutex := GetOrCreateMutex(dir) + mutex.RLock() + defer mutex.RUnlock() + + files, err := os.ReadDir(dir) + if err != nil { + log.Error("failed to read directory " + dir + ", " + err.Error()) + return 0, err + } + + count := 0 + for _, projectFolder := range files { + if projectFolder.IsDir() { + count++ + } + } + // count kv numbers + return count, nil +} + +func ReadAllKvsFromProjectFolder(dir string) ([][]byte, error) { + var kvs [][]byte + + kvDir, err := os.ReadDir(dir) + if err != nil { + log.Error("failed to read directory " + dir + ", " + err.Error()) + return nil, err + } + + for _, file := range kvDir { + if file.IsDir() { + filepath := path.Join(dir, file.Name(), NewstKVFile) + content, err := ReadFile(filepath) + if err != nil { + log.Error("failed to read content from file " + filepath + ", " + err.Error()) + return nil, err + } + kvs = append(kvs, content) + } + } + return kvs, nil +} + +func ReadAllFiles(dir string) ([]string, [][]byte, error) { + mutex := GetOrCreateMutex(dir) + mutex.RLock() + defer mutex.RUnlock() + + files := []string{} + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + if !strings.Contains(path, NewstKVFile) { + files = append(files, path) + } + return nil + }) + + if err != nil { + return nil, nil, err + } + + var contentArray [][]byte + + for _, file := range files { + content, err := os.ReadFile(file) + if err != nil { + log.Error("failed to read content from schema file " + file + ", " + err.Error()) + return nil, nil, err + } + contentArray = append(contentArray, content) + } + return files, contentArray, nil +} + +func Rollback(rollbackOperations []FileDoRecord) { + rollbackMutexLock.Lock() + defer rollbackMutexLock.Unlock() + + var err error + for _, fileOperation := range rollbackOperations { + if fileOperation.content == nil { + err = DeleteFile(fileOperation.filepath, &[]FileDoRecord{}) + } else { + err = CreateOrUpdateFile(fileOperation.filepath, fileOperation.content, &[]FileDoRecord{}, true) + } + if err != nil { + log.Error("error occurred when rolling back schema files: " + err.Error()) + } + } +} diff --git a/server/datasource/local/history/history_dao.go b/server/datasource/local/history/history_dao.go new file mode 100644 index 00000000..5766beb8 --- /dev/null +++ b/server/datasource/local/history/history_dao.go @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package history + +import ( + "context" + "encoding/json" + "path" + "strconv" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/auth" + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao is the implementation +type Dao struct { +} + +// GetHistory get all history by label id +func (s *Dao) GetHistory(ctx context.Context, kvID, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { + kvreq := &model.GetKVRequest{ + Domain: domain, + Project: project, + ID: kvID, + } + kvdoc, err := datasource.GetBroker().GetKVDao().Get(ctx, kvreq) + if err != nil { + return nil, err + } + if err := auth.CheckGetKV(ctx, kvdoc); err != nil { + return nil, err + } + + opts := datasource.FindOptions{} + for _, o := range options { + o(&opts) + } + kvFolderPath := path.Join(file.FileRootPath, domain, project, kvID) + _, kvs, err := file.ReadAllFiles(kvFolderPath) + + if err != nil { + openlog.Error(err.Error()) + return nil, err + } + histories := make([]*model.KVDoc, 0, len(kvs)) + for _, kv := range kvs { + var doc model.KVDoc + err := json.Unmarshal(kv, &doc) + if err != nil { + openlog.Error("decode error: " + err.Error()) + continue + } + histories = append(histories, &doc) + } + return &model.KVResponse{ + Data: pagingResult(histories, opts.Offset, opts.Limit), + Total: len(kvs), + }, nil +} + +func pagingResult(histories []*model.KVDoc, offset, limit int64) []*model.KVDoc { + total := int64(len(histories)) + if limit != 0 && offset >= total { + return []*model.KVDoc{} + } + + datasource.ReverseByPriorityAndUpdateRev(histories) + + if limit == 0 { + return histories + } + end := offset + limit + if end > total { + end = total + } + return histories[offset:end] +} + +// AddHistory add kv history +func (s *Dao) AddHistory(ctx context.Context, kv *model.KVDoc) error { + err := s.historyRotate(ctx, kv.ID, kv.Project, kv.Domain) + return err +} + +// DelayDeletionTime add delete time to all revisions of the kv, +// thus these revisions will be automatically deleted by TTL index. 
+// TODO support delay deletion +func (s *Dao) DelayDeletionTime(ctx context.Context, kvIDs []string, project, domain string) error { + // history have been deleted in function and in kv_dao.go + return nil +} + +// historyRotate delete historical versions for a key that exceeds the limited number +func (s *Dao) historyRotate(ctx context.Context, kvID, project, domain string) error { + resp, err := s.GetHistory(ctx, kvID, project, domain) + if err != nil { + openlog.Error(err.Error()) + return err + } + if resp.Total <= datasource.MaxHistoryNum { + return nil + } + kvs := resp.Data + kvs = kvs[datasource.MaxHistoryNum:] + + mutex := file.GetOrCreateMutex(path.Join(file.FileRootPath, domain, project, kvID)) + mutex.Lock() + defer mutex.Unlock() + + for _, kv := range kvs { + revision := kv.UpdateRevision + revisionFilePath := path.Join(file.FileRootPath, domain, project, kvID, strconv.FormatInt(revision, 10)+".json") + + err = file.DeleteFile(revisionFilePath, &[]file.FileDoRecord{}) + if err != nil { + openlog.Error(err.Error()) + return err + } + } + return nil +} diff --git a/server/datasource/local/init.go b/server/datasource/local/init.go new file mode 100644 index 00000000..e78c1f32 --- /dev/null +++ b/server/datasource/local/init.go @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package local + +import ( + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/local/counter" + "github.com/apache/servicecomb-kie/server/datasource/local/history" + "github.com/apache/servicecomb-kie/server/datasource/local/kv" + "github.com/apache/servicecomb-kie/server/datasource/local/rbac" + "github.com/apache/servicecomb-kie/server/datasource/local/track" + rbacdao "github.com/apache/servicecomb-kie/server/datasource/rbac" +) + +type Broker struct { +} + +func NewFrom(c *datasource.Config) (datasource.Broker, error) { + kv.Init() + return &Broker{}, nil +} +func (*Broker) GetRevisionDao() datasource.RevisionDao { + return &counter.Dao{} +} +func (*Broker) GetKVDao() datasource.KVDao { + return &kv.Dao{} +} +func (*Broker) GetHistoryDao() datasource.HistoryDao { + return &history.Dao{} +} +func (*Broker) GetTrackDao() datasource.TrackDao { + return &track.Dao{} +} +func (*Broker) GetRbacDao() rbacdao.Dao { + return &rbac.Dao{} +} + +func init() { + datasource.RegisterPlugin("etcd_with_localstorage", NewFrom) + datasource.RegisterPlugin("embedded_etcd_with_localstorage", NewFrom) +} diff --git a/server/datasource/local/kv/kv_cache.go b/server/datasource/local/kv/kv_cache.go new file mode 100644 index 00000000..3eb843b0 --- /dev/null +++ b/server/datasource/local/kv/kv_cache.go @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kv + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "strings" + "sync" + "time" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/pkg/stringutil" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/go-chassis/openlog" + goCache "github.com/patrickmn/go-cache" + "go.etcd.io/etcd/api/v3/mvccpb" +) + +type IDSet map[string]struct{} + +func Init() { + kvCache = NewKvCache() + go kvCache.Refresh(context.Background()) +} + +const ( + cacheExpirationTime = 10 * time.Minute + cacheCleanupInterval = 11 * time.Minute + backOffMinInterval = 5 * time.Second +) + +var kvCache *Cache + +type CacheSearchReq struct { + Domain string + Project string + Opts *datasource.FindOptions + Regex *regexp.Regexp +} + +type Cache struct { + revision int64 + kvIDCache sync.Map + kvDocCache *goCache.Cache +} + +func NewKvCache() *Cache { + kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval) + return &Cache{ + revision: 0, + kvDocCache: kvDocCache, + } +} + +func (kc *Cache) Refresh(ctx context.Context) { + openlog.Info("start to list and watch") + + timer := time.NewTimer(backOffMinInterval) + defer timer.Stop() + for { + nextPeriod := backOffMinInterval + select { + case <-ctx.Done(): + openlog.Info("stop to list and watch") + return + case <-timer.C: + timer.Reset(nextPeriod) + } + } +} + +func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) { + kvDoc := &model.KVDoc{} + err := json.Unmarshal(kv.Value, kvDoc) + if err != nil { + return nil, err + } + return kvDoc, nil +} + +func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) string { + labelFormat := stringutil.FormatMap(labels) + inputKey := strings.Join([]string{ + "", + domain, + project, + labelFormat, + }, "/") + return inputKey +} + +func (kc *Cache) StoreKvDoc(kvID string, kvDoc *model.KVDoc) { + kc.kvDocCache.SetDefault(kvID, kvDoc) +} + +func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) { + kc.kvIDCache.Store(cacheKey, kvIds) +} + +func (kc *Cache) DeleteKvDoc(kvID string) { + kc.kvDocCache.Delete(kvID) +} + +func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) { + val, ok := kc.kvIDCache.Load(cacheKey) + if !ok { + return nil, false + } + kvIds, ok := val.(IDSet) + if !ok { + return nil, false + } + return kvIds, true +} + +func (kc *Cache) LoadKvDoc(kvID string) (*model.KVDoc, bool) { + val, ok := kc.kvDocCache.Get(kvID) + if !ok { + return nil, false + } + doc, ok := val.(*model.KVDoc) + if !ok { + return nil, false + } + return doc, true +} + +func (kc *Cache) CachePut(kvs []*model.KVDoc) { + for _, kvDoc := range kvs { + kc.StoreKvDoc(kvDoc.ID, kvDoc) + cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) + m, ok := kc.LoadKvIDSet(cacheKey) + if !ok { + kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}}) + openlog.Info("cacheKey " + cacheKey + "not exists") + continue + } + m[kvDoc.ID] = struct{}{} + } +} + +func (kc *Cache) CacheDelete(kvs []*model.KVDoc) { + for _, kvDoc := range kvs { + kc.DeleteKvDoc(kvDoc.ID) + cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) + m, ok := kc.LoadKvIDSet(cacheKey) + if !ok { + openlog.Error("cacheKey " + cacheKey + "not exists") + continue + } + delete(m, kvDoc.ID) + } +} + +func Search(req *CacheSearchReq) (*model.KVResponse, bool, []string) { + if !req.Opts.ExactLabels { + return nil, false, nil + } + + openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, 
*req.Opts)) + result := &model.KVResponse{ + Data: []*model.KVDoc{}, + } + cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) + kvIds, ok := kvCache.LoadKvIDSet(cacheKey) + if !ok { + kvCache.StoreKvIDSet(cacheKey, IDSet{}) + return result, true, nil + } + + var docs []*model.KVDoc + + var kvIdsInCache []string + for kvID := range kvIds { + if doc, ok := kvCache.LoadKvDoc(kvID); ok { + docs = append(docs, doc) + kvIdsInCache = append(kvIdsInCache, kvID) + continue + } + } + + for _, doc := range docs { + if isMatch(req, doc) { + bytes, _ := json.Marshal(doc) + var docDeepCopy model.KVDoc + err := json.Unmarshal(bytes, &docDeepCopy) + if err != nil { + return nil, false, nil + } + datasource.ClearPart(&docDeepCopy) + result.Data = append(result.Data, &docDeepCopy) + } + } + result.Total = len(result.Data) + return result, true, kvIdsInCache +} + +func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { + if doc == nil { + return false + } + if req.Opts.Status != "" && doc.Status != req.Opts.Status { + return false + } + if req.Regex != nil && !req.Regex.MatchString(doc.Key) { + return false + } + if req.Opts.Value != "" && !strings.Contains(doc.Value, req.Opts.Value) { + return false + } + return true +} diff --git a/server/datasource/local/kv/kv_dao.go b/server/datasource/local/kv/kv_dao.go new file mode 100644 index 00000000..f16ec090 --- /dev/null +++ b/server/datasource/local/kv/kv_dao.go @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kv + +import ( + "context" + "encoding/json" + "os" + "path" + "regexp" + "strconv" + "strings" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/pkg/util" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/auth" + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao operate data in local +type Dao struct { +} + +func (s *Dao) Create(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) (*model.KVDoc, error) { + if err := auth.CheckCreateKV(ctx, kv); err != nil { + return nil, err + } + + err := create(kv) + if err != nil { + openlog.Error("create error", openlog.WithTags(openlog.Tags{ + "err": err.Error(), + "kv": kv, + })) + return nil, err + } + kvCache.CachePut([]*model.KVDoc{kv}) + return kv, nil +} + +func create(kv *model.KVDoc) (err error) { + data, _ := json.Marshal(&kv) + rollbackOperations := []file.FileDoRecord{} + + defer func() { + if err != nil { + file.Rollback(rollbackOperations) + } + }() + + err = file.CreateOrUpdateFile(path.Join(file.FileRootPath, kv.Domain, kv.Project, kv.ID, strconv.FormatInt(kv.UpdateRevision, 10)+".json"), data, &rollbackOperations, false) + if err != nil { + return err + } + + err = file.CreateOrUpdateFile(path.Join(file.FileRootPath, kv.Domain, kv.Project, kv.ID, file.NewstKVFile), data, &rollbackOperations, false) + return err +} + +// Update update key value +func (s *Dao) Update(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) error { + kvpath := path.Join(file.FileRootPath, kv.Domain, kv.Project, kv.ID, file.NewstKVFile) + kvInfo, err := file.ReadFile(kvpath) + if err != nil { + openlog.Error(err.Error()) + return err + } + if kvInfo == nil { + return datasource.ErrKeyNotExists + } + var oldKV model.KVDoc + err = json.Unmarshal(kvInfo, &oldKV) + if err != nil { + openlog.Error(err.Error()) + return err + } + + if err := auth.CheckUpdateKV(ctx, &oldKV); err != nil { + return err + } + + oldKV.LabelFormat = kv.LabelFormat + oldKV.Value = kv.Value + oldKV.Status = kv.Status + oldKV.Checker = kv.Checker + oldKV.UpdateTime = kv.UpdateTime + oldKV.UpdateRevision = kv.UpdateRevision + + err = create(kv) + if err != nil { + openlog.Error(err.Error()) + return err + } + + kvCache.CachePut([]*model.KVDoc{kv}) + return nil +} + +// Extract key values +func getValue(str string) string { + rex := regexp.MustCompile(`\(([^)]+)\)`) + res := rex.FindStringSubmatch(str) + return res[len(res)-1] +} + +// Exist supports you query a key value by label map or labels id +func (s *Dao) Exist(ctx context.Context, key, project, domain string, options ...datasource.FindOption) (bool, error) { + opts := datasource.FindOptions{Key: key} + for _, o := range options { + o(&opts) + } + kvs, err := s.listNoAuth(ctx, project, domain, + datasource.WithExactLabels(), + datasource.WithLabels(opts.Labels), + datasource.WithLabelFormat(opts.LabelFormat), + datasource.WithKey(key), + datasource.WithCaseSensitive()) + if err != nil { + openlog.Error("check kv exist: " + err.Error()) + return false, err + } + if IsUniqueFind(opts) && len(kvs.Data) == 0 { + return false, nil + } + if len(kvs.Data) != 1 { + return false, datasource.ErrTooMany + } + return true, nil +} + +func (s *Dao) GetByKey(ctx context.Context, key, project, domain string, options ...datasource.FindOption) ([]*model.KVDoc, error) { + opts := datasource.FindOptions{Key: key} + for _, o := range options { 
+ o(&opts) + } + kvs, err := s.listNoAuth(ctx, project, domain, + datasource.WithExactLabels(), + datasource.WithLabels(opts.Labels), + datasource.WithLabelFormat(opts.LabelFormat), + datasource.WithKey(key), + datasource.WithCaseSensitive()) + if err != nil { + openlog.Error("check kv exist: " + err.Error()) + return nil, err + } + if IsUniqueFind(opts) && len(kvs.Data) == 0 { + return nil, datasource.ErrKeyNotExists + } + if len(kvs.Data) != 1 { + return nil, datasource.ErrTooMany + } + return kvs.Data, nil +} + +// FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion +// domain=tenant +func (s *Dao) FindOneAndDelete(ctx context.Context, kvID, project, domain string, options ...datasource.WriteOption) (*model.KVDoc, error) { + kvDoc := model.KVDoc{} + kvpath := path.Join(file.FileRootPath, domain, project, kvID, file.NewstKVFile) + kvFolderPath := path.Join(file.FileRootPath, domain, project, kvID) + kvTmpFolderPath := path.Join(file.FileRootPath, "tmp", domain, project, kvID) + + kvInfo, err := file.ReadFile(kvpath) + if err != nil { + return nil, err + } + + if kvInfo == nil { + return nil, datasource.ErrKeyNotExists + } + + err = file.MoveDir(kvFolderPath, kvTmpFolderPath) + + if err != nil { + openlog.Error("delete Key error: " + err.Error()) + return nil, err + } + + err = json.Unmarshal(kvInfo, &kvDoc) + if err != nil { + openlog.Error("decode error: " + err.Error()) + moveDirErr := file.MoveDir(kvTmpFolderPath, kvFolderPath) + if moveDirErr != nil { + openlog.Error("rollback error when delete kv: " + err.Error()) + } + return nil, err + } + err = file.CleanDir(kvTmpFolderPath) + if err != nil { + openlog.Warn("clean tmp dir error when delete kv: " + err.Error()) + } + err = file.CleanDir(kvFolderPath) + if err != nil { + openlog.Warn("clean dir error when delete kv: " + err.Error()) + } + // delete Cache + kvCache.CacheDelete([]*model.KVDoc{&kvDoc}) + return &kvDoc, nil +} + +// FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion +func (s *Dao) FindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string, options ...datasource.WriteOption) ([]*model.KVDoc, int64, error) { + var docs []*model.KVDoc + var removedIds []string + kvParentPath := path.Join(file.FileRootPath, domain, project) + kvTmpParentPath := path.Join(file.FileRootPath, "tmp", domain, project) + var err error + + defer func() { + if err != nil { + for _, id := range removedIds { + err = file.MoveDir(path.Join(kvTmpParentPath, id), path.Join(kvParentPath, id)) + if err != nil { + openlog.Warn("move tmp dir to real dir error when delete many kvs: " + err.Error()) + } + err = file.CleanDir(path.Join(kvTmpParentPath, id)) + if err != nil { + openlog.Warn("clean tmp dir error when delete many kvs: " + err.Error()) + } + } + } else { + for _, id := range removedIds { + err = file.CleanDir(path.Join(kvTmpParentPath, id)) + if err != nil { + openlog.Warn("clean tmp dir error when delete many kvs: " + err.Error()) + } + err = file.CleanDir(path.Join(kvParentPath, id)) + if err != nil { + openlog.Warn("clean real dir error when delete many kvs: " + err.Error()) + } + } + } + }() + + for _, id := range kvIDs { + kvPath := path.Join(kvParentPath, id, file.NewstKVFile) + kvInfo, kvErr := getKVDoc(kvPath) + err = kvErr + if err != nil { + return nil, 0, err + } + docs = append(docs, kvInfo) + + err = file.MoveDir(path.Join(kvParentPath, id), path.Join(kvTmpParentPath, id)) + if err != nil { + return nil, 0, err + } 
else { + removedIds = append(removedIds, id) + } + } + + if len(docs) == 0 { + return nil, 0, datasource.ErrKeyNotExists + } + kvCache.CacheDelete(docs) + return docs, int64(len(docs)), nil +} + +// Get get kv by kv id +func (s *Dao) Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, error) { + kvpath := path.Join(file.FileRootPath, req.Domain, req.Project, req.ID, file.NewstKVFile) + curKV, err := getKVDoc(kvpath) + if err != nil { + return nil, err + } + if err := auth.CheckGetKV(ctx, curKV); err != nil { + return nil, err + } + return curKV, nil +} + +func getKVDoc(kvpath string) (*model.KVDoc, error) { + kvInfo, err := file.ReadFile(kvpath) + if err != nil { + openlog.Error(err.Error()) + return nil, err + } + if kvInfo == nil { + return nil, datasource.ErrKeyNotExists + } + curKV := &model.KVDoc{} + err = json.Unmarshal(kvInfo, curKV) + if err != nil { + openlog.Error("decode error: " + err.Error()) + return nil, err + } + return curKV, nil +} + +func (s *Dao) Total(ctx context.Context, project, domain string) (int64, error) { + kvParentPath := path.Join(file.FileRootPath, domain, project) + total, err := file.CountInDomain(kvParentPath) + + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + openlog.Error("find total number: " + err.Error()) + return 0, err + } + return int64(total), nil +} + +// List get kv list by key and criteria +func (s *Dao) List(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { + result, opts, err := s.listData(ctx, project, domain, options...) + if err != nil { + return nil, err + } + + filterKVs, err := auth.FilterKVList(ctx, result.Data) + if err != nil { + return nil, err + } + + result.Data = filterKVs + result.Total = len(filterKVs) + + return pagingResult(result, opts), nil +} + +func (s *Dao) listNoAuth(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { + result, opts, err := s.listData(ctx, project, domain, options...) + if err != nil { + return nil, err + } + + return pagingResult(result, opts), nil +} + +// List get kv list by key and criteria +func (s *Dao) listData(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, datasource.FindOptions, error) { + opts := datasource.NewDefaultFindOpts() + for _, o := range options { + o(&opts) + } + ctx, cancel := context.WithTimeout(ctx, opts.Timeout) + defer cancel() + + regex, err := toRegex(opts) + if err != nil { + return nil, opts, err + } + + resultInCache, useCache, kvIdsInCache := Search(&CacheSearchReq{ + Domain: domain, + Project: project, + Opts: &opts, + Regex: regex, + }) + if useCache { + openlog.Info("Use Cache Find Success") + } + + result, err := matchLabelsSearchLocally(ctx, domain, project, regex, opts, kvIdsInCache) + if err != nil { + if os.IsNotExist(err) { + return &model.KVResponse{ + Data: []*model.KVDoc{}, + }, opts, nil + } + openlog.Error("list kv failed: " + err.Error()) + return nil, opts, err + } + + if resultInCache != nil { + result.Data = append(result.Data, resultInCache.Data...) 
+ result.Total += len(resultInCache.Data) + } + return result, opts, nil +} + +func matchLabelsSearchLocally(ctx context.Context, domain, project string, regex *regexp.Regexp, opts datasource.FindOptions, kvIdsInCache []string) (*model.KVResponse, error) { + openlog.Debug("using labels to search kv") + kvParentPath := path.Join(file.FileRootPath, domain, project) + kvs, err := file.ReadAllKvsFromProjectFolder(kvParentPath) + if err != nil { + return nil, err + } + result := &model.KVResponse{ + Data: []*model.KVDoc{}, + } + var docs []*model.KVDoc + for _, kv := range kvs { + var doc model.KVDoc + err := json.Unmarshal(kv, &doc) + if err != nil { + openlog.Error("decode to KVList error: " + err.Error()) + continue + } + var exist = false + for _, v := range kvIdsInCache { + if v == doc.ID { + exist = true + break + } + } + if exist { + continue + } + + if !filterMatch(&doc, opts, regex) { + continue + } + bytes, _ := json.Marshal(doc) + var docDeepCopy model.KVDoc + err = json.Unmarshal(bytes, &docDeepCopy) + if err != nil { + openlog.Error("decode to KVList error: " + err.Error()) + continue + } + docs = append(docs, &docDeepCopy) + datasource.ClearPart(&doc) + result.Data = append(result.Data, &doc) + result.Total++ + + if IsUniqueFind(opts) { + break + } + } + kvCache.CachePut(docs) + + return result, nil +} + +func IsUniqueFind(opts datasource.FindOptions) bool { + return opts.LabelFormat != "" && opts.Key != "" +} + +func toRegex(opts datasource.FindOptions) (*regexp.Regexp, error) { + var value string + if opts.Key == "" { + return nil, nil + } + switch { + case strings.HasPrefix(opts.Key, "beginWith("): + value = strings.ReplaceAll(getValue(opts.Key), ".", "\\.") + ".*" + case strings.HasPrefix(opts.Key, "wildcard("): + value = strings.ReplaceAll(getValue(opts.Key), ".", "\\.") + value = strings.ReplaceAll(value, "*", ".*") + default: + value = strings.ReplaceAll(opts.Key, ".", "\\.") + } + value = "^" + value + "$" + if !opts.CaseSensitive { + value = "(?i)" + value + } + regex, err := regexp.Compile(value) + if err != nil { + openlog.Error("invalid wildcard expr: " + value + ", error: " + err.Error()) + return nil, err + } + return regex, nil +} + +func pagingResult(result *model.KVResponse, opts datasource.FindOptions) *model.KVResponse { + datasource.ReverseByPriorityAndUpdateRev(result.Data) + + if opts.Limit == 0 { + return result + } + total := int64(result.Total) + if opts.Offset >= total { + result.Data = []*model.KVDoc{} + return result + } + end := opts.Offset + opts.Limit + if end > total { + end = total + } + result.Data = result.Data[opts.Offset:end] + return result +} + +func filterMatch(doc *model.KVDoc, opts datasource.FindOptions, regex *regexp.Regexp) bool { + if opts.Status != "" && doc.Status != opts.Status { + return false + } + if regex != nil && !regex.MatchString(doc.Key) { + return false + } + if len(opts.Labels) != 0 { + if opts.ExactLabels && !util.IsEquivalentLabel(opts.Labels, doc.Labels) { + return false + } + if !opts.ExactLabels && !util.IsContainLabel(doc.Labels, opts.Labels) { + return false + } + } + if opts.LabelFormat != "" && doc.LabelFormat != opts.LabelFormat { + return false + } + if opts.Value != "" && !strings.Contains(doc.Value, opts.Value) { + return false + } + return true +} diff --git a/server/datasource/local/rbac/rbac.go b/server/datasource/local/rbac/rbac.go new file mode 100644 index 00000000..8c1249a6 --- /dev/null +++ b/server/datasource/local/rbac/rbac.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rbac + +import ( + "context" + "encoding/json" + "errors" + + crbac "github.com/go-chassis/cari/rbac" + "github.com/go-chassis/openlog" + "github.com/little-cui/etcdadpt" +) + +func generateRBACRoleKey(name string) string { + return "/cse-sr/roles/" + name +} + +func generateRBACAccountKey(name string) string { + return "/cse-sr/accounts/" + name +} + +type Dao struct { +} + +func (re *Dao) GetRole(ctx context.Context, name string) (*crbac.Role, error) { + kv, err := etcdadpt.Get(ctx, generateRBACRoleKey(name)) + if err != nil { + return nil, err + } + if kv == nil { + return nil, errors.New("role not exist") + } + role := &crbac.Role{} + err = json.Unmarshal(kv.Value, role) + if err != nil { + openlog.Error("role info format invalid", openlog.WithErr(err)) + return nil, err + } + return role, nil +} + +func (re *Dao) AccountExist(ctx context.Context, name string) (bool, error) { + return etcdadpt.Exist(ctx, generateRBACAccountKey(name)) +} diff --git a/server/datasource/local/track/polling_detail_dao.go b/server/datasource/local/track/polling_detail_dao.go new file mode 100644 index 00000000..eab584bb --- /dev/null +++ b/server/datasource/local/track/polling_detail_dao.go @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package track + +import ( + "context" + "encoding/json" + "os" + "path" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao is the implementation +type Dao struct { +} + +// CreateOrUpdate create a record or update exist record +// If revision and session_id exists then update else insert +func (s *Dao) CreateOrUpdate(ctx context.Context, detail *model.PollingDetail) (*model.PollingDetail, error) { + bytes, err := json.Marshal(detail) + if err != nil { + openlog.Error("encode polling detail error: " + err.Error()) + return nil, err + } + + revision := "default" + if detail.Revision != "" { + revision = detail.Revision + } + trackPath := path.Join(file.FileRootPath, "track", detail.Domain, detail.Project, revision, detail.SessionID+".json") + + err = file.CreateOrUpdateFile(trackPath, bytes, &[]file.FileDoRecord{}, false) + if err != nil { + openlog.Error(err.Error()) + return nil, err + } + return detail, nil +} + +// GetPollingDetail is to get a track data +func (s *Dao) GetPollingDetail(ctx context.Context, detail *model.PollingDetail) ([]*model.PollingDetail, error) { + trackFolderPath := path.Join(file.FileRootPath, "track", detail.Domain, detail.Project) + _, kvs, err := file.ReadAllFiles(trackFolderPath) + if err != nil { + if os.IsNotExist(err) { + return make([]*model.PollingDetail, 0), nil + } + openlog.Error(err.Error()) + return nil, err + } + + records := make([]*model.PollingDetail, 0, len(kvs)) + for _, kv := range kvs { + var doc model.PollingDetail + err := json.Unmarshal(kv, &doc) + if err != nil { + openlog.Error("decode polling detail error: " + err.Error()) + continue + } + if detail.SessionID != "" && doc.SessionID != detail.SessionID { + continue + } + if detail.IP != "" && doc.IP != detail.IP { + continue + } + if detail.UserAgent != "" && doc.UserAgent != detail.UserAgent { + continue + } + if detail.URLPath != "" && doc.URLPath != detail.URLPath { + continue + } + if detail.Revision != "" && doc.Revision != detail.Revision { + continue + } + records = append(records, &doc) + } + if len(records) == 0 { + return nil, datasource.ErrRecordNotExists + } + return records, nil +} diff --git a/server/db/db.go b/server/db/db.go index 5bacd371..7f75b32f 100644 --- a/server/db/db.go +++ b/server/db/db.go @@ -24,6 +24,7 @@ import ( "github.com/apache/servicecomb-kie/server/config" "github.com/apache/servicecomb-kie/server/config/tlsutil" + "github.com/apache/servicecomb-kie/server/datasource/local/file" "github.com/go-chassis/cari/db" dconfig "github.com/go-chassis/cari/db/config" "github.com/go-chassis/openlog" @@ -59,6 +60,18 @@ func Init(c config.DB) error { return errors.New("tls setting invalid:" + err.Error()) } } + + if c.Kind == "etcd_with_localstorage" || c.Kind == "embedded_etcd_with_localstorage" { + if c.Kind == "embedded_etcd_with_localstorage" { + c.Kind = "embedded_etcd" + } + if c.Kind == "etcd_with_localstorage" { + c.Kind = "etcd" + } + if c.LocalFilePath != "" { + file.FileRootPath = c.LocalFilePath + } + } return db.Init(&dconfig.Config{ Kind: c.Kind, URI: c.URI, diff --git a/server/handler/track_handler.go b/server/handler/track_handler.go index 29328e1c..d327f33a 100644 --- a/server/handler/track_handler.go +++ b/server/handler/track_handler.go @@ -78,7 +78,10 @@ func (h *TrackHandler) Handle(chain *handler.Chain, inv *invocation.Invocation, data.Domain = 
v1.ReadDomain(req.Request.Context()) data.Project = req.PathParameter(common.PathParameterProject) data.IP = iputil.ClientIP(req.Request) - data.ResponseBody = req.Attribute(common.RespBodyContextKey).([]*model.KVDoc) + responseBodyAttr := req.Attribute(common.RespBodyContextKey) + if responseBodyAttr != nil { + data.ResponseBody = responseBodyAttr.([]*model.KVDoc) + } data.ResponseCode = ir.Status data.Timestamp = time.Now() if resp != nil { @@ -96,7 +99,6 @@ func (h *TrackHandler) Handle(chain *handler.Chain, inv *invocation.Invocation, return } cb(ir) - }) } diff --git a/server/resource/v1/admin_resource_test.go b/server/resource/v1/admin_resource_test.go index 2ed6c2fa..9344bf0a 100644 --- a/server/resource/v1/admin_resource_test.go +++ b/server/resource/v1/admin_resource_test.go @@ -19,7 +19,6 @@ package v1_test import ( "encoding/json" - "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -34,7 +33,7 @@ import ( ) func Test_HeathCheck(t *testing.T) { - path := fmt.Sprintf("/v1/health") + path := "/v1/health" r, _ := http.NewRequest("GET", path, nil) revision := &v1.AdminResource{} c, err := restfultest.New(revision, nil) diff --git a/server/resource/v1/history_resource_test.go b/server/resource/v1/history_resource_test.go index 14efae65..a6956395 100644 --- a/server/resource/v1/history_resource_test.go +++ b/server/resource/v1/history_resource_test.go @@ -88,7 +88,7 @@ func TestHistoryResource_GetRevisions(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) assert.NoError(t, err) var data model.KVResponse - err = json.Unmarshal(body, &data) + _ = json.Unmarshal(body, &data) assert.Equal(t, before+1, len(data.Data)) }) } diff --git a/server/resource/v1/kv_resource_test.go b/server/resource/v1/kv_resource_test.go index c52c3aa1..80cc709c 100644 --- a/server/resource/v1/kv_resource_test.go +++ b/server/resource/v1/kv_resource_test.go @@ -356,7 +356,7 @@ func TestKVResource_List(t *testing.T) { c.ServeHTTP(resp2, r2) rev = resp2.Header().Get(common2.HeaderRevision) t.Log(rev) - body, err := ioutil.ReadAll(resp2.Body) + body, _ := ioutil.ReadAll(resp2.Body) time.Sleep(1 * time.Second) t.Log(string(body)) assert.Equal(t, http.StatusNotModified, resp2.Result().StatusCode) diff --git a/test/init.go b/test/init.go index 40b2e033..8b4e20f4 100644 --- a/test/init.go +++ b/test/init.go @@ -25,6 +25,7 @@ import ( _ "github.com/go-chassis/cari/db/bootstrap" _ "github.com/apache/servicecomb-kie/server/datasource/etcd" + _ "github.com/apache/servicecomb-kie/server/datasource/local" _ "github.com/apache/servicecomb-kie/server/datasource/mongo" _ "github.com/apache/servicecomb-kie/server/plugin/qms" _ "github.com/apache/servicecomb-kie/server/pubsub/notifier" @@ -42,8 +43,9 @@ import ( ) var ( - uri string - kind string + uri string + kind string + localFilePath string ) func init() { @@ -54,6 +56,8 @@ func init() { } kind = archaius.GetString("TEST_DB_KIND", "etcd") uri = archaius.GetString("TEST_DB_URI", "http://127.0.0.1:2379") + localFilePath = archaius.GetString("TEST_KVS_ROOT_PATH", "") + err = archaius.Init(archaius.WithMemorySource()) if err != nil { panic(err) @@ -71,9 +75,10 @@ func init() { panic(err) } err = db.Init(config.DB{ - URI: uri, - Timeout: "10s", - Kind: kind, + URI: uri, + Timeout: "10s", + Kind: kind, + LocalFilePath: localFilePath, }) if err != nil { panic(err) @@ -82,7 +87,15 @@ func init() { if err != nil { panic(err) } - err = edatasource.Init(kind) + + edatasourceKind := kind + if kind == "etcd_with_localstorage" { + edatasourceKind = "etcd" + } + if kind == 
"embedded_etcd_with_localstorage" { + edatasourceKind = "embedded_etcd" + } + err = edatasource.Init(edatasourceKind) if err != nil { panic(err) } @@ -115,5 +128,5 @@ func randomListenAddress() string { } func IsEmbeddedetcdMode() bool { - return kind == "embedded_etcd" + return kind == "embedded_etcd" || kind == "embedded_etcd_with_localstorage" }