diff --git a/.github/workflows/etcd_storage.yml b/.github/workflows/etcd_storage.yml index 47202b0e..a8115e79 100644 --- a/.github/workflows/etcd_storage.yml +++ b/.github/workflows/etcd_storage.yml @@ -13,7 +13,7 @@ jobs: uses: actions/checkout@v1 - name: UT for etcd run: | - time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 + time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 while ! nc -z 127.0.0.1 2379; do sleep 1 done diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index b977042d..3c8ae4f6 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -15,7 +15,7 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v2 with: - version: v1.51.2 + version: v1.55.2 args: --enable gofmt,gocyclo,goimports,dupl,gosec --timeout 5m --skip-dirs=examples,test --skip-files=.*_test.go$ static-checks: runs-on: ubuntu-latest diff --git a/.github/workflows/local_storage.yml b/.github/workflows/local_storage.yml new file mode 100644 index 00000000..43894113 --- /dev/null +++ b/.github/workflows/local_storage.yml @@ -0,0 +1,24 @@ +name: Merge check for local +on: [push, pull_request] +jobs: + etcd-with-localstorage: + runs-on: ubuntu-latest + steps: + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.18 + id: go + - name: Check out code into the Go module directory + uses: actions/checkout@v1 + - name: UT for etcd with local storage + run: | + time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 + while ! nc -z 127.0.0.1 2379; do + sleep 1 + done + export TEST_DB_KIND=etcd_with_localstorage + export TEST_DB_URI=127.0.0.1:2379 + export TEST_KVS_ROOT_PATH=/data/kvs + sudo rm -rf /data/kvs + sudo time go test $(go list ./... | grep -v mongo | grep -v third_party | grep -v examples) \ No newline at end of file diff --git a/cmd/kieserver/main.go b/cmd/kieserver/main.go index f9936436..b5045cc3 100644 --- a/cmd/kieserver/main.go +++ b/cmd/kieserver/main.go @@ -35,6 +35,7 @@ import ( //storage _ "github.com/apache/servicecomb-kie/server/datasource/etcd" + _ "github.com/apache/servicecomb-kie/server/datasource/local" _ "github.com/apache/servicecomb-kie/server/datasource/mongo" //quota management diff --git a/examples/dev/conf/chassis.yaml b/examples/dev/conf/chassis.yaml index 4eab0d22..cb2cc691 100755 --- a/examples/dev/conf/chassis.yaml +++ b/examples/dev/conf/chassis.yaml @@ -6,6 +6,9 @@ servicecomb: protocols: rest: listenAddress: 127.0.0.1:30110 + metrics: + enable: true + interval: 10s match: rateLimitPolicy: | matches: diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml index 11b91067..56aff3a3 100644 --- a/examples/dev/kie-conf.yaml +++ b/examples/dev/kie-conf.yaml @@ -1,6 +1,9 @@ db: - # kind can be mongo, etcd, embedded_etcd + # kind can be mongo, etcd, embedded_etcd, embedded_etcd_with_localstorage, etcd_with_localstorage kind: embedded_etcd + +# localFilePath: is the root path to store local kv files +# uri: http://127.0.0.1:2379 # uri is the db endpoints list # kind=mongo, then is the mongodb cluster's uri, e.g. mongodb://127.0.0.1:27017/kie # kind=etcd, then is the remote etcd server's advertise-client-urls, e.g. http://127.0.0.1:2379 diff --git a/go.mod b/go.mod index eb667ad0..f88e2876 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,21 @@ module github.com/apache/servicecomb-kie require ( - github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca + github.com/apache/servicecomb-service-center/eventbase v0.0.0-20240328150344-01abe81dc5d0 github.com/emicklei/go-restful v2.15.1-0.20220703112237-d9c71e118c95+incompatible - github.com/go-chassis/cari v0.7.1-0.20220815112157-2c62cc5ae1a3 + github.com/go-chassis/cari v0.9.1-0.20240328115504-88da93faaca7 + github.com/go-chassis/etcdadpt v0.5.3-0.20240328092602-984e34b756fe github.com/go-chassis/foundation v0.4.0 - github.com/go-chassis/go-archaius v1.5.2-0.20210301074935-e4694f6b077b + github.com/go-chassis/go-archaius v1.5.6 github.com/go-chassis/go-chassis/v2 v2.7.1 github.com/go-chassis/openlog v1.1.3 github.com/go-chassis/seclog v1.3.1-0.20210917082355-52c40864f240 github.com/gofrs/uuid v4.0.0+incompatible github.com/hashicorp/serf v0.9.5 - github.com/little-cui/etcdadpt v0.3.2 github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/stretchr/testify v1.7.1 + github.com/stretchr/testify v1.7.2 github.com/urfave/cli v1.22.4 - go.etcd.io/etcd/api/v3 v3.5.0 + go.etcd.io/etcd/api/v3 v3.5.4 go.mongodb.org/mongo-driver v1.5.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -31,9 +31,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/coreos/go-semver v0.3.0 // indirect - github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect - github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set v1.7.1 // indirect @@ -42,7 +40,7 @@ require ( github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-chassis/go-restful-swagger20 v1.0.4-0.20220704025524-9243cbee26b7 // indirect - github.com/go-chassis/kie-client v0.1.1-0.20210926011742-97eed4281056 // indirect + github.com/go-chassis/kie-client v0.2.0 // indirect github.com/go-chassis/sc-client v0.6.1-0.20220728072125-dacdd0c834bf // indirect github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect @@ -105,12 +103,12 @@ require ( github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.etcd.io/bbolt v1.3.6 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect - go.etcd.io/etcd/client/v2 v2.305.0 // indirect - go.etcd.io/etcd/client/v3 v3.5.0 // indirect - go.etcd.io/etcd/pkg/v3 v3.5.0 // indirect - go.etcd.io/etcd/raft/v3 v3.5.0 // indirect - go.etcd.io/etcd/server/v3 v3.5.0 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect + go.etcd.io/etcd/client/v2 v2.305.4 // indirect + go.etcd.io/etcd/client/v3 v3.5.4 // indirect + go.etcd.io/etcd/pkg/v3 v3.5.4 // indirect + go.etcd.io/etcd/raft/v3 v3.5.4 // indirect + go.etcd.io/etcd/server/v3 v3.5.4 // indirect go.opentelemetry.io/contrib v0.20.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect go.opentelemetry.io/otel v0.20.0 // indirect @@ -124,18 +122,18 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b // indirect - golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect + golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect + golang.org/x/net v0.7.0 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.7.0 // indirect golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect google.golang.org/grpc v1.40.0 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apimachinery v0.19.5 // indirect k8s.io/client-go v0.19.5 // indirect k8s.io/utils v0.0.0-20200729134348-d5654de09c73 // indirect diff --git a/go.sum b/go.sum index e67db898..652132a1 100644 --- a/go.sum +++ b/go.sum @@ -37,7 +37,6 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= github.com/Azure/go-autorest/autorest v0.9.6/go.mod h1:/FALq9T/kS7b5J5qsQ+RSTUdAmGFqi0vUdVNNx8q630= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= -github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= github.com/Azure/go-autorest/autorest/adal v0.8.2/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q= github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA= github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g= @@ -61,8 +60,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca h1:e4NoNmQYa7y0K+jJSbvQHsQ+mVI1ThBk/pP21MbFblQ= -github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca/go.mod h1:eClC23tfQ0Q4vdesWnBeFfEyf58XyEQhoWki3+HSQ9s= +github.com/apache/servicecomb-service-center/eventbase v0.0.0-20240328150344-01abe81dc5d0 h1:71oLgvqQJ0IRhcNCMPho4kgelrhX3OuJM/z985I4lJ8= +github.com/apache/servicecomb-service-center/eventbase v0.0.0-20240328150344-01abe81dc5d0/go.mod h1:KZRz3iaxrK1ko1ChiPnU/GTGPNRGsN7jBNhN/2+YkTU= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= @@ -112,11 +111,9 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= @@ -169,16 +166,19 @@ github.com/go-chassis/cari v0.0.0-20201210041921-7b6fbef2df11/go.mod h1:MgtsEI0A github.com/go-chassis/cari v0.4.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8= github.com/go-chassis/cari v0.5.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8= github.com/go-chassis/cari v0.5.1-0.20210823023004-74041d1363c4/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8= -github.com/go-chassis/cari v0.7.1-0.20220815112157-2c62cc5ae1a3 h1:pSOHycBsSS8yO73zzNn0ARtX+DpuuEnt/GVnXA5VOeE= -github.com/go-chassis/cari v0.7.1-0.20220815112157-2c62cc5ae1a3/go.mod h1:vM13BN0TT505ZKqeJ+hUfzZvfn4nN0vgE6IpBOTWcTc= +github.com/go-chassis/cari v0.6.0/go.mod h1:mSDRCOQXGmlD69A6NG0hsv0UP1xbVPtL6HCGI6X1tqs= +github.com/go-chassis/cari v0.9.1-0.20240328115504-88da93faaca7 h1:XlCtMt+l1hcpfbiFRoSYWYr0q6Ak9g/UGFXSKoqmbT4= +github.com/go-chassis/cari v0.9.1-0.20240328115504-88da93faaca7/go.mod h1:ibqLyh+Q+1n9PlldW3glD9G+2s/yeSyVMCCkQWKRwuE= +github.com/go-chassis/etcdadpt v0.5.3-0.20240328092602-984e34b756fe h1:peLHEt3wzab6nKVcmcu0qkj1+ZXK6D1ymtiyyMBv/XA= +github.com/go-chassis/etcdadpt v0.5.3-0.20240328092602-984e34b756fe/go.mod h1:HV8OZ1Npu+lttD+pJA5nUxWZR3/SBFetTh7w8nYFkUA= github.com/go-chassis/foundation v0.2.2-0.20201210043510-9f6d3de40234/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA= github.com/go-chassis/foundation v0.2.2/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA= github.com/go-chassis/foundation v0.3.0/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA= github.com/go-chassis/foundation v0.4.0 h1:z0xETnSxF+vRXWjoIhOdzt6rywjZ4sB++utEl4YgWEY= github.com/go-chassis/foundation v0.4.0/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU= github.com/go-chassis/go-archaius v1.5.1/go.mod h1:QPwvvtBxvwiC48rmydoAqxopqOr93RCQ6syWsIkXPXQ= -github.com/go-chassis/go-archaius v1.5.2-0.20210301074935-e4694f6b077b h1:0u2kNkdw+J8OublV27I1Xn6berg3MiUm5GSl/T3qXSg= -github.com/go-chassis/go-archaius v1.5.2-0.20210301074935-e4694f6b077b/go.mod h1:qjfG7opNF/QTzj7SyVIn/eIawaPhl7xeGgg8kxzFsDw= +github.com/go-chassis/go-archaius v1.5.6 h1:MF/yE9Mj51slccW6EmZInjFCmyfuhltRz9eu5Na4i88= +github.com/go-chassis/go-archaius v1.5.6/go.mod h1:WsqeDyZsCR2qGdWEAEpywS1taxCUHRF4hPSHVMfnAkc= github.com/go-chassis/go-chassis/v2 v2.3.0/go.mod h1:iyJ2DWSkqfnCmad/0Il9nXWHaob7RcwPGlIDRNxccH0= github.com/go-chassis/go-chassis/v2 v2.7.1 h1:bkYntNY0l5UVRJVQePCD+l+fR/QNqVHxnEjpJxzUBGQ= github.com/go-chassis/go-chassis/v2 v2.7.1/go.mod h1:uzFDzYKzpQrCff8xgPMUEi4ku0VAF6DbXdz57iXYqQM= @@ -186,9 +186,8 @@ github.com/go-chassis/go-restful-swagger20 v1.0.3/go.mod h1:eW62fYuzlNFDvIacB6AV github.com/go-chassis/go-restful-swagger20 v1.0.4-0.20220704025524-9243cbee26b7 h1:EOIGW+inOz52zh6vgr9EQHvvgL2w/VghAeCQIFOVUSE= github.com/go-chassis/go-restful-swagger20 v1.0.4-0.20220704025524-9243cbee26b7/go.mod h1:pSGkT+ksxlMgytyJb4IAz8aZih6OLE1++d9CE6aO9Hg= github.com/go-chassis/kie-client v0.0.0-20201210060018-938c7680a9ab/go.mod h1:UTdbtyN5ge/v9DmQzdVRxQP7z51Q4z6hyl+W6ZpUHFM= -github.com/go-chassis/kie-client v0.1.0/go.mod h1:UTdbtyN5ge/v9DmQzdVRxQP7z51Q4z6hyl+W6ZpUHFM= -github.com/go-chassis/kie-client v0.1.1-0.20210926011742-97eed4281056 h1:Y8CyErFNg4d1dPYXvNWxpyzzLQ/kuyuxJF2/7My7qLc= -github.com/go-chassis/kie-client v0.1.1-0.20210926011742-97eed4281056/go.mod h1:N4SrGTb+e9ZiuOOU9vC/AohqsDtCkY2amNAPcvpEem0= +github.com/go-chassis/kie-client v0.2.0 h1:9/BXLu8HaH9b7WLIrtizfhtbBaQEZlsbsk9c8z+mhgc= +github.com/go-chassis/kie-client v0.2.0/go.mod h1:JCyQeOZcr3ti79tc8tix0VzkC2YpB5j7PRbsOmQGcKQ= github.com/go-chassis/openlog v1.1.2/go.mod h1:+eYCADVxWyJkwsFMUBrMxyQlNqW+UUsCxvR2LrYZUaA= github.com/go-chassis/openlog v1.1.3 h1:XqIOvZ8YPJ9o9lLtLBskQNNWolK5kC6a4Sv7r4s9sZ4= github.com/go-chassis/openlog v1.1.3/go.mod h1:+eYCADVxWyJkwsFMUBrMxyQlNqW+UUsCxvR2LrYZUaA= @@ -434,7 +433,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= -github.com/little-cui/etcdadpt v0.3.2 h1:EBXPBxddZXTgWvGsIdAqqG6JCu1TouPNUhVVj9swt/s= github.com/little-cui/etcdadpt v0.3.2/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -513,6 +511,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.2 h1:51L9cDoUHVrXx4zWYlcLQIZ+d+VXHgqnYKkIuq4g/34= github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -587,8 +586,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -618,20 +617,27 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= -go.etcd.io/etcd/api/v3 v3.5.0 h1:GsV3S+OfZEOCNXdtNkBSR7kgLobAa/SO6tCxRa0GAYw= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= -go.etcd.io/etcd/client/pkg/v3 v3.5.0 h1:2aQv6F436YnN7I4VbI8PPYrBhu+SmrTaADcf8Mi/6PU= +go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc= +go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= -go.etcd.io/etcd/client/v2 v2.305.0 h1:ftQ0nOOHMcbMS3KIaDQ0g5Qcd6bhaBrQT6b89DfwLTs= +go.etcd.io/etcd/client/pkg/v3 v3.5.4 h1:lrneYvz923dvC14R54XcA7FXoZ3mlGZAgmwhfm7HqOg= +go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= -go.etcd.io/etcd/client/v3 v3.5.0 h1:62Eh0XOro+rDwkrypAGDfgmNh5Joq+z+W9HZdlXMzek= +go.etcd.io/etcd/client/v2 v2.305.4 h1:Dcx3/MYyfKcPNLpR4VVQUP5KgYrBeJtktBwEKkw08Ao= +go.etcd.io/etcd/client/v2 v2.305.4/go.mod h1:Ud+VUwIi9/uQHOMA+4ekToJ12lTxlv0zB/+DHwTGEbU= go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0= -go.etcd.io/etcd/pkg/v3 v3.5.0 h1:ntrg6vvKRW26JRmHTE0iNlDgYK6JX3hg/4cD62X0ixk= +go.etcd.io/etcd/client/v3 v3.5.4 h1:p83BUL3tAYS0OT/r0qglgc3M1JjhM0diV8DSWAhVXv4= +go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= go.etcd.io/etcd/pkg/v3 v3.5.0/go.mod h1:UzJGatBQ1lXChBkQF0AuAtkRQMYnHubxAEYIrC3MSsE= -go.etcd.io/etcd/raft/v3 v3.5.0 h1:kw2TmO3yFTgE+F0mdKkG7xMxkit2duBDa2Hu6D/HMlw= +go.etcd.io/etcd/pkg/v3 v3.5.4 h1:V5Dvl7S39ZDwjkKqJG2BfXgxZ3QREqqKifWQgIw5IM0= +go.etcd.io/etcd/pkg/v3 v3.5.4/go.mod h1:OI+TtO+Aa3nhQSppMbwE4ld3uF1/fqqwbpfndbbrEe0= go.etcd.io/etcd/raft/v3 v3.5.0/go.mod h1:UFOHSIvO/nKwd4lhkwabrTD3cqW5yVyYYf/KlD00Szc= -go.etcd.io/etcd/server/v3 v3.5.0 h1:jk8D/lwGEDlQU9kZXUFMSANkE22Sg5+mW27ip8xcF9E= +go.etcd.io/etcd/raft/v3 v3.5.4 h1:YGrnAgRfgXloBNuqa+oBI/aRZMcK/1GS6trJePJ/Gqc= +go.etcd.io/etcd/raft/v3 v3.5.4/go.mod h1:SCuunjYvZFC0fBX0vxMSPjuZmpcSk+XaAcMrD6Do03w= go.etcd.io/etcd/server/v3 v3.5.0/go.mod h1:3Ah5ruV+M+7RZr0+Y/5mNLwC+eQlni+mQmOVdCRJoS4= +go.etcd.io/etcd/server/v3 v3.5.4 h1:CMAZd0g8Bn5NRhynW6pKhc4FRg41/0QYy3d7aNm9874= +go.etcd.io/etcd/server/v3 v3.5.4/go.mod h1:S5/YTU15KxymM5l3T6b09sNOHPXqGYIZStpuuGbb65c= go.mongodb.org/mongo-driver v1.5.1 h1:9nOVLGDfOaZ9R0tBumx/BcuqkbFpyTCU2r/Po7A2azI= go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -688,8 +694,8 @@ golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b h1:7mWr3k41Qtv8XlltBkDkl8LoP3mpSgBW8BUoxtEdbXg= -golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE= +golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -762,10 +768,11 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -843,8 +850,10 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -854,8 +863,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -915,8 +924,8 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1038,8 +1047,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/common/common.go b/pkg/common/common.go index 9e452f34..7eb1d3d7 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -40,6 +40,7 @@ const ( QueryParamURLPath = "urlPath" QueryParamUserAgent = "userAgent" QueryParamOverride = "override" + QueryParamMode = "mode" ) // http headers diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go index 442da43e..13e10b36 100644 --- a/pkg/model/db_schema.go +++ b/pkg/model/db_schema.go @@ -95,7 +95,7 @@ type GetKVRequest struct { type ListKVRequest struct { Project string `json:"project,omitempty" yaml:"project,omitempty" validate:"min=1,max=256,commonName"` Domain string `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"` //redundant - Key string `json:"key" yaml:"key" validate:"max=128,getKey"` + Key string `json:"key" yaml:"key" validate:"max=256,getKey"` Value string `json:"value" yaml:"value" validate:"max=128"` Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty" validate:"max=8,dive,keys,labelK,endkeys,labelV"` //redundant Offset int64 `validate:"min=0"` diff --git a/scripts/start.sh b/scripts/start.sh index 8e0b662f..c25f7835 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -73,6 +73,7 @@ EOM db: kind: ${db_type} uri: ${uri} + localFilePath: ${KVS_ROOT_PATH} EOM } diff --git a/server/config/struct.go b/server/config/struct.go index 83e91f08..73910272 100644 --- a/server/config/struct.go +++ b/server/config/struct.go @@ -22,7 +22,7 @@ type Config struct { DB DB `yaml:"db"` RBAC RBAC `yaml:"rbac"` Sync Sync `yaml:"sync"` - //config from cli + // config from cli ConfigFile string NodeName string ListenPeerAddr string @@ -41,11 +41,12 @@ type TLS struct { // DB is yaml file struct to set persistent config type DB struct { - TLS `yaml:",inline" json:",inline"` - URI string `yaml:"uri" json:"uri,omitempty"` - Kind string `yaml:"kind" json:"kind,omitempty"` - PoolSize int `yaml:"poolSize" json:"pool_size,omitempty"` - Timeout string `yaml:"timeout" json:"timeout,omitempty"` + TLS `yaml:",inline" json:",inline"` + URI string `yaml:"uri" json:"uri,omitempty"` + Kind string `yaml:"kind" json:"kind,omitempty"` + LocalFilePath string `yaml:"localFilePath" json:"local_file_path,omitempty"` + PoolSize int `yaml:"poolSize" json:"pool_size,omitempty"` + Timeout string `yaml:"timeout" json:"timeout,omitempty"` } // RBAC is rbac config diff --git a/server/datasource/dao.go b/server/datasource/dao.go index bce146b1..a4cc01eb 100644 --- a/server/datasource/dao.go +++ b/server/datasource/dao.go @@ -23,10 +23,9 @@ import ( "errors" "fmt" + "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/server/datasource/rbac" "github.com/go-chassis/openlog" - - "github.com/apache/servicecomb-kie/pkg/model" ) var ( @@ -120,9 +119,12 @@ type ViewDao interface { func Init(kind string) error { var err error f, ok := plugins[kind] + if !ok { + openlog.Info(fmt.Sprintf("do not support '%s'", kind)) return fmt.Errorf("do not support '%s'", kind) } + dbc := &Config{} if b, err = f(dbc); err != nil { return err diff --git a/server/datasource/etcd/counter/revision.go b/server/datasource/etcd/counter/revision.go index 07d14489..3d677776 100644 --- a/server/datasource/etcd/counter/revision.go +++ b/server/datasource/etcd/counter/revision.go @@ -20,11 +20,12 @@ package counter import ( "context" - "github.com/apache/servicecomb-kie/server/datasource" - "github.com/apache/servicecomb-kie/server/datasource/etcd/key" "github.com/go-chassis/cari/config" + "github.com/go-chassis/etcdadpt" "github.com/go-chassis/openlog" - "github.com/little-cui/etcdadpt" + + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/etcd/key" ) const revision = "revision_counter" diff --git a/server/datasource/etcd/history/history_dao.go b/server/datasource/etcd/history/history_dao.go index f1ce04d1..cd4dfca3 100644 --- a/server/datasource/etcd/history/history_dao.go +++ b/server/datasource/etcd/history/history_dao.go @@ -21,8 +21,8 @@ import ( "context" "encoding/json" + "github.com/go-chassis/etcdadpt" "github.com/go-chassis/openlog" - "github.com/little-cui/etcdadpt" "github.com/apache/servicecomb-kie/server/datasource/auth" diff --git a/server/datasource/etcd/key/key.go b/server/datasource/etcd/key/key.go index 7bf25c32..1df46653 100644 --- a/server/datasource/etcd/key/key.go +++ b/server/datasource/etcd/key/key.go @@ -28,28 +28,8 @@ const ( keyCounter = "counter" keyHistory = "kv-history" keyTrack = "track" - syncer = "syncer" - task = "task" - tombstone = "tombstone" ) -func getSyncRootKey() string { - return split + syncer + split + task -} - -func getTombstoneRootKey() string { - return split + tombstone -} - -func TaskKey(domain, project, taskID string, timestamp int64) string { - strTimestamp := strconv.FormatInt(timestamp, 10) - return strings.Join([]string{getSyncRootKey(), domain, project, strTimestamp, taskID}, split) -} - -func TombstoneKey(domain, project, resourceType, resourceID string) string { - return strings.Join([]string{getTombstoneRootKey(), domain, project, resourceType, resourceID}, split) -} - func KV(domain, project, kvID string) string { return strings.Join([]string{keyKV, domain, project, kvID}, split) } diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go index 61d017b2..e890b87c 100644 --- a/server/datasource/etcd/kv/kv_cache.go +++ b/server/datasource/etcd/kv/kv_cache.go @@ -9,15 +9,16 @@ import ( "sync" "time" - "github.com/apache/servicecomb-kie/pkg/model" - "github.com/apache/servicecomb-kie/pkg/stringutil" - "github.com/apache/servicecomb-kie/server/datasource" - "github.com/apache/servicecomb-kie/server/datasource/etcd/key" + "github.com/go-chassis/etcdadpt" "github.com/go-chassis/foundation/backoff" "github.com/go-chassis/openlog" - "github.com/little-cui/etcdadpt" goCache "github.com/patrickmn/go-cache" "go.etcd.io/etcd/api/v3/mvccpb" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/pkg/stringutil" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/etcd/key" ) func Init() { @@ -35,8 +36,6 @@ const ( backOffMinInterval = 5 * time.Second ) -type IDSet map[string]struct{} - type Cache struct { timeOut time.Duration client etcdadpt.Client @@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) { cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) if !ok { - kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}}) + z := &sync.Map{} + z.Store(kvDoc.ID, struct{}{}) + kc.StoreKvIDSet(cacheKey, z) openlog.Info("cacheKey " + cacheKey + "not exists") continue } - m[kvDoc.ID] = struct{}{} + m.Store(kvDoc.ID, struct{}{}) } } @@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) { openlog.Error("cacheKey " + cacheKey + "not exists") continue } - delete(m, kvDoc.ID) + m.Delete(kvDoc.ID) } } -func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) { +func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) { val, ok := kc.kvIDCache.Load(cacheKey) if !ok { return nil, false } - kvIds, ok := val.(IDSet) + kvIds, ok := val.(*sync.Map) if !ok { return nil, false } return kvIds, true } -func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) { +func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) { kc.kvIDCache.Store(cacheKey, kvIds) } @@ -220,9 +221,9 @@ func (kc *Cache) DeleteKvDoc(kvID string) { kc.kvDocCache.Delete(kvID) } -func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) { +func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { if !req.Opts.ExactLabels { - return nil, false + return nil, false, nil } openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) @@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) kvIds, ok := kvCache.LoadKvIDSet(cacheKey) if !ok { - kvCache.StoreKvIDSet(cacheKey, IDSet{}) - return result, true + kvCache.StoreKvIDSet(cacheKey, &sync.Map{}) + return result, true, nil } var docs []*model.KVDoc var kvIdsLeft []string - for kvID := range kvIds { - if doc, ok := kvCache.LoadKvDoc(kvID); ok { + kvIds.Range(func(kvID, value any) bool { + if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok { docs = append(docs, doc) - continue + } else { + kvIdsLeft = append(kvIdsLeft, kvID.(string)) } - kvIdsLeft = append(kvIdsLeft, kvID) + return true + }) + tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) + if err != nil { + return nil, true, err } - - tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) docs = append(docs, tpData...) for _, doc := range docs { @@ -257,17 +261,18 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) } } result.Total = len(result.Data) - return result, true + return result, true, nil } -func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc { +func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) { if len(kvIdsLeft) == 0 { - return nil + return nil, nil } openlog.Debug("get kv from etcd by kvId") wg := sync.WaitGroup{} docs := make([]*model.KVDoc, len(kvIdsLeft)) + var getKvErr error for i, kvID := range kvIdsLeft { wg.Add(1) go func(kvID string, cnt int) { @@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe kv, err := etcdadpt.Get(ctx, docKey) if err != nil { openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err)) + getKvErr = err return } doc, err := kc.GetKvDoc(kv) if err != nil { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) + getKvErr = err return } @@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe }(kvID, i) } wg.Wait() - return docs + if getKvErr != nil { + return nil, getKvErr + } + return docs, nil } func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { diff --git a/server/datasource/etcd/kv/kv_cache_test.go b/server/datasource/etcd/kv/kv_cache_test.go index 22866996..56711acf 100644 --- a/server/datasource/etcd/kv/kv_cache_test.go +++ b/server/datasource/etcd/kv/kv_cache_test.go @@ -3,7 +3,7 @@ package kv import ( "testing" - "github.com/little-cui/etcdadpt" + "github.com/go-chassis/etcdadpt" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/mvccpb" ) diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go index 2332b586..8d9f63b9 100644 --- a/server/datasource/etcd/kv/kv_dao.go +++ b/server/datasource/etcd/kv/kv_dao.go @@ -24,14 +24,16 @@ import ( "strings" "github.com/go-chassis/cari/sync" + "github.com/go-chassis/etcdadpt" "github.com/go-chassis/openlog" - "github.com/little-cui/etcdadpt" "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/pkg/util" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/datasource/auth" "github.com/apache/servicecomb-kie/server/datasource/etcd/key" + + eventbase "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key" ) // Dao operate data in mongodb @@ -95,7 +97,7 @@ func txnCreate(ctx context.Context, kv *model.KVDoc) (bool, error) { return false, err } kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.KV(kv.Domain, kv.Project, kv.ID)), etcdadpt.WithValue(kvBytes)) - taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes)) + taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(kv.Domain, kv.Project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes)) resp, err := etcdadpt.TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}, etcdadpt.If(etcdadpt.NotExistKey(string(kvOpPut.Key)), etcdadpt.NotExistKey(string(taskOpPut.Key))), nil) if err != nil { @@ -165,7 +167,7 @@ func txnUpdate(ctx context.Context, kv *model.KVDoc) error { return err } kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(keyKV), etcdadpt.WithValue(kvBytes)) - taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes)) + taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(kv.Domain, kv.Project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes)) return etcdadpt.Txn(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}) } @@ -295,9 +297,9 @@ func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*mo return nil, err } kvOpDel := etcdadpt.OpDel(etcdadpt.WithStrKey(kvKey)) - taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, project, + taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(domain, project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes)) - tombstoneOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, tombstone.ResourceType, tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes)) + tombstoneOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TombstoneKey(domain, project, tombstone.ResourceType, tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes)) err = etcdadpt.Txn(ctx, []etcdadpt.OpOptions{kvOpDel, taskOpPut, tombstoneOpPut}) if err != nil { openlog.Error("find and delete error", openlog.WithTags(openlog.Tags{ @@ -427,7 +429,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s openlog.Error("fail to marshal task" + err.Error()) return nil, 0, err } - opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, project, + opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TaskKey(domain, project, task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))) } for _, tombstone := range tombstones { @@ -436,7 +438,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s openlog.Error("fail to marshal tombstone" + err.Error()) return nil, 0, err } - opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, + opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(eventbase.TombstoneKey(domain, project, tombstone.ResourceType, tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))) } err := etcdadpt.Txn(ctx, opOptions) @@ -524,15 +526,18 @@ func (s *Dao) listData(ctx context.Context, project, domain string, options ...d } if Enabled() { - result, useCache := Search(ctx, &CacheSearchReq{ + result, useCache, err := Search(ctx, &CacheSearchReq{ Domain: domain, Project: project, Opts: &opts, Regex: regex, }) - if useCache { + if useCache && err == nil { return result, opts, nil } + if useCache && err != nil { + openlog.Error("using cache to search kv failed: " + err.Error()) + } } result, err := matchLabelsSearch(ctx, domain, project, regex, opts) diff --git a/server/datasource/etcd/rbac/rbac.go b/server/datasource/etcd/rbac/rbac.go index 8c1249a6..0e1a529a 100644 --- a/server/datasource/etcd/rbac/rbac.go +++ b/server/datasource/etcd/rbac/rbac.go @@ -23,8 +23,8 @@ import ( "errors" crbac "github.com/go-chassis/cari/rbac" + "github.com/go-chassis/etcdadpt" "github.com/go-chassis/openlog" - "github.com/little-cui/etcdadpt" ) func generateRBACRoleKey(name string) string { diff --git a/server/datasource/etcd/track/polling_detail_dao.go b/server/datasource/etcd/track/polling_detail_dao.go index 58bd1838..7da6407b 100644 --- a/server/datasource/etcd/track/polling_detail_dao.go +++ b/server/datasource/etcd/track/polling_detail_dao.go @@ -21,11 +21,12 @@ import ( "context" "encoding/json" + "github.com/go-chassis/etcdadpt" + "github.com/go-chassis/openlog" + "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/datasource/etcd/key" - "github.com/go-chassis/openlog" - "github.com/little-cui/etcdadpt" ) // Dao is the implementation diff --git a/server/datasource/kv_dao_test.go b/server/datasource/kv_dao_test.go index 977944db..6e5fae3d 100644 --- a/server/datasource/kv_dao_test.go +++ b/server/datasource/kv_dao_test.go @@ -23,12 +23,13 @@ import ( "github.com/stretchr/testify/assert" + _ "github.com/apache/servicecomb-kie/test" + "github.com/apache/servicecomb-kie/pkg/common" "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/server/datasource" kvsvc "github.com/apache/servicecomb-kie/server/service/kv" "github.com/apache/servicecomb-kie/server/service/sync" - "github.com/apache/servicecomb-kie/test" emodel "github.com/apache/servicecomb-service-center/eventbase/model" "github.com/apache/servicecomb-service-center/eventbase/service/task" "github.com/apache/servicecomb-service-center/eventbase/service/tombstone" @@ -109,10 +110,6 @@ func TestList(t *testing.T) { } func TestWithSync(t *testing.T) { - if test.IsEmbeddedetcdMode() { - return - } - t.Run("create kv with sync enabled", func(t *testing.T) { t.Run("creating a kv will create a task should pass", func(t *testing.T) { // set the sync enabled @@ -150,7 +147,7 @@ func TestWithSync(t *testing.T) { Project: "sync-create", ResourceType: datasource.ConfigResource, } - tombstones, tempErr := tombstone.List(ctx, &tbListReq) + tombstones, _ := tombstone.List(ctx, &tbListReq) assert.Equal(t, 1, len(tombstones)) tempErr = tombstone.Delete(ctx, tombstones...) assert.Nil(t, tempErr) @@ -222,7 +219,7 @@ func TestWithSync(t *testing.T) { Project: "sync-update", ResourceType: datasource.ConfigResource, } - tombstones, tempErr := tombstone.List(ctx, &tbListReq) + tombstones, _ := tombstone.List(ctx, &tbListReq) assert.Equal(t, 2, len(tombstones)) tempErr = tombstone.Delete(ctx, tombstones...) assert.Nil(t, tempErr) diff --git a/server/datasource/local/counter/revision.go b/server/datasource/local/counter/revision.go new file mode 100644 index 00000000..756b79f3 --- /dev/null +++ b/server/datasource/local/counter/revision.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package counter + +import ( + "context" + "os" + "path" + "strconv" + + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao is the implementation +type Dao struct { +} + +// GetRevision return current revision number +func (s *Dao) GetRevision(ctx context.Context, domain string) (int64, error) { + revisionPath := path.Join(file.FileRootPath, domain, "revision") + + revisionByte, err := file.ReadFile(revisionPath) + + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + openlog.Error("get error: " + err.Error()) + return 0, err + } + if revisionByte == nil || string(revisionByte) == "" { + return 0, nil + } + + revisionNum, err := strconv.Atoi(string(revisionByte)) + if err != nil { + return 0, err + } + return int64(revisionNum), nil +} + +// ApplyRevision increase revision number and return modified value +func (s *Dao) ApplyRevision(ctx context.Context, domain string) (int64, error) { + currentRevisionNum, err := s.GetRevision(ctx, domain) + if err != nil { + return 0, err + } + err = file.CreateOrUpdateFile(path.Join(file.FileRootPath, domain, "revision"), []byte(strconv.Itoa(int(currentRevisionNum+1))), &[]file.FileDoRecord{}, false) + if err != nil { + return 0, err + } + return currentRevisionNum + 1, nil +} diff --git a/server/datasource/local/file/fileprocess.go b/server/datasource/local/file/fileprocess.go new file mode 100644 index 00000000..30fa486e --- /dev/null +++ b/server/datasource/local/file/fileprocess.go @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "io/fs" + "os" + "path" + "path/filepath" + "strings" + "sync" + + log "github.com/go-chassis/openlog" +) + +var FileRootPath = "/data/kvs" + +var NewstKVFile = "newest_version.json" + +var MutexMap = make(map[string]*sync.RWMutex) +var mutexMapLock = &sync.Mutex{} +var rollbackMutexLock = &sync.Mutex{} +var createDirMutexLock = &sync.Mutex{} + +type SchemaDAO struct{} + +type FileDoRecord struct { + filepath string + content []byte +} + +func GetOrCreateMutex(path string) *sync.RWMutex { + mutexMapLock.Lock() + mutex, ok := MutexMap[path] + if !ok { + mutex = &sync.RWMutex{} + MutexMap[path] = mutex + } + mutexMapLock.Unlock() + + return mutex +} + +func ExistDir(path string) error { + _, err := os.ReadDir(path) + if err != nil { + // create the dir if not exist + if os.IsNotExist(err) { + createDirMutexLock.Lock() + defer createDirMutexLock.Unlock() + err = os.MkdirAll(path, fs.ModePerm) + if err != nil { + log.Error("failed to make dir: " + path + " " + err.Error()) + + return err + } + return nil + } + log.Error("failed to read dir: " + path + " " + err.Error()) + return err + } + + return nil +} + +func MoveDir(srcDir string, dstDir string) (err error) { + srcMutex := GetOrCreateMutex(srcDir) + dstMutex := GetOrCreateMutex(dstDir) + srcMutex.Lock() + dstMutex.Lock() + defer srcMutex.Unlock() + defer dstMutex.Unlock() + + var movedFiles []string + files, err := os.ReadDir(srcDir) + if err != nil { + log.Error("move schema files failed " + err.Error()) + return err + } + for _, file := range files { + err = ExistDir(dstDir) + if err != nil { + log.Error("move schema files failed " + err.Error()) + return err + } + srcFile := filepath.Join(srcDir, file.Name()) + dstFile := filepath.Join(dstDir, file.Name()) + err = os.Rename(srcFile, dstFile) + if err != nil { + log.Error("move schema files failed " + err.Error()) + break + } + movedFiles = append(movedFiles, file.Name()) + } + + if err != nil { + log.Error("Occur error when move schema files, begain rollback... " + err.Error()) + for _, fileName := range movedFiles { + srcFile := filepath.Join(srcDir, fileName) + dstFile := filepath.Join(dstDir, fileName) + err = os.Rename(dstFile, srcFile) + if err != nil { + log.Error("rollback move schema files failed and continue" + err.Error()) + } + } + } + return err +} + +func CreateOrUpdateFile(filepath string, content []byte, rollbackOperations *[]FileDoRecord, isRollback bool) error { + err := ExistDir(path.Dir(filepath)) + + if !isRollback { + mutex := GetOrCreateMutex(path.Dir(filepath)) + mutex.Lock() + defer mutex.Unlock() + } + + if err != nil { + log.Error("failed to build new schema file dir " + filepath + ", " + err.Error()) + return err + } + + fileExist := true + _, err = os.Stat(filepath) + if err != nil { + fileExist = false + } + + if fileExist { + oldcontent, err := os.ReadFile(filepath) + if err != nil { + log.Error("failed to read content to file " + filepath + ", " + err.Error()) + return err + } + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent}) + } else { + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: nil}) + } + + err = os.WriteFile(filepath, content, 0600) + if err != nil { + log.Error("failed to create file " + filepath + ", " + err.Error()) + return err + } + return nil +} + +func DeleteFile(filepath string, rollbackOperations *[]FileDoRecord) error { + _, err := os.Stat(filepath) + if err != nil { + log.Error("file does not exist when deleting file " + filepath + ", " + err.Error()) + return nil + } + + oldcontent, err := os.ReadFile(filepath) + if err != nil { + log.Error("failed to read content to file " + filepath + ", " + err.Error()) + return err + } + + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent}) + + err = os.Remove(filepath) + if err != nil { + log.Error("failed to delete file " + filepath + ", " + err.Error()) + return err + } + return nil +} + +func CleanDir(dir string) error { + mutex := GetOrCreateMutex(dir) + mutex.Lock() + defer delete(MutexMap, dir) + defer mutex.Unlock() + + rollbackOperations := []FileDoRecord{} + _, err := os.Stat(dir) + if err != nil { + return nil + } + + files, err := os.ReadDir(dir) + if err != nil { + return nil + } + + for _, file := range files { + if file.IsDir() { + continue + } + filepath := filepath.Join(dir, file.Name()) + err = DeleteFile(filepath, &rollbackOperations) + if err != nil { + break + } + } + + if err != nil { + log.Error("Occur error when create schema files, begain rollback... " + err.Error()) + Rollback(rollbackOperations) + return err + } + + err = os.Remove(dir) + if err != nil { + log.Error("OOccur error when remove service schema dir, begain rollback... " + err.Error()) + Rollback(rollbackOperations) + return err + } + + return nil +} + +func ReadFile(filepath string) ([]byte, error) { + // check the file is empty + mutex := GetOrCreateMutex(path.Dir(filepath)) + mutex.RLocker() + defer mutex.RLocker() + + content, err := os.ReadFile(filepath) + if err != nil { + log.Error("failed to read content to file " + filepath + ", " + err.Error()) + return nil, err + } + return content, nil +} + +func CountInDomain(dir string) (int, error) { + mutex := GetOrCreateMutex(dir) + mutex.RLock() + defer mutex.RUnlock() + + files, err := os.ReadDir(dir) + if err != nil { + log.Error("failed to read directory " + dir + ", " + err.Error()) + return 0, err + } + + count := 0 + for _, projectFolder := range files { + if projectFolder.IsDir() { + count++ + } + } + // count kv numbers + return count, nil +} + +func ReadAllKvsFromProjectFolder(dir string) ([][]byte, error) { + var kvs [][]byte + + kvDir, err := os.ReadDir(dir) + if err != nil { + log.Error("failed to read directory " + dir + ", " + err.Error()) + return nil, err + } + + for _, file := range kvDir { + if file.IsDir() { + filepath := path.Join(dir, file.Name(), NewstKVFile) + content, err := ReadFile(filepath) + if err != nil { + log.Error("failed to read content to file " + filepath + ", " + err.Error()) + return nil, err + } + kvs = append(kvs, content) + } + } + return kvs, nil +} + +func ReadAllFiles(dir string) ([]string, [][]byte, error) { + mutex := GetOrCreateMutex(dir) + mutex.RLock() + defer mutex.RUnlock() + + files := []string{} + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + if !strings.Contains(path, NewstKVFile) { + files = append(files, path) + } + return nil + }) + + if err != nil { + return nil, nil, err + } + + var contentArray [][]byte + + for _, file := range files { + content, err := os.ReadFile(file) + if err != nil { + log.Error("failed to read content from schema file " + file + ", " + err.Error()) + return nil, nil, err + } + contentArray = append(contentArray, content) + } + return files, contentArray, nil +} + +func Rollback(rollbackOperations []FileDoRecord) { + rollbackMutexLock.Lock() + defer rollbackMutexLock.Unlock() + + var err error + for _, fileOperation := range rollbackOperations { + if fileOperation.content == nil { + err = DeleteFile(fileOperation.filepath, &[]FileDoRecord{}) + } else { + err = CreateOrUpdateFile(fileOperation.filepath, fileOperation.content, &[]FileDoRecord{}, true) + } + if err != nil { + log.Error("Occur error when rolling back schema files: " + err.Error()) + } + } +} diff --git a/server/datasource/local/history/history_dao.go b/server/datasource/local/history/history_dao.go new file mode 100644 index 00000000..5766beb8 --- /dev/null +++ b/server/datasource/local/history/history_dao.go @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package history + +import ( + "context" + "encoding/json" + "path" + "strconv" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/auth" + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao is the implementation +type Dao struct { +} + +// GetHistory get all history by label id +func (s *Dao) GetHistory(ctx context.Context, kvID, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { + kvreq := &model.GetKVRequest{ + Domain: domain, + Project: project, + ID: kvID, + } + kvdoc, err := datasource.GetBroker().GetKVDao().Get(ctx, kvreq) + if err != nil { + return nil, err + } + if err := auth.CheckGetKV(ctx, kvdoc); err != nil { + return nil, err + } + + opts := datasource.FindOptions{} + for _, o := range options { + o(&opts) + } + kvFolderPath := path.Join(file.FileRootPath, domain, project, kvID) + _, kvs, err := file.ReadAllFiles(kvFolderPath) + + if err != nil { + openlog.Error(err.Error()) + return nil, err + } + histories := make([]*model.KVDoc, 0, len(kvs)) + for _, kv := range kvs { + var doc model.KVDoc + err := json.Unmarshal(kv, &doc) + if err != nil { + openlog.Error("decode error: " + err.Error()) + continue + } + histories = append(histories, &doc) + } + return &model.KVResponse{ + Data: pagingResult(histories, opts.Offset, opts.Limit), + Total: len(kvs), + }, nil +} + +func pagingResult(histories []*model.KVDoc, offset, limit int64) []*model.KVDoc { + total := int64(len(histories)) + if limit != 0 && offset >= total { + return []*model.KVDoc{} + } + + datasource.ReverseByPriorityAndUpdateRev(histories) + + if limit == 0 { + return histories + } + end := offset + limit + if end > total { + end = total + } + return histories[offset:end] +} + +// AddHistory add kv history +func (s *Dao) AddHistory(ctx context.Context, kv *model.KVDoc) error { + err := s.historyRotate(ctx, kv.ID, kv.Project, kv.Domain) + return err +} + +// DelayDeletionTime add delete time to all revisions of the kv, +// thus these revisions will be automatically deleted by TTL index. +// TODO support delay deletion +func (s *Dao) DelayDeletionTime(ctx context.Context, kvIDs []string, project, domain string) error { + // history have been deleted in function and in kv_dao.go + return nil +} + +// historyRotate delete historical versions for a key that exceeds the limited number +func (s *Dao) historyRotate(ctx context.Context, kvID, project, domain string) error { + resp, err := s.GetHistory(ctx, kvID, project, domain) + if err != nil { + openlog.Error(err.Error()) + return err + } + if resp.Total <= datasource.MaxHistoryNum { + return nil + } + kvs := resp.Data + kvs = kvs[datasource.MaxHistoryNum:] + + mutex := file.GetOrCreateMutex(path.Join(file.FileRootPath, domain, project, kvID)) + mutex.Lock() + defer mutex.Unlock() + + for _, kv := range kvs { + revision := kv.UpdateRevision + revisionFilePath := path.Join(file.FileRootPath, domain, project, kvID, strconv.FormatInt(revision, 10)+".json") + + err = file.DeleteFile(revisionFilePath, &[]file.FileDoRecord{}) + if err != nil { + openlog.Error(err.Error()) + return err + } + } + return nil +} diff --git a/server/datasource/local/init.go b/server/datasource/local/init.go new file mode 100644 index 00000000..e78c1f32 --- /dev/null +++ b/server/datasource/local/init.go @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package local + +import ( + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/local/counter" + "github.com/apache/servicecomb-kie/server/datasource/local/history" + "github.com/apache/servicecomb-kie/server/datasource/local/kv" + "github.com/apache/servicecomb-kie/server/datasource/local/rbac" + "github.com/apache/servicecomb-kie/server/datasource/local/track" + rbacdao "github.com/apache/servicecomb-kie/server/datasource/rbac" +) + +type Broker struct { +} + +func NewFrom(c *datasource.Config) (datasource.Broker, error) { + kv.Init() + return &Broker{}, nil +} +func (*Broker) GetRevisionDao() datasource.RevisionDao { + return &counter.Dao{} +} +func (*Broker) GetKVDao() datasource.KVDao { + return &kv.Dao{} +} +func (*Broker) GetHistoryDao() datasource.HistoryDao { + return &history.Dao{} +} +func (*Broker) GetTrackDao() datasource.TrackDao { + return &track.Dao{} +} +func (*Broker) GetRbacDao() rbacdao.Dao { + return &rbac.Dao{} +} + +func init() { + datasource.RegisterPlugin("etcd_with_localstorage", NewFrom) + datasource.RegisterPlugin("embedded_etcd_with_localstorage", NewFrom) +} diff --git a/server/datasource/local/kv/kv_cache.go b/server/datasource/local/kv/kv_cache.go new file mode 100644 index 00000000..3eb843b0 --- /dev/null +++ b/server/datasource/local/kv/kv_cache.go @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "strings" + "sync" + "time" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/pkg/stringutil" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/go-chassis/openlog" + goCache "github.com/patrickmn/go-cache" + "go.etcd.io/etcd/api/v3/mvccpb" +) + +type IDSet map[string]struct{} + +func Init() { + kvCache = NewKvCache() + go kvCache.Refresh(context.Background()) +} + +const ( + cacheExpirationTime = 10 * time.Minute + cacheCleanupInterval = 11 * time.Minute + backOffMinInterval = 5 * time.Second +) + +var kvCache *Cache + +type CacheSearchReq struct { + Domain string + Project string + Opts *datasource.FindOptions + Regex *regexp.Regexp +} + +type Cache struct { + revision int64 + kvIDCache sync.Map + kvDocCache *goCache.Cache +} + +func NewKvCache() *Cache { + kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval) + return &Cache{ + revision: 0, + kvDocCache: kvDocCache, + } +} + +func (kc *Cache) Refresh(ctx context.Context) { + openlog.Info("start to list and watch") + + timer := time.NewTimer(backOffMinInterval) + defer timer.Stop() + for { + nextPeriod := backOffMinInterval + select { + case <-ctx.Done(): + openlog.Info("stop to list and watch") + return + case <-timer.C: + timer.Reset(nextPeriod) + } + } +} + +func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) { + kvDoc := &model.KVDoc{} + err := json.Unmarshal(kv.Value, kvDoc) + if err != nil { + return nil, err + } + return kvDoc, nil +} + +func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) string { + labelFormat := stringutil.FormatMap(labels) + inputKey := strings.Join([]string{ + "", + domain, + project, + labelFormat, + }, "/") + return inputKey +} + +func (kc *Cache) StoreKvDoc(kvID string, kvDoc *model.KVDoc) { + kc.kvDocCache.SetDefault(kvID, kvDoc) +} + +func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) { + kc.kvIDCache.Store(cacheKey, kvIds) +} + +func (kc *Cache) DeleteKvDoc(kvID string) { + kc.kvDocCache.Delete(kvID) +} + +func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) { + val, ok := kc.kvIDCache.Load(cacheKey) + if !ok { + return nil, false + } + kvIds, ok := val.(IDSet) + if !ok { + return nil, false + } + return kvIds, true +} + +func (kc *Cache) LoadKvDoc(kvID string) (*model.KVDoc, bool) { + val, ok := kc.kvDocCache.Get(kvID) + if !ok { + return nil, false + } + doc, ok := val.(*model.KVDoc) + if !ok { + return nil, false + } + return doc, true +} + +func (kc *Cache) CachePut(kvs []*model.KVDoc) { + for _, kvDoc := range kvs { + kc.StoreKvDoc(kvDoc.ID, kvDoc) + cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) + m, ok := kc.LoadKvIDSet(cacheKey) + if !ok { + kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}}) + openlog.Info("cacheKey " + cacheKey + "not exists") + continue + } + m[kvDoc.ID] = struct{}{} + } +} + +func (kc *Cache) CacheDelete(kvs []*model.KVDoc) { + for _, kvDoc := range kvs { + kc.DeleteKvDoc(kvDoc.ID) + cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) + m, ok := kc.LoadKvIDSet(cacheKey) + if !ok { + openlog.Error("cacheKey " + cacheKey + "not exists") + continue + } + delete(m, kvDoc.ID) + } +} + +func Search(req *CacheSearchReq) (*model.KVResponse, bool, []string) { + if !req.Opts.ExactLabels { + return nil, false, nil + } + + openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) + result := &model.KVResponse{ + Data: []*model.KVDoc{}, + } + cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) + kvIds, ok := kvCache.LoadKvIDSet(cacheKey) + if !ok { + kvCache.StoreKvIDSet(cacheKey, IDSet{}) + return result, true, nil + } + + var docs []*model.KVDoc + + var kvIdsInCache []string + for kvID := range kvIds { + if doc, ok := kvCache.LoadKvDoc(kvID); ok { + docs = append(docs, doc) + kvIdsInCache = append(kvIdsInCache, kvID) + continue + } + } + + for _, doc := range docs { + if isMatch(req, doc) { + bytes, _ := json.Marshal(doc) + var docDeepCopy model.KVDoc + err := json.Unmarshal(bytes, &docDeepCopy) + if err != nil { + return nil, false, nil + } + datasource.ClearPart(&docDeepCopy) + result.Data = append(result.Data, &docDeepCopy) + } + } + result.Total = len(result.Data) + return result, true, kvIdsInCache +} + +func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { + if doc == nil { + return false + } + if req.Opts.Status != "" && doc.Status != req.Opts.Status { + return false + } + if req.Regex != nil && !req.Regex.MatchString(doc.Key) { + return false + } + if req.Opts.Value != "" && !strings.Contains(doc.Value, req.Opts.Value) { + return false + } + return true +} diff --git a/server/datasource/local/kv/kv_dao.go b/server/datasource/local/kv/kv_dao.go new file mode 100644 index 00000000..f16ec090 --- /dev/null +++ b/server/datasource/local/kv/kv_dao.go @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv + +import ( + "context" + "encoding/json" + "os" + "path" + "regexp" + "strconv" + "strings" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/pkg/util" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/auth" + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao operate data in local +type Dao struct { +} + +func (s *Dao) Create(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) (*model.KVDoc, error) { + if err := auth.CheckCreateKV(ctx, kv); err != nil { + return nil, err + } + + err := create(kv) + if err != nil { + openlog.Error("create error", openlog.WithTags(openlog.Tags{ + "err": err.Error(), + "kv": kv, + })) + return nil, err + } + kvCache.CachePut([]*model.KVDoc{kv}) + return kv, nil +} + +func create(kv *model.KVDoc) (err error) { + data, _ := json.Marshal(&kv) + rollbackOperations := []file.FileDoRecord{} + + defer func() { + if err != nil { + file.Rollback(rollbackOperations) + } + }() + + err = file.CreateOrUpdateFile(path.Join(file.FileRootPath, kv.Domain, kv.Project, kv.ID, strconv.FormatInt(kv.UpdateRevision, 10)+".json"), data, &rollbackOperations, false) + if err != nil { + return err + } + + err = file.CreateOrUpdateFile(path.Join(file.FileRootPath, kv.Domain, kv.Project, kv.ID, file.NewstKVFile), data, &rollbackOperations, false) + return err +} + +// Update update key value +func (s *Dao) Update(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) error { + kvpath := path.Join(file.FileRootPath, kv.Domain, kv.Project, kv.ID, file.NewstKVFile) + kvInfo, err := file.ReadFile(kvpath) + if err != nil { + openlog.Error(err.Error()) + return err + } + if kvInfo == nil { + return datasource.ErrKeyNotExists + } + var oldKV model.KVDoc + err = json.Unmarshal(kvInfo, &oldKV) + if err != nil { + openlog.Error(err.Error()) + return err + } + + if err := auth.CheckUpdateKV(ctx, &oldKV); err != nil { + return err + } + + oldKV.LabelFormat = kv.LabelFormat + oldKV.Value = kv.Value + oldKV.Status = kv.Status + oldKV.Checker = kv.Checker + oldKV.UpdateTime = kv.UpdateTime + oldKV.UpdateRevision = kv.UpdateRevision + + err = create(kv) + if err != nil { + openlog.Error(err.Error()) + return err + } + + kvCache.CachePut([]*model.KVDoc{kv}) + return nil +} + +// Extract key values +func getValue(str string) string { + rex := regexp.MustCompile(`\(([^)]+)\)`) + res := rex.FindStringSubmatch(str) + return res[len(res)-1] +} + +// Exist supports you query a key value by label map or labels id +func (s *Dao) Exist(ctx context.Context, key, project, domain string, options ...datasource.FindOption) (bool, error) { + opts := datasource.FindOptions{Key: key} + for _, o := range options { + o(&opts) + } + kvs, err := s.listNoAuth(ctx, project, domain, + datasource.WithExactLabels(), + datasource.WithLabels(opts.Labels), + datasource.WithLabelFormat(opts.LabelFormat), + datasource.WithKey(key), + datasource.WithCaseSensitive()) + if err != nil { + openlog.Error("check kv exist: " + err.Error()) + return false, err + } + if IsUniqueFind(opts) && len(kvs.Data) == 0 { + return false, nil + } + if len(kvs.Data) != 1 { + return false, datasource.ErrTooMany + } + return true, nil +} + +func (s *Dao) GetByKey(ctx context.Context, key, project, domain string, options ...datasource.FindOption) ([]*model.KVDoc, error) { + opts := datasource.FindOptions{Key: key} + for _, o := range options { + o(&opts) + } + kvs, err := s.listNoAuth(ctx, project, domain, + datasource.WithExactLabels(), + datasource.WithLabels(opts.Labels), + datasource.WithLabelFormat(opts.LabelFormat), + datasource.WithKey(key), + datasource.WithCaseSensitive()) + if err != nil { + openlog.Error("check kv exist: " + err.Error()) + return nil, err + } + if IsUniqueFind(opts) && len(kvs.Data) == 0 { + return nil, datasource.ErrKeyNotExists + } + if len(kvs.Data) != 1 { + return nil, datasource.ErrTooMany + } + return kvs.Data, nil +} + +// FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion +// domain=tenant +func (s *Dao) FindOneAndDelete(ctx context.Context, kvID, project, domain string, options ...datasource.WriteOption) (*model.KVDoc, error) { + kvDoc := model.KVDoc{} + kvpath := path.Join(file.FileRootPath, domain, project, kvID, file.NewstKVFile) + kvFolderPath := path.Join(file.FileRootPath, domain, project, kvID) + kvTmpFolderPath := path.Join(file.FileRootPath, "tmp", domain, project, kvID) + + kvInfo, err := file.ReadFile(kvpath) + if err != nil { + return nil, err + } + + if kvInfo == nil { + return nil, datasource.ErrKeyNotExists + } + + err = file.MoveDir(kvFolderPath, kvTmpFolderPath) + + if err != nil { + openlog.Error("delete Key error: " + err.Error()) + return nil, err + } + + err = json.Unmarshal(kvInfo, &kvDoc) + if err != nil { + openlog.Error("decode error: " + err.Error()) + moveDirErr := file.MoveDir(kvTmpFolderPath, kvFolderPath) + if moveDirErr != nil { + openlog.Error("rollback error when delete kv: " + err.Error()) + } + return nil, err + } + err = file.CleanDir(kvTmpFolderPath) + if err != nil { + openlog.Warn("clean tmp dir error when delete kv: " + err.Error()) + } + err = file.CleanDir(kvFolderPath) + if err != nil { + openlog.Warn("clean dir error when delete kv: " + err.Error()) + } + // delete Cache + kvCache.CacheDelete([]*model.KVDoc{&kvDoc}) + return &kvDoc, nil +} + +// FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion +func (s *Dao) FindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string, options ...datasource.WriteOption) ([]*model.KVDoc, int64, error) { + var docs []*model.KVDoc + var removedIds []string + kvParentPath := path.Join(file.FileRootPath, domain, project) + kvTmpParentPath := path.Join(file.FileRootPath, "tmp", domain, project) + var err error + + defer func() { + if err != nil { + for _, id := range removedIds { + err = file.MoveDir(path.Join(kvTmpParentPath, id), path.Join(kvParentPath, id)) + if err != nil { + openlog.Warn("move tmp dir to real dir error when delete many kvs: " + err.Error()) + } + err = file.CleanDir(path.Join(kvTmpParentPath, id)) + if err != nil { + openlog.Warn("clean tmp dir error when delete many kvs: " + err.Error()) + } + } + } else { + for _, id := range removedIds { + err = file.CleanDir(path.Join(kvTmpParentPath, id)) + if err != nil { + openlog.Warn("clean tmp dir error when delete many kvs: " + err.Error()) + } + err = file.CleanDir(path.Join(kvParentPath, id)) + if err != nil { + openlog.Warn("clean real dir error when delete many kvs: " + err.Error()) + } + } + } + }() + + for _, id := range kvIDs { + kvPath := path.Join(kvParentPath, id, file.NewstKVFile) + kvInfo, kvErr := getKVDoc(kvPath) + err = kvErr + if err != nil { + return nil, 0, err + } + docs = append(docs, kvInfo) + + err = file.MoveDir(path.Join(kvParentPath, id), path.Join(kvTmpParentPath, id)) + if err != nil { + return nil, 0, err + } else { + removedIds = append(removedIds, id) + } + } + + if len(docs) == 0 { + return nil, 0, datasource.ErrKeyNotExists + } + kvCache.CacheDelete(docs) + return docs, int64(len(docs)), nil +} + +// Get get kv by kv id +func (s *Dao) Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, error) { + kvpath := path.Join(file.FileRootPath, req.Domain, req.Project, req.ID, file.NewstKVFile) + curKV, err := getKVDoc(kvpath) + if err != nil { + return nil, err + } + if err := auth.CheckGetKV(ctx, curKV); err != nil { + return nil, err + } + return curKV, nil +} + +func getKVDoc(kvpath string) (*model.KVDoc, error) { + kvInfo, err := file.ReadFile(kvpath) + if err != nil { + openlog.Error(err.Error()) + return nil, err + } + if kvInfo == nil { + return nil, datasource.ErrKeyNotExists + } + curKV := &model.KVDoc{} + err = json.Unmarshal(kvInfo, curKV) + if err != nil { + openlog.Error("decode error: " + err.Error()) + return nil, err + } + return curKV, nil +} + +func (s *Dao) Total(ctx context.Context, project, domain string) (int64, error) { + kvParentPath := path.Join(file.FileRootPath, domain, project) + total, err := file.CountInDomain(kvParentPath) + + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + openlog.Error("find total number: " + err.Error()) + return 0, err + } + return int64(total), nil +} + +// List get kv list by key and criteria +func (s *Dao) List(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { + result, opts, err := s.listData(ctx, project, domain, options...) + if err != nil { + return nil, err + } + + filterKVs, err := auth.FilterKVList(ctx, result.Data) + if err != nil { + return nil, err + } + + result.Data = filterKVs + result.Total = len(filterKVs) + + return pagingResult(result, opts), nil +} + +func (s *Dao) listNoAuth(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { + result, opts, err := s.listData(ctx, project, domain, options...) + if err != nil { + return nil, err + } + + return pagingResult(result, opts), nil +} + +// List get kv list by key and criteria +func (s *Dao) listData(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, datasource.FindOptions, error) { + opts := datasource.NewDefaultFindOpts() + for _, o := range options { + o(&opts) + } + ctx, cancel := context.WithTimeout(ctx, opts.Timeout) + defer cancel() + + regex, err := toRegex(opts) + if err != nil { + return nil, opts, err + } + + resultInCache, useCache, kvIdsInCache := Search(&CacheSearchReq{ + Domain: domain, + Project: project, + Opts: &opts, + Regex: regex, + }) + if useCache { + openlog.Info("Use Cache Find Success") + } + + result, err := matchLabelsSearchLocally(ctx, domain, project, regex, opts, kvIdsInCache) + if err != nil { + if os.IsNotExist(err) { + return &model.KVResponse{ + Data: []*model.KVDoc{}, + }, opts, nil + } + openlog.Error("list kv failed: " + err.Error()) + return nil, opts, err + } + + if resultInCache != nil { + result.Data = append(result.Data, resultInCache.Data...) + result.Total += len(resultInCache.Data) + } + return result, opts, nil +} + +func matchLabelsSearchLocally(ctx context.Context, domain, project string, regex *regexp.Regexp, opts datasource.FindOptions, kvIdsInCache []string) (*model.KVResponse, error) { + openlog.Debug("using labels to search kv") + kvParentPath := path.Join(file.FileRootPath, domain, project) + kvs, err := file.ReadAllKvsFromProjectFolder(kvParentPath) + if err != nil { + return nil, err + } + result := &model.KVResponse{ + Data: []*model.KVDoc{}, + } + var docs []*model.KVDoc + for _, kv := range kvs { + var doc model.KVDoc + err := json.Unmarshal(kv, &doc) + if err != nil { + openlog.Error("decode to KVList error: " + err.Error()) + continue + } + var exist = false + for _, v := range kvIdsInCache { + if v == doc.ID { + exist = true + break + } + } + if exist { + continue + } + + if !filterMatch(&doc, opts, regex) { + continue + } + bytes, _ := json.Marshal(doc) + var docDeepCopy model.KVDoc + err = json.Unmarshal(bytes, &docDeepCopy) + if err != nil { + openlog.Error("decode to KVList error: " + err.Error()) + continue + } + docs = append(docs, &docDeepCopy) + datasource.ClearPart(&doc) + result.Data = append(result.Data, &doc) + result.Total++ + + if IsUniqueFind(opts) { + break + } + } + kvCache.CachePut(docs) + + return result, nil +} + +func IsUniqueFind(opts datasource.FindOptions) bool { + return opts.LabelFormat != "" && opts.Key != "" +} + +func toRegex(opts datasource.FindOptions) (*regexp.Regexp, error) { + var value string + if opts.Key == "" { + return nil, nil + } + switch { + case strings.HasPrefix(opts.Key, "beginWith("): + value = strings.ReplaceAll(getValue(opts.Key), ".", "\\.") + ".*" + case strings.HasPrefix(opts.Key, "wildcard("): + value = strings.ReplaceAll(getValue(opts.Key), ".", "\\.") + value = strings.ReplaceAll(value, "*", ".*") + default: + value = strings.ReplaceAll(opts.Key, ".", "\\.") + } + value = "^" + value + "$" + if !opts.CaseSensitive { + value = "(?i)" + value + } + regex, err := regexp.Compile(value) + if err != nil { + openlog.Error("invalid wildcard expr: " + value + ", error: " + err.Error()) + return nil, err + } + return regex, nil +} + +func pagingResult(result *model.KVResponse, opts datasource.FindOptions) *model.KVResponse { + datasource.ReverseByPriorityAndUpdateRev(result.Data) + + if opts.Limit == 0 { + return result + } + total := int64(result.Total) + if opts.Offset >= total { + result.Data = []*model.KVDoc{} + return result + } + end := opts.Offset + opts.Limit + if end > total { + end = total + } + result.Data = result.Data[opts.Offset:end] + return result +} + +func filterMatch(doc *model.KVDoc, opts datasource.FindOptions, regex *regexp.Regexp) bool { + if opts.Status != "" && doc.Status != opts.Status { + return false + } + if regex != nil && !regex.MatchString(doc.Key) { + return false + } + if len(opts.Labels) != 0 { + if opts.ExactLabels && !util.IsEquivalentLabel(opts.Labels, doc.Labels) { + return false + } + if !opts.ExactLabels && !util.IsContainLabel(doc.Labels, opts.Labels) { + return false + } + } + if opts.LabelFormat != "" && doc.LabelFormat != opts.LabelFormat { + return false + } + if opts.Value != "" && !strings.Contains(doc.Value, opts.Value) { + return false + } + return true +} diff --git a/server/datasource/local/rbac/rbac.go b/server/datasource/local/rbac/rbac.go new file mode 100644 index 00000000..0e1a529a --- /dev/null +++ b/server/datasource/local/rbac/rbac.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rbac + +import ( + "context" + "encoding/json" + "errors" + + crbac "github.com/go-chassis/cari/rbac" + "github.com/go-chassis/etcdadpt" + "github.com/go-chassis/openlog" +) + +func generateRBACRoleKey(name string) string { + return "/cse-sr/roles/" + name +} + +func generateRBACAccountKey(name string) string { + return "/cse-sr/accounts/" + name +} + +type Dao struct { +} + +func (re *Dao) GetRole(ctx context.Context, name string) (*crbac.Role, error) { + kv, err := etcdadpt.Get(ctx, generateRBACRoleKey(name)) + if err != nil { + return nil, err + } + if kv == nil { + return nil, errors.New("role not exist") + } + role := &crbac.Role{} + err = json.Unmarshal(kv.Value, role) + if err != nil { + openlog.Error("role info format invalid", openlog.WithErr(err)) + return nil, err + } + return role, nil +} + +func (re *Dao) AccountExist(ctx context.Context, name string) (bool, error) { + return etcdadpt.Exist(ctx, generateRBACAccountKey(name)) +} diff --git a/server/datasource/local/track/polling_detail_dao.go b/server/datasource/local/track/polling_detail_dao.go new file mode 100644 index 00000000..eab584bb --- /dev/null +++ b/server/datasource/local/track/polling_detail_dao.go @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package track + +import ( + "context" + "encoding/json" + "os" + "path" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/datasource/local/file" + "github.com/go-chassis/openlog" +) + +// Dao is the implementation +type Dao struct { +} + +// CreateOrUpdate create a record or update exist record +// If revision and session_id exists then update else insert +func (s *Dao) CreateOrUpdate(ctx context.Context, detail *model.PollingDetail) (*model.PollingDetail, error) { + bytes, err := json.Marshal(detail) + if err != nil { + openlog.Error("encode polling detail error: " + err.Error()) + return nil, err + } + + revision := "default" + if detail.Revision != "" { + revision = detail.Revision + } + trackPath := path.Join(file.FileRootPath, "track", detail.Domain, detail.Project, revision, detail.SessionID+".json") + + err = file.CreateOrUpdateFile(trackPath, bytes, &[]file.FileDoRecord{}, false) + if err != nil { + openlog.Error(err.Error()) + return nil, err + } + return detail, nil +} + +// GetPollingDetail is to get a track data +func (s *Dao) GetPollingDetail(ctx context.Context, detail *model.PollingDetail) ([]*model.PollingDetail, error) { + trackFolderPath := path.Join(file.FileRootPath, "track", detail.Domain, detail.Project) + _, kvs, err := file.ReadAllFiles(trackFolderPath) + if err != nil { + if os.IsNotExist(err) { + return make([]*model.PollingDetail, 0), nil + } + openlog.Error(err.Error()) + return nil, err + } + + records := make([]*model.PollingDetail, 0, len(kvs)) + for _, kv := range kvs { + var doc model.PollingDetail + err := json.Unmarshal(kv, &doc) + if err != nil { + openlog.Error("decode polling detail error: " + err.Error()) + continue + } + if detail.SessionID != "" && doc.SessionID != detail.SessionID { + continue + } + if detail.IP != "" && doc.IP != detail.IP { + continue + } + if detail.UserAgent != "" && doc.UserAgent != detail.UserAgent { + continue + } + if detail.URLPath != "" && doc.URLPath != detail.URLPath { + continue + } + if detail.Revision != "" && doc.Revision != detail.Revision { + continue + } + records = append(records, &doc) + } + if len(records) == 0 { + return nil, datasource.ErrRecordNotExists + } + return records, nil +} diff --git a/server/db/db.go b/server/db/db.go index 5bacd371..7f75b32f 100644 --- a/server/db/db.go +++ b/server/db/db.go @@ -24,6 +24,7 @@ import ( "github.com/apache/servicecomb-kie/server/config" "github.com/apache/servicecomb-kie/server/config/tlsutil" + "github.com/apache/servicecomb-kie/server/datasource/local/file" "github.com/go-chassis/cari/db" dconfig "github.com/go-chassis/cari/db/config" "github.com/go-chassis/openlog" @@ -59,6 +60,18 @@ func Init(c config.DB) error { return errors.New("tls setting invalid:" + err.Error()) } } + + if c.Kind == "etcd_with_localstorage" || c.Kind == "embedded_etcd_with_localstorage" { + if c.Kind == "embedded_etcd_with_localstorage" { + c.Kind = "embedded_etcd" + } + if c.Kind == "etcd_with_localstorage" { + c.Kind = "etcd" + } + if c.LocalFilePath != "" { + file.FileRootPath = c.LocalFilePath + } + } return db.Init(&dconfig.Config{ Kind: c.Kind, URI: c.URI, diff --git a/server/handler/track_handler.go b/server/handler/track_handler.go index 29328e1c..d327f33a 100644 --- a/server/handler/track_handler.go +++ b/server/handler/track_handler.go @@ -78,7 +78,10 @@ func (h *TrackHandler) Handle(chain *handler.Chain, inv *invocation.Invocation, data.Domain = v1.ReadDomain(req.Request.Context()) data.Project = req.PathParameter(common.PathParameterProject) data.IP = iputil.ClientIP(req.Request) - data.ResponseBody = req.Attribute(common.RespBodyContextKey).([]*model.KVDoc) + responseBodyAttr := req.Attribute(common.RespBodyContextKey) + if responseBodyAttr != nil { + data.ResponseBody = responseBodyAttr.([]*model.KVDoc) + } data.ResponseCode = ir.Status data.Timestamp = time.Now() if resp != nil { @@ -96,7 +99,6 @@ func (h *TrackHandler) Handle(chain *handler.Chain, inv *invocation.Invocation, return } cb(ir) - }) } diff --git a/server/metrics/prometheus.go b/server/metrics/prometheus.go new file mode 100644 index 00000000..6d38031d --- /dev/null +++ b/server/metrics/prometheus.go @@ -0,0 +1,54 @@ +package metrics + +import ( + "context" + "time" + + "github.com/go-chassis/go-archaius" + "github.com/go-chassis/go-chassis/v2/pkg/metrics" + "github.com/go-chassis/openlog" + + "github.com/apache/servicecomb-kie/server/datasource" +) + +const domain = "default" +const project = "default" + +func InitMetric() error { + err := metrics.CreateGauge(metrics.GaugeOpts{ + Key: "servicecomb_kie_config_count", + Help: "use to show the number of config under a specifical domain and project pair", + Labels: []string{"domain", "project"}, + }) + if err != nil { + openlog.Error("init servicecomb_kie_config_count Gauge fail:" + err.Error()) + return err + } + reportIntervalstr := archaius.GetString("servicecomb.metrics.interval", "5s") + reportInterval, _ := time.ParseDuration(reportIntervalstr) + reportTicker := time.NewTicker(reportInterval) + go func() { + for { + _, ok := <-reportTicker.C + if !ok { + return + } + getTotalConfigCount(project, domain) + } + }() + return nil +} + +func getTotalConfigCount(project, domain string) { + total, err := datasource.GetBroker().GetKVDao().Total(context.TODO(), project, domain) + if err != nil { + openlog.Error("set total config number fail: " + err.Error()) + return + } + labels := map[string]string{"domain": domain, "project": project} + err = metrics.GaugeSet("servicecomb_kie_config_count", float64(total), labels) + if err != nil { + openlog.Error("set total config number fail:" + err.Error()) + return + } +} diff --git a/server/resource/v1/admin_resource.go b/server/resource/v1/admin_resource.go index 4d79eb10..d63b0a49 100644 --- a/server/resource/v1/admin_resource.go +++ b/server/resource/v1/admin_resource.go @@ -22,13 +22,15 @@ import ( "strconv" "time" - "github.com/apache/servicecomb-kie/pkg/model" - "github.com/apache/servicecomb-kie/server/datasource" goRestful "github.com/emicklei/go-restful" "github.com/go-chassis/cari/config" "github.com/go-chassis/go-chassis/v2/pkg/runtime" "github.com/go-chassis/go-chassis/v2/server/restful" "github.com/go-chassis/openlog" + + "github.com/apache/servicecomb-kie/pkg/common" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/server/datasource" ) type AdminResource struct { @@ -57,6 +59,10 @@ func (r *AdminResource) URLPatterns() []restful.Route { // HealthCheck provider version info and time info func (r *AdminResource) HealthCheck(context *restful.Context) { + healthCheckMode := context.ReadQueryParameter(common.QueryParamMode) + if healthCheckMode == "liveness" { + return + } domain := ReadDomain(context.Ctx) resp := &model.DocHealthCheck{} latest, err := datasource.GetBroker().GetRevisionDao().GetRevision(context.Ctx, domain) diff --git a/server/resource/v1/admin_resource_test.go b/server/resource/v1/admin_resource_test.go index 2ed6c2fa..f7ed64de 100644 --- a/server/resource/v1/admin_resource_test.go +++ b/server/resource/v1/admin_resource_test.go @@ -27,14 +27,15 @@ import ( _ "github.com/apache/servicecomb-kie/test" - "github.com/apache/servicecomb-kie/pkg/model" - v1 "github.com/apache/servicecomb-kie/server/resource/v1" "github.com/go-chassis/go-chassis/v2/server/restful/restfultest" "github.com/stretchr/testify/assert" + + "github.com/apache/servicecomb-kie/pkg/model" + v1 "github.com/apache/servicecomb-kie/server/resource/v1" ) func Test_HeathCheck(t *testing.T) { - path := fmt.Sprintf("/v1/health") + path := "/v1/health" r, _ := http.NewRequest("GET", path, nil) revision := &v1.AdminResource{} c, err := restfultest.New(revision, nil) @@ -48,3 +49,16 @@ func Test_HeathCheck(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, data) } + +func Test_HeakthCheckLiveMode(t *testing.T) { + path := fmt.Sprintf("/v1/health?mode=liveness") + r, _ := http.NewRequest("GET", path, nil) + + revision := &v1.AdminResource{} + c, err := restfultest.New(revision, nil) + assert.NoError(t, err) + resp := httptest.NewRecorder() + c.ServeHTTP(resp, r) + respcode := resp.Code + assert.NotEmpty(t, respcode) +} diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go index 565d0a7e..97361423 100644 --- a/server/resource/v1/common.go +++ b/server/resource/v1/common.go @@ -53,6 +53,8 @@ const ( AttributeDomainKey = "domain" FmtReadRequestError = "decode request body failed: %v" + + maxTimeoutProtectionIntervalWhenWait = 2 * time.Second ) func NewObserver() (*pubsub.Observer, error) { @@ -210,29 +212,64 @@ func getMatchPattern(rctx *restful.Context) string { } return m } -func eventHappened(waitStr string, topic *pubsub.Topic, ctx context.Context) (bool, string, error) { +func eventHappened(ctx context.Context, waitStr string, topic *pubsub.Topic) (happened bool, topicName string, err error) { d, err := time.ParseDuration(waitStr) if err != nil || d > common.MaxWait { return false, "", errors.New(common.MsgInvalidWait) } - happened := true + o, err := NewObserver() if err != nil { openlog.Error(err.Error()) return false, "", err } - topicName, err := pubsub.AddObserver(o, topic) + topicName, err = pubsub.AddObserver(o, topic) if err != nil { return false, "", errors.New("observe once failed: " + err.Error()) } + // 当事件发生于等待时间即将耗尽时(如仅剩余1ns),使用剩余的deadline查询后端,则大概率超时,这是误报错。 + // 将等待时间按先后划分为常规时间和超时保护时间,在超时保护时间段内,查询后端发生超时错误,屏蔽错误,视为数据未变化,降低误报错概率。 + // + // 事件产生阶段 查询后端是否超时 response + // --------------------------------------------------- + // normal 是 500 + // normal 否 200 + // timeoutProtection 是 304 + // timeoutProtection 否 200 + normalInterval, timeoutProtectionInterval := splitWaitInterval(d) + // 常规时间段 + if waitObserverEventHappened(o, normalInterval) { + prepareCache(ctx, topicName, topic, false) + return true, topicName, nil + } + // 超时保护时间段 + if waitObserverEventHappened(o, timeoutProtectionInterval) { + if prepareCache(ctx, topicName, topic, true) { + return true, topicName, nil + } + return false, topicName, nil + } + + pubsub.RemoveObserver(o.UUID, topic) + return false, topicName, nil +} + +func splitWaitInterval(d time.Duration) (normalInterval time.Duration, timeoutProtectionInterval time.Duration) { + timeoutProtectionInterval = d / 5 // 最后的20%时间 + if timeoutProtectionInterval > maxTimeoutProtectionIntervalWhenWait { + timeoutProtectionInterval = maxTimeoutProtectionIntervalWhenWait + } + normalInterval = d - timeoutProtectionInterval + return normalInterval, timeoutProtectionInterval +} + +func waitObserverEventHappened(o *pubsub.Observer, interval time.Duration) bool { select { - case <-time.After(d): - happened = false - pubsub.RemoveObserver(o.UUID, topic) + case <-time.After(interval): + return false case <-o.Event: - prepareCache(topicName, topic, ctx) + return true } - return happened, topicName, nil } // size from 1 to start @@ -309,7 +346,7 @@ func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) { } } -func prepareCache(topicName string, topic *pubsub.Topic, ctx context.Context) { +func prepareCache(ctx context.Context, topicName string, topic *pubsub.Topic, ignoreDeadlineExceededErr bool) (prepared bool) { rev, kvs, err := kvsvc.ListKV(ctx, &model.ListKVRequest{ Domain: topic.DomainID, Project: topic.Project, @@ -318,10 +355,20 @@ func prepareCache(topicName string, topic *pubsub.Topic, ctx context.Context) { }) if err != nil { openlog.Error("can not query kvs:" + err.Error()) + if isDeadlineExceededErr(err) && ignoreDeadlineExceededErr { + openlog.Info("ignore DeadlineExceededErr, not prepare cache") + return false + } } + cache.CachedKV().Write(topicName, &cache.DBResult{ KVs: kvs, Rev: rev, Err: err, }) + return true +} + +func isDeadlineExceededErr(err error) bool { + return strings.Contains(err.Error(), context.DeadlineExceeded.Error()) } diff --git a/server/resource/v1/common_test.go b/server/resource/v1/common_test.go index 537120a9..c6b13b7d 100644 --- a/server/resource/v1/common_test.go +++ b/server/resource/v1/common_test.go @@ -15,15 +15,16 @@ * limitations under the License. */ -package v1_test +package v1 import ( + "errors" + "github.com/apache/servicecomb-kie/server/pubsub" + _ "github.com/apache/servicecomb-kie/test" "net/http" "testing" + "time" - _ "github.com/apache/servicecomb-kie/test" - - v1 "github.com/apache/servicecomb-kie/server/resource/v1" "github.com/emicklei/go-restful" "github.com/stretchr/testify/assert" ) @@ -33,7 +34,7 @@ func TestGetLabels(t *testing.T) { "/kv?q=app:mall+service:payment&q=app:mall+service:payment+version:1.0.0", nil) assert.NoError(t, err) - c, err := v1.ReadLabelCombinations(restful.NewRequest(r)) + c, err := ReadLabelCombinations(restful.NewRequest(r)) assert.NoError(t, err) assert.Equal(t, 2, len(c)) @@ -41,7 +42,94 @@ func TestGetLabels(t *testing.T) { "/kv", nil) assert.NoError(t, err) - c, err = v1.ReadLabelCombinations(restful.NewRequest(r)) + c, err = ReadLabelCombinations(restful.NewRequest(r)) assert.NoError(t, err) assert.Equal(t, 1, len(c)) } + +func Test_splitWaitInterval(t *testing.T) { + type args struct { + d time.Duration + } + tests := []struct { + name string + args args + wantNormalInterval time.Duration + wantTimeoutProtectionInterval time.Duration + }{ + { + name: "regular interval", + args: args{ + d: 5 * time.Second, + }, + wantNormalInterval: 4 * time.Second, + wantTimeoutProtectionInterval: 1 * time.Second, + }, + { + name: "interval very big, origin timeout protect bigger than the max value", + args: args{ + d: 5000 * time.Second, + }, + wantNormalInterval: 4998 * time.Second, + wantTimeoutProtectionInterval: 2 * time.Second, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotNormalInterval, gotTimeoutProtectionInterval := splitWaitInterval(tt.args.d) + assert.Equalf(t, tt.wantNormalInterval, gotNormalInterval, "splitWaitInterval(%v)", tt.args.d) + assert.Equalf(t, tt.wantTimeoutProtectionInterval, gotTimeoutProtectionInterval, "splitWaitInterval(%v)", tt.args.d) + }) + } +} + +func getObserverThatHappenedEventAfter10ms() *pubsub.Observer { + observer, _ := NewObserver() + go func() { + time.Sleep(10 * time.Millisecond) + observer.Event <- &pubsub.KVChangeEvent{} + }() + return observer +} + +func Test_waitObserverEventHappened(t *testing.T) { + // timeout + assert.False(t, waitObserverEventHappened(getObserverThatHappenedEventAfter10ms(), 5*time.Millisecond)) + + // not timeout + assert.True(t, waitObserverEventHappened(getObserverThatHappenedEventAfter10ms(), 50*time.Millisecond)) + + // zero interval + assert.False(t, waitObserverEventHappened(getObserverThatHappenedEventAfter10ms(), 0*time.Millisecond)) +} + +func Test_isDeadlineExceededErr(t *testing.T) { + type args struct { + err error + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "DeadlineExceededErr", + args: args{ + err: errors.New("error is context deadline exceeded"), + }, + want: true, + }, + { + name: "no DeadlineExceededErr", + args: args{ + err: errors.New("error is unknown"), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, isDeadlineExceededErr(tt.args.err), "isDeadlineExceededErr(%v)", tt.args.err) + }) + } +} diff --git a/server/resource/v1/doc_struct.go b/server/resource/v1/doc_struct.go index 6241738c..d8b31b1d 100644 --- a/server/resource/v1/doc_struct.go +++ b/server/resource/v1/doc_struct.go @@ -202,7 +202,7 @@ type KVUpdateBody struct { // DeleteBody is the request body struct of delete multiple kvs interface type DeleteBody struct { - IDs []string `json:"ids"` + IDs []string `json:"ids" validate:"required,min=1,max=100,dive,required"` } // ErrorMsg is open api doc diff --git a/server/resource/v1/history_resource_test.go b/server/resource/v1/history_resource_test.go index 14efae65..a6956395 100644 --- a/server/resource/v1/history_resource_test.go +++ b/server/resource/v1/history_resource_test.go @@ -88,7 +88,7 @@ func TestHistoryResource_GetRevisions(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) assert.NoError(t, err) var data model.KVResponse - err = json.Unmarshal(body, &data) + _ = json.Unmarshal(body, &data) assert.Equal(t, before+1, len(data.Data)) }) } diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go index 5a163fb4..1c99390f 100644 --- a/server/resource/v1/kv_resource.go +++ b/server/resource/v1/kv_resource.go @@ -263,12 +263,12 @@ func isLegalWaitRequest(rctx *restful.Context, request *model.ListKVRequest) boo return true } func watch(rctx *restful.Context, request *model.ListKVRequest, wait string) bool { - changed, topic, err := eventHappened(wait, &pubsub.Topic{ + changed, topic, err := eventHappened(rctx.Ctx, wait, &pubsub.Topic{ Labels: request.Labels, Project: request.Project, MatchType: request.Match, DomainID: request.Domain, - }, rctx.Ctx) + }) if err != nil { WriteErrResponse(rctx, config.ErrObserveEvent, err.Error()) return true @@ -325,7 +325,14 @@ func (r *KVResource) DeleteList(rctx *restful.Context) { WriteErrResponse(rctx, config.ErrInvalidParams, fmt.Sprintf(FmtReadRequestError, err)) return } - err := validateDeleteList(domain, project) + + err := validator.Validate(b) + if err != nil { + WriteErrResponse(rctx, config.ErrInvalidParams, err.Error()) + return + } + + err = validateDeleteList(domain, project) if err != nil { WriteErrResponse(rctx, config.ErrInvalidParams, err.Error()) return diff --git a/server/resource/v1/kv_resource_test.go b/server/resource/v1/kv_resource_test.go index c52c3aa1..39fa3d01 100644 --- a/server/resource/v1/kv_resource_test.go +++ b/server/resource/v1/kv_resource_test.go @@ -356,7 +356,7 @@ func TestKVResource_List(t *testing.T) { c.ServeHTTP(resp2, r2) rev = resp2.Header().Get(common2.HeaderRevision) t.Log(rev) - body, err := ioutil.ReadAll(resp2.Body) + body, _ := ioutil.ReadAll(resp2.Body) time.Sleep(1 * time.Second) t.Log(string(body)) assert.Equal(t, http.StatusNotModified, resp2.Result().StatusCode) @@ -933,3 +933,56 @@ func TestKVResource_DeleteList(t *testing.T) { assert.Equal(t, 0, len(result.Data)) }) } + +func Test_ValidateDeleteBody(t *testing.T) { + tests := []struct { + name string + deleteBody *v1.DeleteBody + wantError assert.ErrorAssertionFunc + }{ + { + name: "normal case", + deleteBody: &v1.DeleteBody{ + IDs: []string{"a", "b"}, + }, + wantError: assert.NoError, + }, + { + name: "empty string case", + deleteBody: &v1.DeleteBody{ + IDs: []string{"a", ""}, + }, + wantError: assert.Error, + }, + { + name: "empty slices case", + deleteBody: &v1.DeleteBody{ + IDs: []string{}, + }, + wantError: assert.Error, + }, + { + name: "nil case", + deleteBody: &v1.DeleteBody{ + IDs: nil, + }, + wantError: assert.Error, + }, + { + name: "ids numbers exceeds 100 case", + deleteBody: &v1.DeleteBody{ + IDs: []string{"", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""}, + }, + wantError: assert.Error, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := validator.Validate(tt.deleteBody) + if !tt.wantError(t, gotErr, fmt.Sprintf("validator.Validate(%v)", tt.deleteBody)) { + return + } + }) + } +} diff --git a/server/server.go b/server/server.go index 4214262f..eaf13e5a 100644 --- a/server/server.go +++ b/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/apache/servicecomb-kie/server/config" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/db" + "github.com/apache/servicecomb-kie/server/metrics" "github.com/apache/servicecomb-kie/server/pubsub" "github.com/apache/servicecomb-kie/server/rbac" v1 "github.com/apache/servicecomb-kie/server/resource/v1" @@ -46,6 +47,9 @@ func Run() { if err := datasource.Init(config.GetDB().Kind); err != nil { openlog.Fatal(err.Error()) } + if err := metrics.InitMetric(); err != nil { + openlog.Fatal(err.Error()) + } if err := validator.Init(); err != nil { openlog.Fatal("validate init failed: " + err.Error()) } diff --git a/test/init.go b/test/init.go index 40b2e033..561c8bda 100644 --- a/test/init.go +++ b/test/init.go @@ -21,29 +21,34 @@ import ( "fmt" "math/rand" - "github.com/apache/servicecomb-kie/server/db" _ "github.com/go-chassis/cari/db/bootstrap" + "github.com/apache/servicecomb-kie/server/db" + + _ "github.com/apache/servicecomb-service-center/eventbase/bootstrap" + _ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain" + _ "github.com/apache/servicecomb-kie/server/datasource/etcd" + _ "github.com/apache/servicecomb-kie/server/datasource/local" _ "github.com/apache/servicecomb-kie/server/datasource/mongo" _ "github.com/apache/servicecomb-kie/server/plugin/qms" _ "github.com/apache/servicecomb-kie/server/pubsub/notifier" - _ "github.com/apache/servicecomb-service-center/eventbase/bootstrap" - _ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain" - "github.com/apache/servicecomb-kie/pkg/validator" - "github.com/apache/servicecomb-kie/server/config" - "github.com/apache/servicecomb-kie/server/datasource" - "github.com/apache/servicecomb-kie/server/pubsub" edatasource "github.com/apache/servicecomb-service-center/eventbase/datasource" "github.com/go-chassis/go-archaius" "github.com/go-chassis/go-chassis/v2/pkg/backends/quota" "github.com/go-chassis/go-chassis/v2/security/cipher" + + "github.com/apache/servicecomb-kie/pkg/validator" + "github.com/apache/servicecomb-kie/server/config" + "github.com/apache/servicecomb-kie/server/datasource" + "github.com/apache/servicecomb-kie/server/pubsub" ) var ( - uri string - kind string + uri string + kind string + localFilePath string ) func init() { @@ -54,6 +59,8 @@ func init() { } kind = archaius.GetString("TEST_DB_KIND", "etcd") uri = archaius.GetString("TEST_DB_URI", "http://127.0.0.1:2379") + localFilePath = archaius.GetString("TEST_KVS_ROOT_PATH", "") + err = archaius.Init(archaius.WithMemorySource()) if err != nil { panic(err) @@ -71,9 +78,10 @@ func init() { panic(err) } err = db.Init(config.DB{ - URI: uri, - Timeout: "10s", - Kind: kind, + URI: uri, + Timeout: "10s", + Kind: kind, + LocalFilePath: localFilePath, }) if err != nil { panic(err) @@ -82,12 +90,20 @@ func init() { if err != nil { panic(err) } - err = edatasource.Init(kind) + + edatasourceKind := kind + if kind == "etcd_with_localstorage" { + edatasourceKind = "etcd" + } + if kind == "embedded_etcd_with_localstorage" { + edatasourceKind = "embedded_etcd" + } + err = edatasource.Init(&edatasource.Config{Kind: edatasourceKind}) if err != nil { panic(err) } - //for UT + // for UT addr := randomListenAddress() config.Configurations = &config.Config{ DB: config.DB{}, @@ -113,7 +129,3 @@ func randomListenAddress() string { addr := fmt.Sprintf("127.0.0.1:%d", port) return addr } - -func IsEmbeddedetcdMode() bool { - return kind == "embedded_etcd" -}