diff --git a/README.md b/README.md index 0e1bf75..e917eee 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,8 @@ path match: - [ ] epoch based retry & loadbalancing - [ ] modularize - [ ] easyjson & msgp - +- [ ] consider https://dgraph.io/blog/post/introducing-ristretto-high-perf-go-cache/ +- [ ] replace logrus with zerolog # ref diff --git a/cache_manager.go b/cache_manager.go deleted file mode 100644 index 0a25f3a..0000000 --- a/cache_manager.go +++ /dev/null @@ -1,156 +0,0 @@ -package main - -import ( - jsoniter "github.com/json-iterator/go" - "github.com/savsgio/gotils" - log "github.com/sirupsen/logrus" - "github.com/valyala/fasthttp" - "go.uber.org/multierr" - "time" -) - -type CacheManager struct { - cache1m ProxyCache - cache1h ProxyCache - cacheSolid ProxyCache -} - -func NewCacheManager() *CacheManager { - return &CacheManager{ - cache1m: NewBigCacheTTL(time.Minute, 30*time.Second, 64), - cache1h: NewBigCacheTTL(time.Hour, time.Minute, 128), - cacheSolid: NewBigCacheTTL(0, 0, 256), - } -} - -func (c *CacheManager) Set(key string, val []byte, ttl time.Duration) error { - if ttl <= 0 { - return nil - } - return c.getCacheForTTL(ttl).Set(key, val, ttl) -} - -func (c *CacheManager) getCacheForTTL(ttl time.Duration) ProxyCache { - switch { - case ttl < time.Minute: - return c.cache1m - case ttl < time.Hour: - return c.cache1h - default: - return c.cacheSolid - } -} - -func (c *CacheManager) Get(key string, suggestTTL time.Duration) []byte { - if val := c.getCacheForTTL(suggestTTL).Get(key); val != nil { - return val - } else if val := c.cacheSolid.Get(key); val != nil { - return val - } else if val := c.cache1h.Get(key); val != nil { - return val - } else if val := c.cache1m.Get(key); val != nil { - return val - } - return nil -} - -func (c *CacheManager) GetItem(key string, suggestTTL time.Duration) *CachedItem { - val := c.Get(key, suggestTTL) - if val == nil { - return nil - } - item := AcquireCachedItem() - err := jsoniter.Unmarshal(val, item) - if err != nil { - log.WithError(err).Error("failed to unmarshal cached item") - ReleaseCachedItem(item) - return nil - } - return item -} - -func (c *CacheManager) Clear() error { - return multierr.Combine( - c.cache1m.Clear(), - c.cache1h.Clear(), - c.cacheSolid.Clear(), - ) -} - -type CachedHttpResp struct { - Code int `json:"c,omitempty"` - ContentEncoding []byte `json:"e,omitempty"` - ContentType []byte `json:"t,omitempty"` - Body []byte `json:"b,omitempty"` -} - -type CachedItem struct { - RpcError *RpcError `json:"e,omitempty"` - Result jsoniter.RawMessage `json:"r,omitempty"` - HttpResponse *CachedHttpResp `json:"h,omitempty"` -} - -func (i *CachedItem) Marshal() []byte { - d, _ := jsoniter.Marshal(i) - return d -} - -func (i *CachedItem) IsEmpty() bool { - return i.RpcError == nil && i.HttpResponse == nil && len(i.Result) == 0 -} - -func (i *CachedItem) Reset() { - i.RpcError = nil - i.HttpResponse = nil - i.Result = nil -} - -func (i *CachedItem) IsRpc() bool { - return i.RpcError != nil || i.Result != nil -} - -func (i *CachedItem) IsRpcError() bool { - return i.RpcError != nil -} - -func (i *CachedItem) GetRpcError() *RpcError { - return i.RpcError -} - -func (i *CachedItem) IsHttpResponse() bool { - return i.HttpResponse != nil -} - -func (i *CachedItem) WriteHttpResponse(r *fasthttp.Response) { - r.Header.SetStatusCode(i.HttpResponse.Code) - r.Header.SetBytesV(fasthttp.HeaderContentEncoding, i.HttpResponse.ContentEncoding) - r.Header.SetContentType(gotils.B2S(i.HttpResponse.ContentType)) - r.SetBody(i.HttpResponse.Body) -} 
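[Editor's note, not part of the patch] getCacheForTTL above routes an entry into one of three BigCache tiers by its TTL, and Get falls back across the tiers when the suggested tier misses; the same logic is reintroduced unchanged as caching.BigCacheManager later in this diff. A minimal sketch of that routing, with an illustrative function name, keys and payload (assumes imports of time and the new caching package):

    // illustrative only: how Set routes by TTL and how Get falls back across tiers
    func exampleTiering() {
        m := caching.NewCacheManager()                       // 1m / 1h / solid tiers
        payload := []byte(`{"result":"0x1"}`)                // illustrative value
        _ = m.Set("latestTxBlock", payload, 5*time.Second)   // ttl < 1m  -> cache1m
        _ = m.Set("dsBlock:42", payload, 30*time.Minute)     // ttl < 1h  -> cache1h
        _ = m.Set("networkId", payload, 24*time.Hour)        // otherwise -> cacheSolid
        if val := m.Get("latestTxBlock", 5*time.Second); val != nil {
            _ = val // suggested tier is checked first, then cacheSolid, cache1h, cache1m
        }
    }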
- -func (i *CachedItem) IsRpcResult() bool { - return i.Result != nil -} - -func (i *CachedItem) GetRpcResponse(id interface{}) *RpcResponse { - r := AcquireRpcResponse() - r.Id = id - if i.RpcError != nil { - r.Error = i.RpcError - } else if i.Result != nil { - r.Result = i.Result - } - return r -} - -func (i *CachedItem) WriteToRpcResponse(r *RpcResponse, id interface{}) { - r.Jsonrpc = JSONRPC2 - r.Id = id - if i.RpcError != nil { - r.Error = i.RpcError - } else if i.Result != nil { - r.Result = i.Result - } -} - -type CachedItems []CachedItem diff --git a/cache.go b/caching/bigcache.go similarity index 81% rename from cache.go rename to caching/bigcache.go index e84ec37..5e21d67 100644 --- a/cache.go +++ b/caching/bigcache.go @@ -1,19 +1,14 @@ -package main +package caching import ( "encoding/binary" "errors" "github.com/allegro/bigcache" - log "github.com/sirupsen/logrus" + "github.com/revolution1/jsonrpc-proxy/fnv64" + "github.com/sirupsen/logrus" "time" ) -type ProxyCache interface { - Set(key string, val []byte, ttl time.Duration) error - Get(key string) []byte - Clear() error -} - type BigCacheTTL struct { *bigcache.BigCache } @@ -26,9 +21,9 @@ func NewBigCacheTTL(maxTTL, cleanWindow time.Duration, maxSizeMb int) *BigCacheT MaxEntriesInWindow: 1000 * 10 * 60, MaxEntrySize: 500, Verbose: true, - Hasher: fnv64a{}, + Hasher: fnv64.Fnv64a{}, HardMaxCacheSize: maxSizeMb, - Logger: log.StandardLogger(), + Logger: logrus.StandardLogger(), }) if err != nil { panic(err) @@ -65,8 +60,8 @@ func (c *BigCacheTTL) Get(key string) []byte { return val[8:] } -func (c *BigCacheTTL) Clear() error { - return c.BigCache.Reset() +func (c *BigCacheTTL) Clear() { + _ = c.BigCache.Reset() } func (c *BigCacheTTL) Iterator() {} diff --git a/cache_test.go b/caching/bigcache_test.go similarity index 59% rename from cache_test.go rename to caching/bigcache_test.go index 089fa14..2ed14aa 100644 --- a/cache_test.go +++ b/caching/bigcache_test.go @@ -1,23 +1,38 @@ -package main +package caching import ( + "github.com/revolution1/jsonrpc-proxy/utils" assertion "github.com/stretchr/testify/assert" "strconv" "testing" "time" ) +func TestCacheValSize(t *testing.T) { + assert := assertion.New(t) + c := NewBigCacheTTL(time.Second, time.Second, 64) + for i := 1024 * 256; i < 1024*1024; i++ { + err := c.Set("key", utils.Blob('x', i), time.Millisecond) + assert.NoError(err) + _ = c.Delete("key") + if err != nil { + t.Log("size: ", i) + break + } + } +} + func TestBigCacheTTL(t *testing.T) { assert := assertion.New(t) - c := NewBigCacheTTL(time.Second, time.Second) + c := NewBigCacheTTL(time.Second, time.Second, 64) assert.Nil(c.Get("a")) assert.NoError(c.Set("1", []byte("val"), time.Millisecond)) assert.Equal([]byte("val"), c.Get("1")) time.Sleep(2 * time.Millisecond) assert.Nil(c.Get("1")) - assert.NoError(c.Clear()) + c.Clear() - c = NewBigCacheTTL(0, 0) + c = NewBigCacheTTL(0, 0, 64) assert.NoError(c.Set("1", []byte("val"), time.Millisecond)) assert.Equal([]byte("val"), c.Get("1")) time.Sleep(2 * time.Millisecond) @@ -29,7 +44,7 @@ func TestBigCacheTTL(t *testing.T) { } func BenchmarkBigCacheTTL(b *testing.B) { - c := NewBigCacheTTL(time.Second, time.Second) + c := NewBigCacheTTL(time.Second, time.Second, 64) for i := 0; i < b.N; i++ { _ = c.Set(strconv.Itoa(i), []byte("test val"), time.Millisecond) _ = c.Get(strconv.Itoa(i - 1)) diff --git a/caching/bigmanager.go b/caching/bigmanager.go new file mode 100644 index 0000000..92f5d3f --- /dev/null +++ b/caching/bigmanager.go @@ -0,0 +1,72 @@ +package caching + +import ( + 
jsoniter "github.com/json-iterator/go" + "time" +) + +type BigCacheManager struct { + cache1m ProxyCache + cache1h ProxyCache + cacheSolid ProxyCache +} + +func NewCacheManager() *BigCacheManager { + return &BigCacheManager{ + cache1m: NewBigCacheTTL(time.Minute, 30*time.Second, 64), + cache1h: NewBigCacheTTL(time.Hour, time.Minute, 128), + cacheSolid: NewBigCacheTTL(0, 0, 256), + } +} + +func (c *BigCacheManager) Set(key string, val []byte, ttl time.Duration) error { + if ttl <= 0 { + return nil + } + return c.getCacheForTTL(ttl).Set(key, val, ttl) +} + +func (c *BigCacheManager) getCacheForTTL(ttl time.Duration) ProxyCache { + switch { + case ttl < time.Minute: + return c.cache1m + case ttl < time.Hour: + return c.cache1h + default: + return c.cacheSolid + } +} + +func (c *BigCacheManager) Get(key string, suggestTTL time.Duration) []byte { + if val := c.getCacheForTTL(suggestTTL).Get(key); val != nil { + return val + } else if val := c.cacheSolid.Get(key); val != nil { + return val + } else if val := c.cache1h.Get(key); val != nil { + return val + } else if val := c.cache1m.Get(key); val != nil { + return val + } + return nil +} + +func (c *BigCacheManager) GetItem(key string, suggestTTL time.Duration) *CachedItem { + val := c.Get(key, suggestTTL) + if val == nil { + return nil + } + item := AcquireCachedItem() + err := jsoniter.Unmarshal(val, item) + if err != nil { + log.WithError(err).Error("failed to unmarshal cached item") + ReleaseCachedItem(item) + return nil + } + return item +} + +func (c *BigCacheManager) Clear() { + c.cache1m.Clear() + c.cache1h.Clear() + c.cacheSolid.Clear() +} diff --git a/caching/cache.go b/caching/cache.go new file mode 100644 index 0000000..c14529e --- /dev/null +++ b/caching/cache.go @@ -0,0 +1,34 @@ +package caching + +import ( + jsoniter "github.com/json-iterator/go" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "github.com/valyala/fasthttp" + "time" +) + +type ProxyCache interface { + Set(key string, val []byte, ttl time.Duration) error + Get(key string) []byte + Clear() +} + +type CacheManager interface { + SetRpcCache(req *jsonrpc.RpcRequest, resp *jsonrpc.RpcResponse, ttl time.Duration) error + SetHttpCache(req *jsonrpc.RpcRequest, resp *fasthttp.Response, ttl time.Duration) error + GetItem(req *jsonrpc.RpcRequest) *CachedItem + Clear() +} + +type CachedHttpResp struct { + Code int `json:"c,omitempty"` + ContentEncoding []byte `json:"e,omitempty"` + ContentType []byte `json:"t,omitempty"` + Body []byte `json:"b,omitempty"` +} + +type CachedItem struct { + RpcError *jsonrpc.RpcError `json:"e,omitempty"` + Result jsoniter.RawMessage `json:"r,omitempty"` + HttpResponse *CachedHttpResp `json:"h,omitempty"` +} diff --git a/caching/item.go b/caching/item.go new file mode 100644 index 0000000..e5f0b6d --- /dev/null +++ b/caching/item.go @@ -0,0 +1,96 @@ +package caching + +import ( + jsoniter "github.com/json-iterator/go" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "github.com/savsgio/gotils" + "github.com/valyala/fasthttp" + "sync" +) + +func (c *CachedHttpResp) Size() int { + if c == nil { + return 0 + } + return 8 + len(c.ContentType) + len(c.ContentEncoding) + len(c.Body) +} + +func (i *CachedItem) Marshal() []byte { + d, _ := jsoniter.Marshal(i) + return d +} + +func (i *CachedItem) IsEmpty() bool { + return i.RpcError == nil && i.HttpResponse == nil && len(i.Result) == 0 +} + +func (i *CachedItem) Reset() { + i.RpcError = nil + i.HttpResponse = nil + i.Result = nil +} + +func (i *CachedItem) IsRpc() bool { + return i.RpcError != nil || 
i.Result != nil +} + +func (i *CachedItem) IsRpcError() bool { + return i.RpcError != nil +} + +func (i *CachedItem) GetRpcError() *jsonrpc.RpcError { + return i.RpcError +} + +func (i *CachedItem) IsHttpResponse() bool { + return i.HttpResponse != nil +} + +func (i *CachedItem) WriteHttpResponse(r *fasthttp.Response) { + r.Header.SetStatusCode(i.HttpResponse.Code) + r.Header.SetBytesV(fasthttp.HeaderContentEncoding, i.HttpResponse.ContentEncoding) + r.Header.SetContentType(gotils.B2S(i.HttpResponse.ContentType)) + r.SetBody(i.HttpResponse.Body) +} + +func (i *CachedItem) IsRpcResult() bool { + return i.Result != nil +} + +func (i *CachedItem) GetRpcResponse(id interface{}) *jsonrpc.RpcResponse { + r := jsonrpc.AcquireRpcResponse() + r.Id = id + if i.RpcError != nil { + r.Error = i.RpcError + } else if i.Result != nil { + r.Result = i.Result + } + return r +} + +func (i *CachedItem) WriteToRpcResponse(r *jsonrpc.RpcResponse, id interface{}) { + r.Jsonrpc = jsonrpc.JSONRPC2 + r.Id = id + if i.RpcError != nil { + r.Error = i.RpcError + } else if i.Result != nil { + r.Result = i.Result + } +} + +type CachedItems []CachedItem + +var cachedItemPool = sync.Pool{New: func() interface{} { return &CachedItem{} }} + +func AcquireCachedItem() *CachedItem { + v := cachedItemPool.Get() + if v == nil { + return &CachedItem{} + } + return v.(*CachedItem) +} + +func ReleaseCachedItem(item *CachedItem) { + item.Reset() + cachedItemPool.Put(item) +} diff --git a/caching/logger.go b/caching/logger.go new file mode 100644 index 0000000..e1415db --- /dev/null +++ b/caching/logger.go @@ -0,0 +1,7 @@ +package caching + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("logger", "caching") diff --git a/caching/riscahe_test.go b/caching/riscahe_test.go new file mode 100644 index 0000000..2e8ceb5 --- /dev/null +++ b/caching/riscahe_test.go @@ -0,0 +1,109 @@ +package caching + +import ( + "github.com/dustin/go-humanize" + jsoniter "github.com/json-iterator/go" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "github.com/revolution1/jsonrpc-proxy/utils" + assertion "github.com/stretchr/testify/assert" + "github.com/valyala/fasthttp" + "testing" + "time" +) + +var testCacheSize int64 = 1 * humanize.MiByte + +func TestRisCache(t *testing.T) { + assert := assertion.New(t) + c := NewRisCache(testCacheSize) + + i, e := c.IncrInt64By("foo", 5, 0) + assert.NoError(e) + assert.Equal(int64(5), i) + time.Sleep(10 * time.Millisecond) + i, e = c.GetInt64("foo") + assert.NoError(e) + assert.Equal(int64(5), i) + + i, e = c.IncrInt64By("foo", 1, 0) + assert.NoError(e) + assert.Equal(int64(6), i) + time.Sleep(10 * time.Millisecond) + i, e = c.GetInt64("foo") + assert.NoError(e) + assert.Equal(int64(6), i) + + i, e = c.IncrInt64By("foo", -2, 0) + assert.NoError(e) + assert.Equal(int64(4), i) + time.Sleep(10 * time.Millisecond) + i, e = c.GetInt64("foo") + assert.NoError(e) + assert.Equal(int64(4), i) +} + +func TestRisCapacity(t *testing.T) { + assert := assertion.New(t) + const key = "key" + c := NewRisCache(testCacheSize) + ok := c.SetBytes(key, utils.Blob('x', 1024*1024), 0) + assert.True(ok) + time.Sleep(10 * time.Millisecond) + v, err := c.GetBytes(key) + assert.Error(err) + + c = NewRisCache(2 * testCacheSize) + ok = c.SetBytes(key, utils.Blob('x', 1024*1024), 0) + assert.True(ok) + time.Sleep(10 * time.Millisecond) + v, err = c.GetBytes(key) + assert.NoError(err) + assert.Equal(1024*1024, len(v)) +} + +func TestRisCachedItem(t *testing.T) { + assert := assertion.New(t) + c := 
NewRisCache(testCacheSize) + req := &jsonrpc.RpcRequest{ + Method: "blah", + Params: []string{"a", "b"}, + } + + resp := &jsonrpc.RpcResponse{ + Result: "abc", + } + assert.NoError(c.SetRpcCache(req, resp, 0)) + time.Sleep(10 * time.Millisecond) + item := c.GetItem(req) + assert.NotNil(item) + if item != nil { + assert.True(item.IsRpc()) + assert.Equal(jsoniter.RawMessage("\"abc\""), item.Result) + } + + errResp := &jsonrpc.RpcResponse{ + Error: jsonrpc.ErrRpcInvalidRequest, + } + assert.NoError(c.SetRpcCache(req, errResp, 0)) + time.Sleep(10 * time.Millisecond) + item = c.GetItem(req) + assert.NotNil(item) + if item != nil { + assert.True(item.IsRpcError()) + assert.EqualError(item.RpcError, jsonrpc.ErrRpcInvalidRequest.Error()) + } + + httpResp := fasthttp.AcquireResponse() + httpResp.SetStatusCode(200) + httpResp.SetBodyRaw([]byte("body content")) + httpResp.Header.SetContentType("content-type") + httpResp.Header.Set(fasthttp.HeaderContentEncoding, "utf8") + assert.NoError(c.SetHttpCache(req, httpResp, 0)) + time.Sleep(10 * time.Millisecond) + item = c.GetItem(req) + assert.NotNil(item) + if item != nil { + assert.True(item.IsHttpResponse()) + assert.Equal([]byte("utf8"), item.HttpResponse.ContentEncoding) + } +} diff --git a/caching/ristretto.go b/caching/ristretto.go new file mode 100644 index 0000000..3d22af9 --- /dev/null +++ b/caching/ristretto.go @@ -0,0 +1,157 @@ +package caching + +import ( + "github.com/dgraph-io/ristretto" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "github.com/valyala/fasthttp" + "sync/atomic" + "time" +) + +var ( + ErrCacheDropped = errors.New("cache setting failed: dropped") + ErrCacheKeyNotFound = errors.New("key not found") + ErrCacheKeySetFail = errors.New("key set fail") + ErrCacheValueIsNotBytesType = errors.New("value is not bytes type") + ErrCacheValueIsNotInt64Type = errors.New("value is not int64 type") +) + +type RisCache struct { + *ristretto.Cache +} + +func NewRisCache(maxSizeBytes int64) *RisCache { + c, _ := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, + MaxCost: maxSizeBytes, + BufferItems: 64, + Metrics: false, + OnEvict: evictItem, + KeyToHash: nil, + Cost: nil, + }) + return &RisCache{c} +} + +func (c *RisCache) SetBytes(key string, val []byte, ttl time.Duration) bool { + cost := int64(len(key) + len(val)) + return c.Cache.SetWithTTL(key, val, cost, ttl) +} + +func (c *RisCache) GetBytes(key string) ([]byte, error) { + v, ok := c.Cache.Get(key) + if !ok { + return nil, ErrCacheKeyNotFound + } + b, ok := v.([]byte) + if !ok { + return nil, ErrCacheValueIsNotBytesType + } + return b, nil +} + +func (c *RisCache) SetInt64(key string, val int64, ttl time.Duration) bool { + return c.Cache.SetWithTTL(key, &val, int64(len(key)+8), ttl) +} + +func (c *RisCache) GetInt64(key string) (int64, error) { + v, ok := c.Cache.Get(key) + if !ok { + return 0, ErrCacheKeyNotFound + } + b, ok := v.(*int64) + if !ok { + return 0, ErrCacheValueIsNotBytesType + } + return *b, nil +} + +func (c *RisCache) IncrInt64By(key string, by int64, ttl time.Duration) (int64, error) { + v, ok := c.Cache.Get(key) + if !ok { + ok = c.SetInt64(key, by, ttl) + if !ok { + return 0, ErrCacheKeySetFail + } + return by, nil + } + b, ok := v.(*int64) + if !ok { + return 0, ErrCacheValueIsNotInt64Type + } + atomic.AddInt64(b, by) + return *b, nil +} + +func (c *RisCache) SetFloat64(key string, val float64, ttl time.Duration) bool { + return c.Cache.SetWithTTL(key, val, int64(len(key)+8), ttl) +} + 
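[Editor's note, not part of the patch] A minimal sketch of how the primitive helpers above are meant to be used, mirroring riscahe_test.go: ristretto admits Set operations asynchronously, so the tests (and this sketch) wait briefly before reading a value back, and a TTL of 0 is exercised in TestRisCache as "never expires". The function name, keys and the 64 MiB budget are illustrative; the sketch assumes it lives in the caching package, where log and time are already available.

    // illustrative only: byte and counter helpers used the way the tests exercise them
    func exampleRisCacheUsage() {
        c := NewRisCache(64 << 20) // 64 MiB MaxCost budget
        if ok := c.SetBytes("answer", []byte("42"), time.Minute); !ok {
            log.Debug("entry dropped by the admission policy; treat as a cache miss")
        }
        time.Sleep(10 * time.Millisecond) // let ristretto apply the Set, as the tests do
        if b, err := c.GetBytes("answer"); err == nil {
            log.Debugf("cached: %s", b)
        }
        hits, _ := c.IncrInt64By("hits", 1, 0) // ttl 0: no expiry, as in TestRisCache
        log.Debugf("hits: %d", hits)
    }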
+func (c *RisCache) SetRpcCache(req *jsonrpc.RpcRequest, resp *jsonrpc.RpcResponse, ttl time.Duration) error { + key, err := req.ToCacheKey() + if err != nil { + return err + } + item := AcquireCachedItem() + item.RpcError = resp.Error + result, err := jsoniter.Marshal(resp.Result) + if err != nil { + return err + } + item.Result = result + item.HttpResponse = nil + ok := c.SetWithTTL(key, item, int64(len(key)+len(result)), ttl) + if !ok { + return ErrCacheDropped + } + return nil +} + +func (c *RisCache) SetHttpCache(req *jsonrpc.RpcRequest, resp *fasthttp.Response, ttl time.Duration) error { + key, err := req.ToCacheKey() + if err != nil { + return err + } + item := AcquireCachedItem() + item.Result = nil + item.RpcError = nil + item.HttpResponse = &CachedHttpResp{ + Code: resp.StatusCode(), + ContentEncoding: resp.Header.Peek(fasthttp.HeaderContentEncoding), + ContentType: resp.Header.ContentType(), + Body: resp.Body(), + } + ok := c.SetWithTTL(key, item, int64(len(key)+item.HttpResponse.Size()), ttl) + if !ok { + return ErrCacheDropped + } + return nil +} + +func (c *RisCache) GetItem(req *jsonrpc.RpcRequest) *CachedItem { + key, err := req.ToCacheKey() + if err != nil { + return nil + } + item, ok := c.Get(key) + if !ok { + return nil + } + i, ok := item.(*CachedItem) + if !ok { + return nil + } + return i +} + +func (c *RisCache) Clear() { + c.Cache.Clear() +} + +func evictItem(key, conflict uint64, value interface{}, cost int64) { + if i, ok := value.(*CachedItem); ok { + ReleaseCachedItem(i) + } +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..9a348fa --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,186 @@ +package main + +import ( + "context" + "fmt" + "github.com/fasthttp/router" + "github.com/google/gops/agent" + "github.com/revolution1/jsonrpc-proxy/manage" + "github.com/revolution1/jsonrpc-proxy/middleware" + "github.com/revolution1/jsonrpc-proxy/oldconfig" + "github.com/revolution1/jsonrpc-proxy/types" + "github.com/revolution1/jsonrpc-proxy/proxy" + "github.com/revolution1/jsonrpc-proxy/utils" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/valyala/fasthttp" + "os" + "os/signal" + "runtime" + "strconv" + "strings" + "sync" + "syscall" + "time" +) + +func main() { + rootCmd := cobra.Command{ + Use: "jsonrpc-proxy", + Short: "proxy for jsonrpc service", + } + flags := rootCmd.Flags() + printVer := flags.BoolP("version", "v", false, "print version") + path := flags.StringP("config", "c", "proxy.yaml", "the path of config file") + _ = rootCmd.MarkFlagFilename("config", "yaml", "yml") + //_ = cobra.MarkFlagRequired(flags, "config") + //ctx, cancel := context.WithCancel(context.Background()) + rootCmd.RunE = func(cmd *cobra.Command, args []string) error { + if *printVer { + fmt.Println(printVersion()) + return nil + } + // gops agent + if err := agent.Listen(agent.Options{}); err != nil { + return err + } + log.Infof("Loading config from %s", *path) + log.Infof("Version: %s", printVersion()) + conf, err := types. 
+ if err != nil { + return err + } + //conf.MustValidate() + initLog(conf) + return runMain(conf) + } + _ = rootCmd.Execute() +} + +func runMain(config *oldconfig.Config) error { + utils.CheckFdLimit() + log.Infof("Build: %s %s %s, PID: %d", runtime.GOOS, runtime.Compiler, runtime.Version(), os.Getpid()) + r := router.New() + //if debugMode { + // log.Infof("Debug Mode enabled") + // r.GET("/debug/pprof/{name:*}", pprofhandler.PprofHandler) + //} + p := proxy.NewProxy(config) + p.RegisterHandler(r) + + serverListen := utils.GetHostFromUrl(config.Listen) + manageListen := utils.GetHostFromUrl(config.Manage.Listen) + + var manageServer *fasthttp.Server + m := manage.NewManage(config, p) + if serverListen == manageListen { + log.Warn("Manage Server listens at the same address with RPC Server") + m.RegisterHandler(r) + } else { + r := router.New() + m.RegisterHandler(r) + h := middleware.UseMiddleWares( + r.Handler, + middleware.PanicHandler, + middleware.Cors, + fasthttp.CompressHandler, + middleware.AccessLogMetricHandler("[Manage] ", config), + ) + manageServer = newServer("JSON-RPC Proxy Manage Server", h, log.TraceLevel, config) + } + h := middleware.UseMiddleWares( + r.Handler, + middleware.PanicHandler, + middleware.Cors, + fasthttp.CompressHandler, + middleware.AccessLogMetricHandler("", config), + ) + server := newServer("JSON-RPC Proxy Server", h, log.TraceLevel, config) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go runServer(ctx, server, serverListen, wg) + + if manageServer != nil { + wg.Add(1) + go runServer(ctx, manageServer, manageListen, wg) + } + + sigCh := make(chan os.Signal) + signal.Notify(sigCh, os.Interrupt, os.Kill, syscall.SIGTERM) + go func() { + sig := <-sigCh + log.Infof("received signal '%s', shutting down server...", strings.ToUpper(sig.String())) + cancel() + }() + wg.Wait() + return nil +} + +func initLog(conf *oldconfig.Config) { + log.SetOutput(os.Stdout) + log.SetFormatter(&log.TextFormatter{ + FullTimestamp: true, + TimestampFormat: time.RFC3339, + QuoteEmptyFields: true, + ForceColors: conf.LogForceColors, + }) + level, err := log.ParseLevel(strings.ToLower(conf.LogLevel)) + if err != nil { + log.Fatal("Invalid logLevel") + } + oldconfig.DebugMode, err = strconv.ParseBool(os.Getenv("DEBUG")) + if err != nil { + oldconfig.DebugMode = conf.Debug + } + if oldconfig.DebugMode && level < log.DebugLevel { + level = log.DebugLevel + } + log.SetLevel(level) + log.Debugf("LogLevel: %s", log.GetLevel()) +} + +func newServer(name string, h fasthttp.RequestHandler, level log.Level, conf *oldconfig.Config) *fasthttp.Server { + return &fasthttp.Server{ + Name: name, + Handler: h, + ErrorHandler: nil, + HeaderReceived: nil, + ContinueHandler: nil, + TCPKeepalive: true, + ReadTimeout: conf.ReadTimeout.Duration, + WriteTimeout: conf.WriteTimeout.Duration, + IdleTimeout: conf.IdleTimeout.Duration, + Concurrency: 0, + DisableKeepalive: false, + ReduceMemoryUsage: false, + LogAllErrors: false, + Logger: middleware.LeveledLogger{Level: level}, + } +} + +func runServer(ctx context.Context, server *fasthttp.Server, listen string, wg *sync.WaitGroup) { + defer wg.Done() + if listen == "" { + log.Errorf("empty listen address for %s", server.Name) + } + + errCh := make(chan error) + go func() { + defer close(errCh) + log.Infof("%s listening at %s", server.Name, listen) + if err := server.ListenAndServe(listen); err != nil { + errCh <- err + } + }() + select { + case err := <-errCh: + log.Infof("%s exited with error %s...", 
server.Name, err.Error()) + case <-ctx.Done(): + if err := server.Shutdown(); err != nil { + log.WithError(err).WithField("name", server.Name).Error("error while shutting down server") + } + log.Infof("shutting down %s...", server.Name) + } +} diff --git a/cmd/version.go b/cmd/version.go new file mode 100644 index 0000000..58a178d --- /dev/null +++ b/cmd/version.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/revolution1/jsonrpc-proxy/metrics" +) + +var version = "0.0.3" + +var ( + commit = "" + branch = "" + tag = "" + buildInfo = "" + date = "" +) + +func printVersion() string { + return fmt.Sprintf("%s (commit='%s', branch='%s', tag='%s', date='%s', build='%s')", version, commit, branch, tag, date, buildInfo) +} + +func init() { + versionGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "version", + Help: "version info of jsonrpc proxy", + ConstLabels: prometheus.Labels{ + "version": version, + "branch": branch, + "tag": tag, + "buildinfo": buildInfo, + "date": date, + }, + }) + prometheus.MustRegister(versionGauge) + versionGauge.Set(1) +} diff --git a/config.yaml b/config.yaml index b67939a..18c7b99 100644 --- a/config.yaml +++ b/config.yaml @@ -1,26 +1,137 @@ -# draft for future version version: 1.0 -proxy: - debug: false - accessLog: false - errorLog: false -manage: - listen: -backends: - mainnet-api: - keepalive: false - hosts: - - https://api.zilliqa.com - k8s_sd: - scheme: http + +listeners: + - id: http + address: 0.0.0.0:4201 + - id: tcp + address: 0.0.0.0:4203 + - id: manage + address: 0.0.0.0:8088 + +upstreams: + - name: seedpub + k8s_service: port: 4201 - labelSelecotr: - app: zilliqa - type: level2lookup + namespace: default + name: xxx + - name: mainnet + endpoints: + - https://api.zilliqa.com + + servers: - zilliqa-rpc: - # unix-socket tcp http websocket - listen: tcp://0.0.0.0:9090 - # domains: - # - a.b.com + - name: zilliqa-rpc + type: http + listeners: + - http + plugins: + - id: cors routers: + - path: / + plugins: + - id: logging + format: text # text | json + verbose: 1 # 1 2 3 + stream: stdout + - id: zilliqa-api + + - path: /ws + plugins: + - id: websocket + - id: websocket-to-simple-jsonrpc + - id: logging + format: text # text | json + verbose: 1 # 1 2 3 + stream: stdout + - id: zilliqa-api + + # https://pkg.go.dev/github.com/fasthttp/router + - path: /manage/{path:*} + plugins: + - id: basic-auth + username: admin + password: admin + - id: manage + + - name: zilliqa-tcp + type: tcp + listeners: + - tcp + KeepAlive: true + Delimiter: '\n' + plugins: + - id: tcp-to-simple-jsonrpc + - id: zilliqa-api + + +# user-defined plugin +processors: + - id: zilliqa-api + description: proxy jsonrpc request to zilliqa server and cache the responses + context: jsonrpc + plugins: + - id: jsonrpc + supportBatch: true + keepAlive: true + forward: + requestTimeout: 10s + supportBatch: true + keepAlive: true + upstreams: + - seedpub + defaultResponse: + code: 503 + content: "Service Unavaliable" + headers: null + - id: cache + maxSize: 256Mb + errFor: 1s + groups: + - methods: + # method that has no + - GetBlockchainInfo + - GetCurrentDSEpoch + - GetCurrentMiniEpoch + - GetDSBlockRate + - GetLatestDsBlock + - GetLatestTxBlock + - GetNumDSBlocks + - GetNumTransactions + - GetNumTxBlocks + - GetPrevDifficulty + - GetPrevDSDifficulty + - GetTotalCoinSupply + - GetTransactionRate + - GetTxBlockRate + - GetMinimumGasPrice + - GetNumTxnsDSEpoch + - GetNumTxnsTxEpoch + - 
GetPendingTxn + - GetPendingTxns + - GetRecentTransactions + # method that has param + - DSBlockListing + - TxBlockListing + - GetDsBlock + - GetTxBlock + - GetTransaction + - GetTransactionsForTxBlock + - GetTxnBodiesForTxBlock + - GetSmartContractInit + - GetSmartContracts + - GetSmartContractState + - GetSmartContractSubState + - GetBalance + for: 5s + errFor: 1s + # long term + - methods: + - GetContractAddressFromTransactionID + - GetSmartContractCode + for: 1h + errFor: 1s + # Permanent + - methods: + - GetNetworkId + for: 1h + errFor: 1s diff --git a/experiment/fasthttpserver/main.go b/experiment/fasthttpserver/main.go new file mode 100644 index 0000000..9f995a3 --- /dev/null +++ b/experiment/fasthttpserver/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "fmt" + "github.com/valyala/fasthttp" + "golang.org/x/sync/errgroup" + "net" +) + +func main() { + listener1, _ := net.Listen("tcp4", "") + listener2, _ := net.Listen("tcp4", "") + server := fasthttp.Server{Name: "test", Handler: func(ctx *fasthttp.RequestCtx) { + fmt.Printf("Got request of Host %s, URI: %s\n", ctx.Host(), ctx.URI()) + ctx.SuccessString("text/html", "ok") + }} + defer server.Shutdown() + eg := errgroup.Group{} + eg.Go(func() error { + return server.Serve(listener1) + }) + eg.Go(func() error { + return server.Serve(listener2) + }) + fmt.Printf("listener1: http://%s\n", listener1.Addr().String()) + fmt.Printf("listener2: http://%s\n", listener2.Addr().String()) + eg.Wait() + fmt.Println("exit") +} diff --git a/experiment/fastwebsocket/ws.go b/experiment/fastwebsocket/ws.go new file mode 100644 index 0000000..9bd7cbb --- /dev/null +++ b/experiment/fastwebsocket/ws.go @@ -0,0 +1,216 @@ +package main + +import ( + "bytes" + "flag" + "github.com/fasthttp/websocket" + "log" + "time" + + "github.com/valyala/fasthttp" +) + +var addr = flag.String("addr", ":8080", "http service address") + +func serveHome(ctx *fasthttp.RequestCtx) { + log.Println(string(ctx.Path())) + + if !ctx.IsGet() { + ctx.Error("Method not allowed", fasthttp.StatusMethodNotAllowed) + return + } + fasthttp.ServeFile(ctx, "../home.html") +} + +func main() { + flag.Parse() + hub := newHub() + go hub.run() + + requestHandler := func(ctx *fasthttp.RequestCtx) { + switch string(ctx.Path()) { + case "/": + serveHome(ctx) + case "/ws": + serveWs(ctx, hub) + default: + ctx.Error("Unsupported path", fasthttp.StatusNotFound) + } + } + + server := fasthttp.Server{ + Name: "ChatExample", + Handler: requestHandler, + } + + log.Fatal(server.ListenAndServe(*addr)) +} + +type Hub struct { + // Registered clients. + clients map[*Client]bool + + // Inbound messages from the clients. + broadcast chan []byte + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client +} + +func newHub() *Hub { + return &Hub{ + broadcast: make(chan []byte), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[*Client]bool), + } +} + +func (h *Hub) run() { + for { + select { + case client := <-h.register: + h.clients[client] = true + case client := <-h.unregister: + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.send) + } + case message := <-h.broadcast: + for client := range h.clients { + select { + case client.send <- message: + default: + close(client.send) + delete(h.clients, client) + } + } + } + } +} + +const ( + // Time allowed to write a message to the peer. 
+ writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} +) + +var upgrader = websocket.FastHTTPUpgrader{ + CheckOrigin: func(ctx *fasthttp.RequestCtx) bool { return true }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte +} + +// readPump pumps messages from the websocket connection to the hub. +// +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) + c.hub.broadcast <- message + } +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Add queued chat messages to the current websocket message. + n := len(c.send) + for i := 0; i < n; i++ { + w.Write(newline) + w.Write(<-c.send) + } + + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// serveWs handles websocket requests from the peer. 
+func serveWs(ctx *fasthttp.RequestCtx, hub *Hub) { + err := upgrader.Upgrade(ctx, func(conn *websocket.Conn) { + client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} + client.hub.register <- client + + go client.writePump() + client.readPump() + }) + + if err != nil { + log.Println(err) + } +} diff --git a/experiment/quote/quote_test.go b/experiment/quote/quote_test.go new file mode 100644 index 0000000..57bf0bb --- /dev/null +++ b/experiment/quote/quote_test.go @@ -0,0 +1,22 @@ +package quote + +import ( + "strconv" + "testing" +) + +func TestQuote(t *testing.T) { + const line = "aa\\nbb" + t.Log(line) + s, err := strconv.Unquote(line) + t.Log("unquoted:", s, "err:", err) + + v, mb, tail, err := strconv.UnquoteChar(`\"Fran & Freddie's Diner\"`, '"') + if err != nil { + t.Log(err) + } + + t.Log("value:", string(v)) + t.Log("multibyte:", mb) + t.Log("tail:", tail) +} diff --git a/fnv64/fnv64.go b/fnv64/fnv64.go new file mode 100644 index 0000000..cec768c --- /dev/null +++ b/fnv64/fnv64.go @@ -0,0 +1,21 @@ +package fnv64 + +type Fnv64a struct{} + +const ( + // offset64 FNVa offset basis. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash + offset64 = 14695981039346656037 + // prime64 FNVa prime value. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash + prime64 = 1099511628211 +) + +// Sum64 gets the string and returns its uint64 hash value. +func (f Fnv64a) Sum64(key string) uint64 { + var hash uint64 = offset64 + for i := 0; i < len(key); i++ { + hash ^= uint64(key[i]) + hash *= prime64 + } + + return hash +} diff --git a/go.mod b/go.mod index f0755f0..c21f479 100644 --- a/go.mod +++ b/go.mod @@ -8,12 +8,16 @@ require ( github.com/allegro/bigcache v1.2.1 github.com/andybalholm/brotli v1.0.1 // indirect github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 + github.com/dgraph-io/ristretto v0.0.3 + github.com/dustin/go-humanize v1.0.0 github.com/fasthttp/router v1.3.3 + github.com/fasthttp/websocket v1.4.3 github.com/ghodss/yaml v1.0.0 github.com/google/gops v0.3.14 github.com/json-iterator/go v1.1.10 github.com/klauspost/compress v1.11.3 // indirect - github.com/mailru/easyjson v0.7.6 + github.com/mattn/go-sqlite3 v1.14.6 // indirect + github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 github.com/prometheus/common v0.15.0 // indirect @@ -23,8 +27,11 @@ require ( github.com/stretchr/testify v1.6.1 github.com/valyala/fasthttp v1.18.0 go.uber.org/multierr v1.6.0 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20201211090839-8ad439b19e0f // indirect google.golang.org/protobuf v1.25.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gorm.io/driver/sqlite v1.1.4 // indirect + gorm.io/gorm v1.20.11 // indirect sigs.k8s.io/yaml v1.2.0 ) diff --git a/jsonrpc.go b/jsonrpc/jsonrpc.go similarity index 85% rename from jsonrpc.go rename to jsonrpc/jsonrpc.go index e0cd885..6adfbe0 100644 --- a/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -1,5 +1,5 @@ // ref: https://www.jsonrpc.org/specification -package main +package jsonrpc import ( "bytes" @@ -19,12 +19,12 @@ const JSONRPC2 = "2.0" var jsonSorted = jsoniter.Config{SortMapKeys: true, EscapeHTML: true}.Froze() -type rpcHeader struct { +type RpcHeader struct { Jsonrpc string `json:"jsonrpc,intern"` Id interface{} `json:"id,omitempty"` } -func (h rpcHeader) Validate() bool { +func (h RpcHeader) Validate() bool { switch h.Id.(type) { case string, float64, types.Nil: return h.Jsonrpc 
== JSONRPC2 @@ -33,17 +33,17 @@ func (h rpcHeader) Validate() bool { } type RpcRequest struct { - rpcHeader + RpcHeader Method string `json:"method"` Params interface{} `json:"params,omitempty"` } func NewRpcRequest(id int, method string, params interface{}) *RpcRequest { - return &RpcRequest{rpcHeader: rpcHeader{JSONRPC2, id}, Method: method, Params: params} + return &RpcRequest{RpcHeader: RpcHeader{JSONRPC2, id}, Method: method, Params: params} } func (r RpcRequest) Validate() bool { - return r.rpcHeader.Validate() && r.Method != "" + return r.RpcHeader.Validate() && r.Method != "" } func (r RpcRequest) String() string { @@ -59,13 +59,13 @@ func (r RpcRequest) ToCacheKey() (string, error) { } func (r *RpcRequest) Reset() { - r.rpcHeader.Id = nil + r.RpcHeader.Id = nil r.Method = "" r.Params = nil } type RpcResponse struct { - rpcHeader + RpcHeader Error *RpcError `json:"error,omitempty"` Result interface{} `json:"result,omitempty"` } @@ -75,7 +75,7 @@ func (r RpcResponse) Success() bool { } func (r *RpcResponse) Reset() { - r.rpcHeader.Id = nil + r.RpcHeader.Id = nil r.Error = nil r.Result = nil } @@ -107,7 +107,7 @@ type RpcRequests []*RpcRequest // //func (rs RpcRequests) Validate() bool { // for _,r:=range rs { -// return rs.rpcHeader.Validate() && r.Method != "" +// return rs.RpcHeader.Validate() && r.Method != "" // } //} // @@ -122,7 +122,7 @@ type RpcRequests []*RpcRequest func AcquireRpcRequest() *RpcRequest { r := rpcReqPool.Get() if r == nil { - return &RpcRequest{rpcHeader: rpcHeader{Jsonrpc: JSONRPC2}} + return &RpcRequest{RpcHeader: RpcHeader{Jsonrpc: JSONRPC2}} } return r.(*RpcRequest) } @@ -135,7 +135,7 @@ func ReleaseRpcRequest(r *RpcRequest) { func AcquireRpcResponse() *RpcResponse { r := rpcReqPool.Get() if r == nil { - return &RpcResponse{rpcHeader: rpcHeader{Jsonrpc: JSONRPC2}} + return &RpcResponse{RpcHeader: RpcHeader{Jsonrpc: JSONRPC2}} } return r.(*RpcResponse) } diff --git a/jsonrpc_errors.go b/jsonrpc/jsonrpc_errors.go similarity index 95% rename from jsonrpc_errors.go rename to jsonrpc/jsonrpc_errors.go index eaa06b7..da51e47 100644 --- a/jsonrpc_errors.go +++ b/jsonrpc/jsonrpc_errors.go @@ -1,10 +1,10 @@ -package main +package jsonrpc import ( "fmt" jsoniter "github.com/json-iterator/go" - log "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" + "strconv" ) type RpcError struct { @@ -37,6 +37,13 @@ func (r *RpcError) WriteToRpcResponse(res *RpcResponse, id interface{}) { res.Id = id } +func (r *RpcError) Name() string { + if r.name != "" { + return r.name + } + return strconv.Itoa(r.Code) +} + func (r *RpcError) AccessLogError() string { return fmt.Sprintf("%s(%d)", r.name, r.Code) } diff --git a/jsonrpc_test.go b/jsonrpc/jsonrpc_test.go similarity index 98% rename from jsonrpc_test.go rename to jsonrpc/jsonrpc_test.go index d596bd8..858cdb5 100644 --- a/jsonrpc_test.go +++ b/jsonrpc/jsonrpc_test.go @@ -1,4 +1,4 @@ -package main +package jsonrpc import ( jsoniter "github.com/json-iterator/go" diff --git a/jsonrpc/logger.go b/jsonrpc/logger.go new file mode 100644 index 0000000..2b40fe8 --- /dev/null +++ b/jsonrpc/logger.go @@ -0,0 +1,7 @@ +package jsonrpc + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("logger", "jsonrpc") diff --git a/listener/manage.go b/listener/manage.go new file mode 100644 index 0000000..1b800bc --- /dev/null +++ b/listener/manage.go @@ -0,0 +1,27 @@ +package listener + +import ( + "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/server" + "github.com/revolution1/jsonrpc-proxy/types" + 
"net" + "sync" +) + +var ( + listens map[types.ListenerID]net.Listener + acquired map[types.ListenerID]server.ID + lmu sync.Mutex +) + +func AcquireListener(id types.ListenerID) (net.Listener, error) { + lmu.Lock() + defer lmu.Unlock() + if a, ok := acquired[id]; ok { + return nil, errors.Errorf("Listener already acquired by server %s", a) + } + if l, ok := listens[id]; ok { + return l, nil + } + return nil, errors.Errorf("Listener %s not found", id) +} diff --git a/main.go b/main.go index 5a26e3f..c42079a 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,11 @@ import ( "fmt" "github.com/fasthttp/router" "github.com/google/gops/agent" + "github.com/revolution1/jsonrpc-proxy/oldconfig" + "github.com/revolution1/jsonrpc-proxy/manage" + "github.com/revolution1/jsonrpc-proxy/middleware" + "github.com/revolution1/jsonrpc-proxy/proxy" + "github.com/revolution1/jsonrpc-proxy/utils" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/valyala/fasthttp" @@ -18,8 +23,6 @@ import ( "time" ) -var debugMode bool - func main() { rootCmd := cobra.Command{ Use: "jsonrpc-proxy", @@ -42,43 +45,55 @@ func main() { } log.Infof("Loading config from %s", *path) log.Infof("Version: %s", printVersion()) - config, err := LoadConfig(*path) + conf, err := oldconfig.LoadConfig(*path) if err != nil { return err } - config.MustValidate() - initLog(config) - return runMain(config) + conf.MustValidate() + initLog(conf) + return runMain(conf) } _ = rootCmd.Execute() } -func runMain(config *Config) error { - CheckFdLimit() +func runMain(config *oldconfig.Config) error { + utils.CheckFdLimit() log.Infof("Build: %s %s %s, PID: %d", runtime.GOOS, runtime.Compiler, runtime.Version(), os.Getpid()) r := router.New() //if debugMode { // log.Infof("Debug Mode enabled") // r.GET("/debug/pprof/{name:*}", pprofhandler.PprofHandler) //} - p := NewProxy(config) + p := proxy.NewProxy(config) p.RegisterHandler(r) - serverListen := GetHostFromUrl(config.Listen) - manageListen := GetHostFromUrl(config.Manage.Listen) + serverListen := utils.GetHostFromUrl(config.Listen) + manageListen := utils.GetHostFromUrl(config.Manage.Listen) var manageServer *fasthttp.Server - m := NewManage(config, p) + m := manage.NewManage(config, p) if serverListen == manageListen { log.Warn("Manage Server listens at the same address with RPC Server") - m.registerHandler(r) + m.RegisterHandler(r) } else { r := router.New() - m.registerHandler(r) - h := useMiddleWares(r.Handler, panicHandler, Cors, fasthttp.CompressHandler, accessLogMetricHandler("[Manage] ", config)) + m.RegisterHandler(r) + h := middleware.UseMiddleWares( + r.Handler, + middleware.PanicHandler, + middleware.Cors, + fasthttp.CompressHandler, + middleware.AccessLogMetricHandler("[Manage] ", config), + ) manageServer = newServer("JSON-RPC Proxy Manage Server", h, log.TraceLevel, config) } - h := useMiddleWares(r.Handler, panicHandler, Cors, fasthttp.CompressHandler, accessLogMetricHandler("", config)) + h := middleware.UseMiddleWares( + r.Handler, + middleware.PanicHandler, + middleware.Cors, + fasthttp.CompressHandler, + middleware.AccessLogMetricHandler("", config), + ) server := newServer("JSON-RPC Proxy Server", h, log.TraceLevel, config) ctx, cancel := context.WithCancel(context.Background()) @@ -102,30 +117,30 @@ func runMain(config *Config) error { return nil } -func initLog(config *Config) { +func initLog(conf *oldconfig.Config) { log.SetOutput(os.Stdout) log.SetFormatter(&log.TextFormatter{ FullTimestamp: true, TimestampFormat: time.RFC3339, QuoteEmptyFields: true, - 
ForceColors: config.LogForceColors, + ForceColors: conf.LogForceColors, }) - level, err := log.ParseLevel(strings.ToLower(config.LogLevel)) + level, err := log.ParseLevel(strings.ToLower(conf.LogLevel)) if err != nil { log.Fatal("Invalid logLevel") } - debugMode, err = strconv.ParseBool(os.Getenv("DEBUG")) + oldconfig.DebugMode, err = strconv.ParseBool(os.Getenv("DEBUG")) if err != nil { - debugMode = config.Debug + oldconfig.DebugMode = conf.Debug } - if debugMode && level < log.DebugLevel { + if oldconfig.DebugMode && level < log.DebugLevel { level = log.DebugLevel } log.SetLevel(level) log.Debugf("LogLevel: %s", log.GetLevel()) } -func newServer(name string, h fasthttp.RequestHandler, level log.Level, config *Config) *fasthttp.Server { +func newServer(name string, h fasthttp.RequestHandler, level log.Level, conf *oldconfig.Config) *fasthttp.Server { return &fasthttp.Server{ Name: name, Handler: h, @@ -133,14 +148,14 @@ func newServer(name string, h fasthttp.RequestHandler, level log.Level, config * HeaderReceived: nil, ContinueHandler: nil, TCPKeepalive: true, - ReadTimeout: config.ReadTimeout.Duration, - WriteTimeout: config.WriteTimeout.Duration, - IdleTimeout: config.IdleTimeout.Duration, + ReadTimeout: conf.ReadTimeout.Duration, + WriteTimeout: conf.WriteTimeout.Duration, + IdleTimeout: conf.IdleTimeout.Duration, Concurrency: 0, DisableKeepalive: false, ReduceMemoryUsage: false, LogAllErrors: false, - Logger: LeveledLogger{level: level}, + Logger: middleware.LeveledLogger{Level: level}, } } diff --git a/manage.go b/manage/manage.go similarity index 54% rename from manage.go rename to manage/manage.go index a492e85..1db0578 100644 --- a/manage.go +++ b/manage/manage.go @@ -1,7 +1,10 @@ -package main +package manage import ( "github.com/fasthttp/router" + "github.com/revolution1/jsonrpc-proxy/oldconfig" + "github.com/revolution1/jsonrpc-proxy/metrics" + "github.com/revolution1/jsonrpc-proxy/proxy" "github.com/savsgio/gotils/nocopy" "github.com/valyala/fasthttp" "github.com/valyala/fasthttp/pprofhandler" @@ -9,17 +12,17 @@ import ( type Manage struct { nocopy.NoCopy - config *Config - Proxy *Proxy + config *oldconfig.Config + Proxy *proxy.Proxy } -func NewManage(config *Config, proxy *Proxy) *Manage { +func NewManage(config *oldconfig.Config, proxy *proxy.Proxy) *Manage { return &Manage{config: config, Proxy: proxy} } -func (m *Manage) registerHandler(r *router.Router) { +func (m *Manage) RegisterHandler(r *router.Router) { r.GET("/debug/pprof/{name:*}", pprofhandler.PprofHandler) - r.GET(m.config.Manage.MetricsPath, PrometheusHandler) + r.GET(m.config.Manage.MetricsPath, metrics.PrometheusHandler) group := r.Group(m.config.Manage.Path) group.GET("/", m.Index) } diff --git a/metrics.go b/metrics/metrics.go similarity index 91% rename from metrics.go rename to metrics/metrics.go index d97a9ba..e513aa0 100644 --- a/metrics.go +++ b/metrics/metrics.go @@ -1,4 +1,4 @@ -package main +package metrics import ( "github.com/prometheus/client_golang/prometheus" @@ -6,7 +6,7 @@ import ( "github.com/valyala/fasthttp/fasthttpadaptor" ) -const MetricsNs = "jsonrpc_proxy" +const Namespace = "jsonrpc_proxy" // since prometheus/client_golang use net/http we need this net/http adapter for fasthttp var PrometheusHandler = fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()) @@ -15,7 +15,7 @@ var PrometheusHandler = fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()) var ( ReqDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Namespace: MetricsNs, + Namespace: Namespace, Name: 
"request_duration_seconds", Help: "request latencies of success requests", Buckets: []float64{.005, .01, .02, .04, .06, .1, .2, .4, .6, 1, 2, 4}, @@ -24,7 +24,7 @@ var ( ) ReqCount = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: MetricsNs, + Namespace: Namespace, Name: "requests_total", Help: "Total number of rpc requests.", }, @@ -32,26 +32,26 @@ var ( ) HttpReqCnt = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: MetricsNs, + Namespace: Namespace, Name: "http_requests_total", Help: "Total number of rpc requests by HTTP status code.", }, []string{"code", "path", "method"}, ) SentBytes = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: MetricsNs, + Namespace: Namespace, Name: "server_bytes_sent", Help: "total bytes sent by server", }) RecvBytes = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: MetricsNs, + Namespace: Namespace, Name: "server_bytes_recv", Help: "total bytes received by server", }) RpcCacheHit = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: MetricsNs, + Namespace: Namespace, Name: "rpc_cache_hit", Help: "Total number of rpc requests cache hit.", }, @@ -59,7 +59,7 @@ var ( ) RpcCacheMiss = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: MetricsNs, + Namespace: Namespace, Name: "rpc_cache_miss", Help: "Total number of rpc requests cache miss.", }, diff --git a/middlewares.go b/middleware/accessLog.go similarity index 61% rename from middlewares.go rename to middleware/accessLog.go index 0c15230..8195d73 100644 --- a/middlewares.go +++ b/middleware/accessLog.go @@ -1,45 +1,23 @@ -package main +package middleware import ( "bytes" "fmt" - "github.com/AdhityaRamadhanus/fasthttpcors" realip "github.com/Ferluci/fast-realip" "github.com/prometheus/client_golang/prometheus" + "github.com/revolution1/jsonrpc-proxy/oldconfig" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "github.com/revolution1/jsonrpc-proxy/metrics" + "github.com/revolution1/jsonrpc-proxy/proxy" "github.com/savsgio/gotils" log "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" - "runtime/debug" "strconv" "strings" "time" ) -var WellKnownHealthCheckerUserAgentPrefixes = []string{ - "ELB-HealthChecker", - "kube-probe", - "Prometheus", -} - -func guessIsHealthChecker(ua string) bool { - for _, p := range WellKnownHealthCheckerUserAgentPrefixes { - if strings.HasPrefix(ua, p) { - return true - } - } - return false -} - -type MiddleWare func(h fasthttp.RequestHandler) fasthttp.RequestHandler - -func useMiddleWares(handler fasthttp.RequestHandler, middleware ...MiddleWare) fasthttp.RequestHandler { - for _, m := range middleware { - handler = m(handler) - } - return handler -} - -func accessLogMetricHandler(prefix string, config *Config) MiddleWare { +func AccessLogMetricHandler(prefix string, config *oldconfig.Config) MiddleWare { // TODO: from-cache, is-batch return func(h fasthttp.RequestHandler) fasthttp.RequestHandler { return func(ctx *fasthttp.RequestCtx) { @@ -69,14 +47,14 @@ func accessLogMetricHandler(prefix string, config *Config) MiddleWare { if status != 200 { errStr = fmt.Sprintf("%d(%s)", status, fasthttp.StatusMessage(status)) } - if err, ok := ctx.UserValue("rpcErr").(*RpcError); ok { + if err, ok := ctx.UserValue("rpcErr").(*jsonrpc.RpcError); ok { errStr = err.AccessLogError() rpcErrCode = err.Code } else if err, ok := ctx.UserValue("rpcErr").(error); ok { errStr = err.Error() } methodsStr := "" - if rpcMethods = getCtxRpcMethods(ctx); rpcMethods != nil { + if rpcMethods = proxy.GetCtxRpcMethods(ctx); 
rpcMethods != nil { methodsStr = strings.Join(rpcMethods, ",") } if config.AccessLog { @@ -117,11 +95,11 @@ func accessLogMetricHandler(prefix string, config *Config) MiddleWare { if status != fasthttp.StatusNotFound && path != config.Manage.MetricsPath && !bytes.Equal(ctx.Method(), []byte(fasthttp.MethodOptions)) { - RecvBytes.Add(float64(reqSize)) - SentBytes.Add(float64(resSize)) + metrics.RecvBytes.Add(float64(reqSize)) + metrics.SentBytes.Add(float64(resSize)) if isRpcReq { for _, m := range rpcMethods { - ReqDuration.With(prometheus.Labels{ + metrics.ReqDuration.With(prometheus.Labels{ "code": strconv.Itoa(rpcErrCode), "path": path, "method": method, @@ -129,7 +107,7 @@ func accessLogMetricHandler(prefix string, config *Config) MiddleWare { }).Observe(float64(duration) / float64(time.Second)) } } else { - ReqDuration.With(prometheus.Labels{ + metrics.ReqDuration.With(prometheus.Labels{ "code": strconv.Itoa(status), "path": path, "method": method, @@ -140,38 +118,3 @@ func accessLogMetricHandler(prefix string, config *Config) MiddleWare { } } } - -func panicHandler(h fasthttp.RequestHandler) fasthttp.RequestHandler { - return func(ctx *fasthttp.RequestCtx) { - defer func() { - if r := recover(); r != nil { - debug.PrintStack() - ctx.ResetBody() - ctx.SetStatusCode(fasthttp.StatusInternalServerError) - if debugMode { - _, _ = ctx.Write(debug.Stack()) - } - } - }() - h(ctx) - } -} - -type LeveledLogger struct { - level log.Level -} - -func (l LeveledLogger) Printf(format string, args ...interface{}) { - log.StandardLogger().Logf(l.level, format, args...) -} - -var Cors MiddleWare - -func init() { - corsHandler := fasthttpcors.NewCorsHandler(fasthttpcors.Options{ - AllowedOrigins: []string{"*"}, - AllowedHeaders: []string{"*"}, - AllowedMethods: []string{"GET", "POST"}, - }) - Cors = corsHandler.CorsMiddleware -} diff --git a/middleware/cors.go b/middleware/cors.go new file mode 100644 index 0000000..7c43d79 --- /dev/null +++ b/middleware/cors.go @@ -0,0 +1,14 @@ +package middleware + +import "github.com/AdhityaRamadhanus/fasthttpcors" + +var Cors MiddleWare + +func init() { + corsHandler := fasthttpcors.NewCorsHandler(fasthttpcors.Options{ + AllowedOrigins: []string{"*"}, + AllowedHeaders: []string{"*"}, + AllowedMethods: []string{"GET", "POST"}, + }) + Cors = corsHandler.CorsMiddleware +} diff --git a/middleware/leveledLogger.go b/middleware/leveledLogger.go new file mode 100644 index 0000000..af8ad56 --- /dev/null +++ b/middleware/leveledLogger.go @@ -0,0 +1,11 @@ +package middleware + +import log "github.com/sirupsen/logrus" + +type LeveledLogger struct { + Level log.Level +} + +func (l LeveledLogger) Printf(format string, args ...interface{}) { + log.StandardLogger().Logf(l.Level, format, args...) 
+} diff --git a/middleware/middlewares.go b/middleware/middlewares.go new file mode 100644 index 0000000..01958a2 --- /dev/null +++ b/middleware/middlewares.go @@ -0,0 +1,30 @@ +package middleware + +import ( + "github.com/valyala/fasthttp" + "strings" +) + +var WellKnownHealthCheckerUserAgentPrefixes = []string{ + "ELB-HealthChecker", + "kube-probe", + "Prometheus", +} + +func guessIsHealthChecker(ua string) bool { + for _, p := range WellKnownHealthCheckerUserAgentPrefixes { + if strings.HasPrefix(ua, p) { + return true + } + } + return false +} + +type MiddleWare func(h fasthttp.RequestHandler) fasthttp.RequestHandler + +func UseMiddleWares(handler fasthttp.RequestHandler, middleware ...MiddleWare) fasthttp.RequestHandler { + for _, m := range middleware { + handler = m(handler) + } + return handler +} diff --git a/middleware/panic.go b/middleware/panic.go new file mode 100644 index 0000000..139ec2d --- /dev/null +++ b/middleware/panic.go @@ -0,0 +1,25 @@ +package middleware + +import ( + "github.com/revolution1/jsonrpc-proxy/oldconfig" + log "github.com/sirupsen/logrus" + "github.com/valyala/fasthttp" + "runtime/debug" +) + +func PanicHandler(h fasthttp.RequestHandler) fasthttp.RequestHandler { + return func(ctx *fasthttp.RequestCtx) { + defer func() { + if r := recover(); r != nil { + debug.PrintStack() + log.Errorf("panic %v", r) + ctx.ResetBody() + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + if oldconfig.DebugMode { + _, _ = ctx.Write(debug.Stack()) + } + } + }() + h(ctx) + } +} diff --git a/config.go b/oldconfig/config.go similarity index 65% rename from config.go rename to oldconfig/config.go index afa759d..055c303 100644 --- a/config.go +++ b/oldconfig/config.go @@ -1,9 +1,9 @@ -package main +package oldconfig import ( "fmt" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "github.com/revolution1/jsonrpc-proxy/types" "io/ioutil" "net/url" "sigs.k8s.io/yaml" @@ -12,21 +12,21 @@ import ( ) type Config struct { - LogLevel string `json:"logLevel"` - LogForceColors bool `json:"logForceColors"` - Debug bool `json:"debug"` - AccessLog bool `json:"accessLog"` - Manage ManageConfig `json:"manage"` - Upstreams []string `json:"upstreams"` - Listen string `json:"listen"` - Path string `json:"path"` - KeepAlive string `json:"keepAlive"` - UpstreamRequestTimeout Duration `json:"upstreamRequestTimeout"` - ReadTimeout Duration `json:"readTimeout"` - WriteTimeout Duration `json:"writeTimeout"` - IdleTimeout Duration `json:"idleTimeout"` - ErrFor Duration `json:"errFor"` - CacheConfigs []CacheConfig `json:"cacheConfigs"` + LogLevel string `json:"logLevel"` + LogForceColors bool `json:"logForceColors"` + Debug bool `json:"debug"` + AccessLog bool `json:"accessLog"` + Manage ManageConfig `json:"manage"` + Upstreams []string `json:"upstreams"` + Listen string `json:"listen"` + Path string `json:"path"` + KeepAlive string `json:"keepAlive"` + UpstreamRequestTimeout types.Duration `json:"upstreamRequestTimeout"` + ReadTimeout types.Duration `json:"readTimeout"` + WriteTimeout types.Duration `json:"writeTimeout"` + IdleTimeout types.Duration `json:"idleTimeout"` + ErrFor types.Duration `json:"errFor"` + CacheConfigs []CacheConfig `json:"cacheConfigs"` } type ManageConfig struct { @@ -36,9 +36,9 @@ type ManageConfig struct { } type CacheConfig struct { - Methods []string `json:"methods"` - For Duration `json:"for"` - ErrFor Duration `json:"errFor"` + Methods []string `json:"methods"` + For types.Duration `json:"for"` + ErrFor types.Duration `json:"errFor"` } func (cc *CacheConfig) 
Sort() { diff --git a/config_test.go b/oldconfig/config_test.go similarity index 94% rename from config_test.go rename to oldconfig/config_test.go index e541469..1f160ab 100644 --- a/config_test.go +++ b/oldconfig/config_test.go @@ -1,4 +1,4 @@ -package main +package oldconfig import ( "github.com/ghodss/yaml" diff --git a/oldconfig/debug.go b/oldconfig/debug.go new file mode 100644 index 0000000..d44ad17 --- /dev/null +++ b/oldconfig/debug.go @@ -0,0 +1,3 @@ +package oldconfig + +var DebugMode bool diff --git a/oldconfig/logger.go b/oldconfig/logger.go new file mode 100644 index 0000000..2492426 --- /dev/null +++ b/oldconfig/logger.go @@ -0,0 +1,7 @@ +package oldconfig + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("logger", "config") diff --git a/plugin/config.go b/plugin/config.go new file mode 100644 index 0000000..28115b0 --- /dev/null +++ b/plugin/config.go @@ -0,0 +1,41 @@ +package plugin + +import ( + "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/types" +) + +//Config the config of plugin instance +type Config struct { + ID string + Spec map[string]interface{} +} + +func LoadPluginConfig(raw types.RawPluginConfig) (*Config, error) { + conf := &Config{} + if rawID, ok := raw["id"]; !ok { + return nil, errors.New("missing required field 'id' in plugin config") + } else if id, ok := rawID.(string); !ok { + return nil, errors.New("field 'id' of plugin config should be string") + } else { + conf.ID = id + } + conf.Spec = make(map[string]interface{}) + for k, v := range raw { + if k != "id" { + conf.Spec[k] = v + } + } + return conf, nil +} + +func LoadPluginConfigs(raws []types.RawPluginConfig) (configs []*Config, err error) { + for _, raw := range raws { + conf, err := LoadPluginConfig(raw) + if err != nil { + return nil, err + } + configs = append(configs, conf) + } + return +} diff --git a/plugin/load.go b/plugin/load.go new file mode 100644 index 0000000..7be2f1a --- /dev/null +++ b/plugin/load.go @@ -0,0 +1,3 @@ +package plugin + + diff --git a/plugin/logger.go b/plugin/logger.go new file mode 100644 index 0000000..77097c0 --- /dev/null +++ b/plugin/logger.go @@ -0,0 +1,7 @@ +package plugin + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("logger", "plugin") diff --git a/plugin/plugin.go b/plugin/plugin.go new file mode 100644 index 0000000..8978f42 --- /dev/null +++ b/plugin/plugin.go @@ -0,0 +1,49 @@ +package plugin + +import ( + "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/proxyctx" +) + +type HandleFunc func(ctx proxyctx.Context) error + +//DefaultTerminator terminate a context flow +func DefaultTerminator(proxyctx.Context) error { return nil } + +type Plugin interface { + ID() string + Info() *Info + New(*Config) (Plugin, error) + Handler(HandleFunc) HandleFunc + Destroy() +} + +type Info struct { + ID string + Version string + Description string + AcceptContexts []string + ProvideContext string +} + +func (p *Info) CanHandle(contextType string) bool { + for _, i := range p.AcceptContexts { + if i == "*" { + return true + } + if i == contextType { + return true + } + } + return false +} + +func (p *Info) ActualProvideContext(input string) (string, error) { + if p.ProvideContext != "" { + return p.ProvideContext, nil + } + if !p.CanHandle(input) { + return "", errors.Errorf("plugin %s cannot handle context type %s", p.ID, input) + } + return input, nil +} diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go new file mode 100644 index 0000000..b77f6fc --- /dev/null +++ 
b/plugin/plugin_test.go
@@ -0,0 +1,40 @@
+package plugin
+
+import (
+	assertion "github.com/stretchr/testify/assert"
+	"github.com/revolution1/jsonrpc-proxy/types"
+	"sigs.k8s.io/yaml"
+	"testing"
+)
+
+const conf1 = `
+- id: basic-auth
+  username: admin
+  password: password
+- id: manage
+`
+
+const conf2 = `
+- id: basic-auth
+  username: admin
+  password: password
+- random: a
+`
+
+func TestLoadPluginConfig(t *testing.T) {
+	assert := assertion.New(t)
+	var raws []types.RawPluginConfig
+	assert.NoError(yaml.Unmarshal([]byte(conf1), &raws))
+	configs, err := LoadPluginConfigs(raws)
+	assert.NoError(err)
+	assert.Len(configs, 2)
+	assert.Equal("basic-auth", configs[0].ID)
+	assert.Equal("admin", configs[0].Spec["username"])
+	assert.Equal("manage", configs[1].ID)
+
+	raws = nil
+	assert.NoError(yaml.Unmarshal([]byte(conf2), &raws))
+	c2, err := LoadPluginConfigs(raws)
+	assert.Nil(c2)
+	assert.Error(err)
+}
diff --git a/plugin/plugins/logging.go b/plugin/plugins/logging.go
new file mode 100644
index 0000000..009cffc
--- /dev/null
+++ b/plugin/plugins/logging.go
@@ -0,0 +1,246 @@
+package plugins
+
+import (
+	"fmt"
+	realip "github.com/Ferluci/fast-realip"
+	"github.com/pkg/errors"
+	"github.com/revolution1/jsonrpc-proxy/plugin"
+	"github.com/revolution1/jsonrpc-proxy/proxyctx"
+	"github.com/revolution1/jsonrpc-proxy/utils"
+	"github.com/sirupsen/logrus"
+	"os"
+	"strings"
+	"sync"
+	"time"
+)
+
+const (
+	FormatText = "text"
+	FormatJson = "json"
+)
+
+type LoggerConfig struct {
+	Disabled bool   `json:"disabled"`
+	Format   string `json:"format"`
+	Verbose  int    `json:"verbose"`
+	Stream   string `json:"stream"`
+	//MaxSize string `json:"max_size"`
+}
+
+type LoggingConfig struct {
+	AccessLog LoggerConfig
+	ErrorLog  LoggerConfig
+}
+
+type LoggingPlugin struct {
+	info *plugin.Info
+
+	config *LoggingConfig
+
+	accessLog     *logrus.Logger
+	accessLogfile *os.File
+
+	errorLog     *logrus.Logger
+	errorLogfile *os.File
+
+	mu sync.Mutex
+}
+
+func (l *LoggingPlugin) ID() string {
+	return "logging"
+}
+
+func (l *LoggingPlugin) Info() *plugin.Info {
+	if l.info == nil {
+		l.info = &plugin.Info{
+			ID:          l.ID(),
+			Version:     "1",
+			Description: "Request logging util",
+			AcceptContexts: []string{
+				//"tcp",
+				"http",
+				"jsonrpc",
+				//"websocket",
+			},
+			ProvideContext: "",
+		}
+	}
+	return l.info
+}
+
+func (l *LoggingPlugin) New(config *plugin.Config) (plugin.Plugin, error) {
+	lc := &LoggingConfig{}
+	if err := utils.CastToStruct(config.Spec, lc); err != nil {
+		return nil, err
+	}
+	p := &LoggingPlugin{
+		info:   l.Info(),
+		config: lc,
+	}
+	// configure the instance that is returned, not the registered prototype
+	p.accessLog = logrus.New()
+	p.errorLog = logrus.New()
+	p.preSetupLogger(p.accessLog, &lc.AccessLog)
+	p.preSetupLogger(p.errorLog, &lc.ErrorLog)
+
+	switch lc.AccessLog.Stream {
+	case "stdout":
+		p.accessLog.SetOutput(os.Stdout)
+	case "stderr":
+		p.accessLog.SetOutput(os.Stderr)
+	default:
+		file, err := os.OpenFile(lc.AccessLog.Stream, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0755)
+		if err != nil {
+			panic(err)
+		}
+		p.accessLogfile = file
+		p.accessLog.SetOutput(file)
+	}
+	switch lc.ErrorLog.Stream {
+	case "stdout":
+		p.errorLog.SetOutput(os.Stdout)
+	case "stderr":
+		p.errorLog.SetOutput(os.Stderr)
+	default:
+		if lc.ErrorLog.Stream == lc.AccessLog.Stream {
+			p.errorLog.SetOutput(p.accessLogfile)
+		} else {
+			file, err := os.OpenFile(lc.ErrorLog.Stream, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0755)
+			if err != nil {
+				panic(err)
+			}
+			p.errorLogfile = file
+			p.errorLog.SetOutput(file)
+		}
+	}
+	return p, nil
+}
+
+func (l *LoggingPlugin) Handler(handleFunc plugin.HandleFunc) plugin.HandleFunc {
+	return func(ctx proxyctx.Context) error {
+		start := time.Now()
+		err := handleFunc(ctx)
+		if err != nil {
+			return err
+		}
+		switch ctx.Type() {
+		case proxyctx.ContextHTTP:
+			return l.HandleHTTP(ctx.(*proxyctx.HTTPContext), start)
+		case proxyctx.ContextJSONRPC:
+			return l.HandleJSONRPC(ctx.(*proxyctx.JSONRPCContext), start)
+		}
+		return nil
+	}
+}
+
+func (l *LoggingPlugin) HandleHTTP(ctx *proxyctx.HTTPContext, start time.Time) error {
+	duration := time.Now().Sub(start)
+	status := ctx.Ctx.Response.StatusCode()
+	method := string(ctx.Ctx.Method())
+	scheme := strings.ToUpper(string(ctx.Ctx.URI().Scheme()))
+	uri := string(ctx.Ctx.RequestURI())
+	//path := string(ctx.Ctx.URI().Path())
+	ua := string(ctx.Ctx.UserAgent())
+	ip := realip.FromRequest(ctx.Ctx)
+	var reqSize, resSize int
+	if ctx.Ctx.Request.IsBodyStream() {
+		reqSize = len(ctx.Ctx.Request.Header.RawHeaders()) + ctx.Ctx.Request.Header.ContentLength() + 4
+	} else {
+		reqSize = len(ctx.Ctx.Request.Header.RawHeaders()) + len(ctx.Ctx.Request.Body()) + 4
+	}
+	if ctx.Ctx.Response.IsBodyStream() {
+		resSize = ctx.Ctx.Response.Header.Len() + ctx.Ctx.Response.Header.ContentLength() + 4
+	} else {
+		resSize = ctx.Ctx.Response.Header.Len() + len(ctx.Ctx.Response.Body()) + 4
+	}
+	var printf = l.accessLog.Infof
+	if guessIsHealthChecker(ua) {
+		printf = l.accessLog.Tracef
+	}
+	if status < 200 || status >= 400 {
+		printf = l.errorLog.Errorf
+	}
+	printf(
+		`%s - %s - "%s %s" %d %d %d "%s" %s`+"\n",
+		scheme, ip, method, uri, status, reqSize, resSize, ua, duration,
+	)
+	return nil
+}
+
+func (l *LoggingPlugin) HandleJSONRPC(ctx *proxyctx.JSONRPCContext, start time.Time) error {
+	var extra []string
+	methods := make([]string, 0, len(ctx.Requests))
+	status := make([]string, 0, len(ctx.Responses))
+	duration := time.Now().Sub(start)
+	if ctx.Parent().Type() == proxyctx.ContextHTTP {
+		httpReq := ctx.Parent().(*proxyctx.HTTPContext).Ctx
+		extra = append(extra, fmt.Sprintf("(%s)", string(httpReq.UserAgent())))
+	}
+	for _, m := range ctx.Requests {
+		methods = append(methods, m.Method)
+	}
+	for _, r := range ctx.Responses {
+		if r.Success() {
+			status = append(status, "OK")
+		} else {
+			status = append(status, r.Error.Name())
+		}
+	}
+	l.accessLog.Infof(
+		`JSONRPC - %s - %s "%s(%s)" %s`,
+		ctx.FromIP, ctx.FromPath, strings.Join(methods, ","), strings.Join(status, ","), duration,
+	)
+	return nil
+}
+
+func (l *LoggingPlugin) Destroy() {
+	if l.accessLogfile != nil {
+		l.accessLogfile.Close()
+		l.accessLogfile = nil
+	}
+	if l.errorLogfile != nil {
+		l.errorLogfile.Close()
+		l.errorLogfile = nil
+	}
+}
+
+func (*LoggingPlugin) preSetupLogger(l *logrus.Logger, c *LoggerConfig) {
+	if c.Disabled {
+		l.SetLevel(logrus.FatalLevel)
+		return
+	}
+	switch c.Format {
+	case FormatJson:
+		l.SetFormatter(&logrus.JSONFormatter{
+			TimestampFormat:   time.RFC3339,
+			DisableTimestamp:  false,
+			DisableHTMLEscape: true,
+		})
+	case FormatText:
+		l.SetFormatter(&logrus.TextFormatter{
+			FullTimestamp:   true,
+			TimestampFormat: time.RFC3339,
+		})
+	default:
+		panic(errors.New("unknown logging format, should be json|text"))
+	}
+}
+
+var WellKnownHealthCheckerUserAgentPrefixes = []string{
+	"ELB-HealthChecker",
+	"kube-probe",
+	"Prometheus",
+}
+
+func guessIsHealthChecker(ua string) bool {
+	for _, p := range WellKnownHealthCheckerUserAgentPrefixes {
+		if strings.HasPrefix(ua, p) {
+			return true
+		}
+	}
+	return false
+}
+
+func init() {
+	plugin.RegisterPlugin(&LoggingPlugin{})
+}
diff --git a/plugin/processor.go b/plugin/processor.go
new file mode 100644
index 0000000..2b10995
--- /dev/null
+++ b/plugin/processor.go
@@ -0,0 +1,134 @@
+package plugin
+
+import (
+	"github.com/pkg/errors"
"github.com/revolution1/jsonrpc-proxy/proxyctx" + "github.com/sirupsen/logrus" + "sync" +) + +type Processor interface { + Plugin + InitProcessor(config *ProcessorConfig) error +} + +type processor struct { + id string + contextType string + description string + + config *ProcessorConfig + + pluginInstances []Plugin + handler func(next HandleFunc) HandleFunc + + logger *logrus.Entry + mu sync.Mutex + initialized bool + info *Info + + refCount uint32 +} + +func (p *processor) ID() string { + return p.id +} + +func (p *processor) New(*Config) (Plugin, error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.refCount == 0 && !p.initialized { + err := p.InitProcessor(p.config) + if err != nil { + return nil, err + } + } + p.refCount++ + return p, nil +} + +func (p *processor) InitProcessor(conf *ProcessorConfig) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.initialized { + panic("processor already initialized") + } else { + p.initialized = true + } + p.config = conf + p.logger = log.WithField("processor", conf.ID) + p.id = conf.ID + p.description = conf.Description + p.contextType = conf.Context + p.info = nil + // init child plugins + prevCtxType := p.contextType + for _, pluginConf := range conf.PluginConfigs { + info, err := GetInfoOfID(pluginConf.ID) + if err != nil { + return err + } + if !info.CanHandle(prevCtxType) { + return errors.Errorf("plugin %s cannot handle previous context %s of this processor", pluginConf.ID, prevCtxType) + } + plugin, err := NewPlugin(pluginConf) + if err != nil { + return err + } + p.pluginInstances = append(p.pluginInstances, plugin) + prevCtxType, _ = info.ActualProvideContext(prevCtxType) + } + return nil +} + +func (p *processor) Info() *Info { + if p.info == nil { + if !p.initialized { + panic("processor has not been initialized") + } + p.info = &Info{ + ID: p.id, + Version: "1", + Description: p.description, + AcceptContexts: []string{p.contextType}, + ProvideContext: p.contextType, + } + } + return p.info +} + +func (p *processor) Handler(next HandleFunc) HandleFunc { + f := next + for i := len(p.pluginInstances) - 1; i >= 0; i-- { + f = p.pluginInstances[i].Handler(f) + } + return func(ctx proxyctx.Context) (err error) { + p.logger.Tracef("start handling in processor %s", p.id) + err = f(ctx) + p.logger.Tracef("done handling in processor %s", p.id) + return + } +} + +func (p *processor) Destroy() { + p.mu.Lock() + defer p.mu.Unlock() + p.refCount-- + if p.refCount > 0 { + return + } + wg := &sync.WaitGroup{} + wg.Add(len(p.pluginInstances)) + for _, plug := range p.pluginInstances { + go func(plug Plugin) { + defer wg.Done() + p.logger.Infof("stopping child plugin: %s", plug.ID()) + p.Destroy() + log.Infof("stopped child plugin: %s", plug.ID()) + }(plug) + } + wg.Wait() +} diff --git a/plugin/processor_test.go b/plugin/processor_test.go new file mode 100644 index 0000000..b0736c3 --- /dev/null +++ b/plugin/processor_test.go @@ -0,0 +1 @@ +package plugin diff --git a/plugin/processorconfig.go b/plugin/processorconfig.go new file mode 100644 index 0000000..6f231a8 --- /dev/null +++ b/plugin/processorconfig.go @@ -0,0 +1,24 @@ +package plugin + +import ( + "github.com/revolution1/jsonrpc-proxy/types" +) + +type ProcessorConfig struct { + ID string `json:"ID"` + Description string `json:"description"` + + //Context the name of accept and provide context of this processor + Context string `json:"context"` + PluginConfigs []*Config `json:"plugins"` +} + +func LoadProcessorConfig(raw types.RawProcessorConfig) (*ProcessorConfig, error) { + var err error + 
conf := &ProcessorConfig{ID: raw.ID, Description: raw.Description, Context: raw.Context} + conf.PluginConfigs, err = LoadPluginConfigs(raw.PluginConfigs) + if err != nil { + return nil, err + } + return conf, nil +} diff --git a/plugin/registry.go b/plugin/registry.go new file mode 100644 index 0000000..28dd65b --- /dev/null +++ b/plugin/registry.go @@ -0,0 +1,38 @@ +package plugin + +import ( + "fmt" + "github.com/pkg/errors" +) + +var pluginRegistry map[string]Plugin + +func RegisterPlugin(p Plugin) { + info := p.Info() + if _, ok := pluginRegistry[info.ID]; ok { + panic(fmt.Sprintf("plugin %s already registered", info.ID)) + } + pluginRegistry[info.ID] = p +} + +func DeregisterPlugin(id string) { + if _, ok := pluginRegistry[id]; ok { + delete(pluginRegistry, id) + } +} + +func GetInfoOfID(id string) (*Info, error) { + p, ok := pluginRegistry[id] + if !ok { + return nil, errors.Errorf("plugin %s not found", id) + } + return p.Info(), nil +} + +func NewPlugin(config *Config) (Plugin, error) { + p, ok := pluginRegistry[config.ID] + if !ok { + return nil, errors.Errorf("plugin %s not found", config.ID) + } + return p.New(config) +} diff --git a/pools.go b/pools.go deleted file mode 100644 index ab8883a..0000000 --- a/pools.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import "sync" - -//import "sync" -// -//var rpcReqPool sync.Pool -// -// -// -//func AcquireRpcReq() *RpcError { -// -//} - -var cachedItemPool = sync.Pool{New: func() interface{} { return &CachedItem{} }} - -func AcquireCachedItem() *CachedItem { - v := cachedItemPool.Get() - if v == nil { - return &CachedItem{} - } - return v.(*CachedItem) -} - -func ReleaseCachedItem(item *CachedItem) { - item.Reset() - cachedItemPool.Put(item) -} diff --git a/proxy.yaml b/proxy.yaml index a7a0b51..9fe3708 100644 --- a/proxy.yaml +++ b/proxy.yaml @@ -55,11 +55,7 @@ cacheConfigs: - GetPendingTxn - GetPendingTxns - GetRecentTransactions - for: 5s - errFor: 1s - -# fast changing and with param -- methods: + # fast changing and with param - DSBlockListing - TxBlockListing - GetDsBlock diff --git a/proxy/logger.go b/proxy/logger.go new file mode 100644 index 0000000..e524b14 --- /dev/null +++ b/proxy/logger.go @@ -0,0 +1,7 @@ +package proxy + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("logger", "proxy") diff --git a/proxy.go b/proxy/proxy.go similarity index 60% rename from proxy.go rename to proxy/proxy.go index 317ee70..c65f99f 100644 --- a/proxy.go +++ b/proxy/proxy.go @@ -1,14 +1,18 @@ -package main +package proxy import ( "bytes" "fmt" "github.com/fasthttp/router" jsoniter "github.com/json-iterator/go" - "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/caching" + "github.com/revolution1/jsonrpc-proxy/oldconfig" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "github.com/revolution1/jsonrpc-proxy/metrics" + "github.com/revolution1/jsonrpc-proxy/upstream" "github.com/savsgio/gotils" "github.com/savsgio/gotils/nocopy" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" "github.com/valyala/fasthttp/pprofhandler" "os" @@ -26,22 +30,22 @@ type Stats struct { type Proxy struct { nocopy.NoCopy - config *Config - CacheManager *CacheManager - um *UpstreamManager + config *oldconfig.Config + CacheManager *caching.BigCacheManager + um *upstream.Manager httpServer *fasthttp.Server stats Stats initOnce sync.Once } -func NewProxy(config *Config) *Proxy { +func NewProxy(config *oldconfig.Config) *Proxy { return &Proxy{config: config} } func (p *Proxy) init() 
{ - p.um = NewUpstreamManager(p.config.Upstreams) - p.CacheManager = NewCacheManager() + p.um = upstream.NewUpstreamManager(p.config.Upstreams) + p.CacheManager = caching.NewCacheManager() p.httpServer = &fasthttp.Server{ Name: "JSON-RPC Proxy Server", Handler: fasthttp.CompressHandler(p.requestHandler), @@ -51,7 +55,7 @@ func (p *Proxy) init() { Concurrency: 0, DisableKeepalive: false, ReduceMemoryUsage: false, - Logger: log.StandardLogger(), + Logger: log, } } @@ -65,7 +69,7 @@ func (p *Proxy) RegisterHandler(r *router.Router) { func (p *Proxy) Serve() error { r := router.New() - if log.GetLevel() == log.DebugLevel { + if log.Level == logrus.DebugLevel { r.GET("/debug", pprofhandler.PprofHandler) } ch := make(chan error) @@ -87,48 +91,48 @@ func (p *Proxy) Serve() error { } func (p *Proxy) requestHandler(ctx *fasthttp.RequestCtx) { - var resps []RpcResponse + var resps []jsonrpc.RpcResponse ctx.SetUserValue("isRpcReq", true) reqBody := bytes.TrimSpace(ctx.Request.Body()) // length of minimum valid request '{"jsonrpc":"2.0","method":"1","id":1}' if len(reqBody) < 37 { - writeRpcErrResp(ctx, ErrRpcParseError, nil) + WriteRpcErrResp(ctx, jsonrpc.ErrRpcParseError, nil) return } - reqs, rpcErr := ParseRequest(reqBody) + reqs, rpcErr := jsonrpc.ParseRequest(reqBody) if rpcErr != nil { - writeRpcErrResp(ctx, rpcErr, nil) + WriteRpcErrResp(ctx, rpcErr, nil) return } isMonoReq := reqBody[0] == '{' // && len(reqs) == 1 if len(reqs) == 0 { - writeRpcErrResp(ctx, ErrRpcInvalidRequest, nil) + WriteRpcErrResp(ctx, jsonrpc.ErrRpcInvalidRequest, nil) return } methodNames := make([]string, len(reqs)) for i, r := range reqs { methodNames[i] = r.Method } - setCtxRpcMethods(ctx, methodNames) + SetCtxRpcMethods(ctx, methodNames) cacheFor := time.Duration(0) errFor := p.config.ErrFor.Duration allCached := true - resps = make([]RpcResponse, len(reqs)) + resps = make([]jsonrpc.RpcResponse, len(reqs)) for idx, req := range reqs { if !req.Validate() { if isMonoReq { - writeRpcErrResp(ctx, ErrRpcInvalidRequest, req.Id) + WriteRpcErrResp(ctx, jsonrpc.ErrRpcInvalidRequest, req.Id) return } // if request is invalid, just set error - ErrRpcInvalidRequest.WriteToRpcResponse(&resps[idx], req.Id) + jsonrpc.ErrRpcInvalidRequest.WriteToRpcResponse(&resps[idx], req.Id) continue } // skip cache if is valid req&upResp but no cache config set cc := p.config.Search(req.Method) if cc == nil { allCached = false - RpcCacheMiss.WithLabelValues(req.Method).Inc() + metrics.RpcCacheMiss.WithLabelValues(req.Method).Inc() break } // use the minimum non-zero cache duration @@ -140,26 +144,26 @@ func (p *Proxy) requestHandler(ctx *fasthttp.RequestCtx) { } res := p.GetCachedItem(req, cc) if res == nil { - RpcCacheMiss.WithLabelValues(req.Method).Inc() + metrics.RpcCacheMiss.WithLabelValues(req.Method).Inc() allCached = false break } // found cached - RpcCacheHit.WithLabelValues(req.Method).Inc() + metrics.RpcCacheHit.WithLabelValues(req.Method).Inc() if res.IsHttpResponse() && isMonoReq { // cached http error or something res.WriteHttpResponse(&ctx.Response) return } else if res.IsRpc() { if isMonoReq { resp := res.GetRpcResponse(req.Id) - writeJsonResp(ctx, resp) + WriteJsonResp(ctx, resp) return } res.WriteToRpcResponse(&resps[idx], req.Id) } } if allCached { - writeJsonResps(ctx, resps) + WriteJsonResps(ctx, resps) //for _, r := range resps { // ReleaseRpcResponse(r) //} @@ -169,26 +173,26 @@ func (p *Proxy) requestHandler(ctx *fasthttp.RequestCtx) { // cache not found upResp := fasthttp.AcquireResponse() defer 
fasthttp.ReleaseResponse(upResp) - setAcceptEncoding(ctx) + SetAcceptEncoding(ctx) err := p.um.DoTimeout(&ctx.Request, upResp, p.config.UpstreamRequestTimeout.Duration) // network errors if err != nil { log.WithError(err).WithField("methods", methodNames).Warn("error while requesting from upstream") log.WithError(err).Tracef("error while requesting from upstream: \n%s", &ctx.Request) - e := ErrWithData(ErrRpcInternalError, err.Error()) + e := jsonrpc.ErrWithData(jsonrpc.ErrRpcInternalError, err.Error()) for idx, req := range reqs { p.SetCachedError(req, e, errFor) if isMonoReq { - writeRpcErrResp(ctx, e, req.Id) + WriteRpcErrResp(ctx, e, req.Id) return } e.WriteToRpcResponse(&resps[idx], req.Id) } - writeJsonResps(ctx, resps) - ctx.SetStatusCode(StatusCodeOfRpcError(e)) + WriteJsonResps(ctx, resps) + ctx.SetStatusCode(jsonrpc.StatusCodeOfRpcError(e)) return } - upRespBody, err := getResponseBody(upResp) + upRespBody, err := GetResponseBody(upResp) // read body or decompression errors if upResp.StatusCode() < 200 || upResp.StatusCode() >= 400 || err != nil { log.WithError(err).WithField("methods", methodNames).Warn("fail to decode response, simply forward to client") @@ -227,7 +231,7 @@ func (p *Proxy) requestHandler(ctx *fasthttp.RequestCtx) { for idx, resp := range resps { // jsonrpc errors if resp.Error != nil { - if !resp.Error.Is(ErrRpcInvalidRequest) { + if !resp.Error.Is(jsonrpc.ErrRpcInvalidRequest) { log.WithField("rpcErr", resp.Error).Tracef("rpc error while requesting from upstream: \n%s\n", reqs[idx]) p.SetCachedError(reqs[idx], resp.Error, errFor) } @@ -238,29 +242,29 @@ func (p *Proxy) requestHandler(ctx *fasthttp.RequestCtx) { } if isMonoReq { - writeJsonResp(ctx, &resps[0]) + WriteJsonResp(ctx, &resps[0]) } else { - writeJsonResps(ctx, resps) + WriteJsonResps(ctx, resps) } } -func (p *Proxy) SetCachedHttpError(req *RpcRequest, code int, message []byte, errFor time.Duration) { +func (p *Proxy) SetCachedHttpError(req *jsonrpc.RpcRequest, code int, message []byte, errFor time.Duration) { key, err := req.ToCacheKey() if err != nil { return } - err = p.CacheManager.Set(key, (&CachedItem{HttpResponse: &CachedHttpResp{Code: code, Body: message}}).Marshal(), errFor) + err = p.CacheManager.Set(key, (&caching.CachedItem{HttpResponse: &caching.CachedHttpResp{Code: code, Body: message}}).Marshal(), errFor) if err != nil { log.WithError(err).Error("error while setting cached HTTP error") } } -func (p *Proxy) SetCachedResponse(req *RpcRequest, resp *fasthttp.Response, errFor time.Duration) { +func (p *Proxy) SetCachedResponse(req *jsonrpc.RpcRequest, resp *fasthttp.Response, errFor time.Duration) { key, err := req.ToCacheKey() if err != nil { return } - err = p.CacheManager.Set(key, (&CachedItem{HttpResponse: &CachedHttpResp{ + err = p.CacheManager.Set(key, (&caching.CachedItem{HttpResponse: &caching.CachedHttpResp{ Code: resp.StatusCode(), ContentEncoding: resp.Header.Peek(fasthttp.HeaderContentEncoding), ContentType: resp.Header.ContentType(), @@ -271,17 +275,17 @@ func (p *Proxy) SetCachedResponse(req *RpcRequest, resp *fasthttp.Response, errF } } -func (p *Proxy) SetCachedError(req *RpcRequest, e *RpcError, errFor time.Duration) { +func (p *Proxy) SetCachedError(req *jsonrpc.RpcRequest, e *jsonrpc.RpcError, errFor time.Duration) { key, err := req.ToCacheKey() if err != nil { return } - err = p.CacheManager.Set(key, (&CachedItem{RpcError: e}).Marshal(), errFor) + err = p.CacheManager.Set(key, (&caching.CachedItem{RpcError: e}).Marshal(), errFor) if err != nil { 
log.WithError(err).Error("error while setting cached error") } } -func (p *Proxy) SetCachedRpcResponse(req *RpcRequest, resp *RpcResponse, cacheFor time.Duration) { +func (p *Proxy) SetCachedRpcResponse(req *jsonrpc.RpcRequest, resp *jsonrpc.RpcResponse, cacheFor time.Duration) { key, err := req.ToCacheKey() if err != nil { return @@ -290,13 +294,13 @@ func (p *Proxy) SetCachedRpcResponse(req *RpcRequest, resp *RpcResponse, cacheFo if err != nil { log.WithError(err).Error("error while serializing cached response") } - err = p.CacheManager.Set(key, (&CachedItem{Result: data}).Marshal(), cacheFor) + err = p.CacheManager.Set(key, (&caching.CachedItem{Result: data}).Marshal(), cacheFor) if err != nil { log.WithError(err).Error("error while setting cached response") } } -func (p *Proxy) GetCachedItem(req *RpcRequest, cc *CacheConfig) *CachedItem { +func (p *Proxy) GetCachedItem(req *jsonrpc.RpcRequest, cc *oldconfig.CacheConfig) *caching.CachedItem { dur := time.Duration(0) key, err := req.ToCacheKey() if err != nil { @@ -326,99 +330,3 @@ func (p *Proxy) forwardResponse(ctx *fasthttp.RequestCtx, response *fasthttp.Res r.Header.SetBytesV(fasthttp.HeaderContentEncoding, response.Header.Peek(fasthttp.HeaderContentEncoding)) _ = response.BodyWriteTo(ctx) } - -func writeRpcErrResp(ctx *fasthttp.RequestCtx, rpcError *RpcError, id interface{}) { - ctx.ResetBody() - ctx.SetUserValue("rpcErr", rpcError) - ctx.SetBodyString(rpcError.JsonError(id)) - ctx.SetStatusCode(StatusCodeOfRpcError(rpcError)) - ctx.SetContentType("application/json; charset=utf-8") -} - -func getCtxRpcErr(ctx *fasthttp.RequestCtx) *RpcError { - if e, ok := ctx.UserValue("rpcErr").(*RpcError); ok { - return e - } - return nil -} -func getCtxRpcMethods(ctx *fasthttp.RequestCtx) []string { - if m, ok := ctx.UserValue("rpcMethods").([]string); ok { - return m - } - return nil -} - -func setCtxRpcMethods(ctx *fasthttp.RequestCtx, methodNames []string) { - ctx.SetUserValue("rpcMethods", methodNames) -} - -func writeJsonResp(ctx *fasthttp.RequestCtx, resp *RpcResponse) { - data, err := jsoniter.Marshal(resp) - if err != nil { - log.WithError(err).Panic("fail to marshal response from cache") - } - writeJsonRespRaw(ctx, data, StatusCodeOfRpcError(resp.Error)) - if resp.Error != nil { - ctx.SetUserValue("rpcErr", resp.Error) - } -} - -func writeJsonResps(ctx *fasthttp.RequestCtx, resps []RpcResponse) { - data, err := jsoniter.Marshal(resps) - if err != nil { - log.WithError(err).Panic("fail to marshal response from cache") - } - status := 500 - for _, r := range resps { - c := StatusCodeOfRpcError(r.Error) - if c < status { - status = c - } - } - writeJsonRespRaw(ctx, data, status) -} - -func writeJsonRespRaw(ctx *fasthttp.RequestCtx, body []byte, code int) { - ctx.Response.SetBody(body) - if ctx.Request.Header.ConnectionClose() { - ctx.Response.SetConnectionClose() - } - ctx.SetContentType("application/json; charset=utf-8") - ctx.SetStatusCode(code) -} - -const ( - strGzip = "gzip" - strBr = "br" - strDeflate = "deflate" -) - -var ErrUnknownContentEncoding = errors.New("Unknown Content Encoding") - -func getResponseBody(resp *fasthttp.Response) ([]byte, error) { - encoding := string(bytes.TrimSpace(resp.Header.Peek(fasthttp.HeaderContentEncoding))) - switch encoding { - case strGzip: - return resp.BodyGunzip() - case strBr: - return resp.BodyUnbrotli() - case strDeflate: - return resp.BodyInflate() - default: - body := resp.Body() - //if !isASCII(body) { - // // give it a try - // b, err := resp.BodyGunzip() - // if err != nil { - // return 
body, nil - // } - // return b, nil - //} - return body, nil - } - //return resp.Body(), errors.Wrapf(ErrUnknownContentEncoding, "encoding: '%s'", encoding) -} - -func setAcceptEncoding(ctx *fasthttp.RequestCtx) { - ctx.Request.Header.Del(fasthttp.HeaderAcceptEncoding) -} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go new file mode 100644 index 0000000..0ccf992 --- /dev/null +++ b/proxy/proxy_test.go @@ -0,0 +1,11 @@ +package proxy + +import ( + jsoniter "github.com/json-iterator/go" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "testing" +) + +func TestProxy(t *testing.T) { + t.Log(jsoniter.MarshalToString(&jsonrpc.RpcRequest{RpcHeader: jsonrpc.RpcHeader{Jsonrpc: jsonrpc.JSONRPC2, Id: 1}, Method: "", Params: jsoniter.RawMessage(`{"a":"b"}`)})) +} diff --git a/proxy/tools.go b/proxy/tools.go new file mode 100644 index 0000000..60b8bd3 --- /dev/null +++ b/proxy/tools.go @@ -0,0 +1,105 @@ +package proxy + +import ( + "bytes" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/jsonrpc" + "github.com/valyala/fasthttp" +) + +func WriteRpcErrResp(ctx *fasthttp.RequestCtx, rpcError *jsonrpc.RpcError, id interface{}) { + ctx.ResetBody() + ctx.SetUserValue("rpcErr", rpcError) + ctx.SetBodyString(rpcError.JsonError(id)) + ctx.SetStatusCode(jsonrpc.StatusCodeOfRpcError(rpcError)) + ctx.SetContentType("application/json; charset=utf-8") +} + +func GetCtxRpcErr(ctx *fasthttp.RequestCtx) *jsonrpc.RpcError { + if e, ok := ctx.UserValue("rpcErr").(*jsonrpc.RpcError); ok { + return e + } + return nil +} +func GetCtxRpcMethods(ctx *fasthttp.RequestCtx) []string { + if m, ok := ctx.UserValue("rpcMethods").([]string); ok { + return m + } + return nil +} + +func SetCtxRpcMethods(ctx *fasthttp.RequestCtx, methodNames []string) { + ctx.SetUserValue("rpcMethods", methodNames) +} + +func WriteJsonResp(ctx *fasthttp.RequestCtx, resp *jsonrpc.RpcResponse) { + data, err := jsoniter.Marshal(resp) + if err != nil { + log.WithError(err).Panic("fail to marshal response from cache") + } + WriteJsonRespRaw(ctx, data, jsonrpc.StatusCodeOfRpcError(resp.Error)) + if resp.Error != nil { + ctx.SetUserValue("rpcErr", resp.Error) + } +} + +func WriteJsonResps(ctx *fasthttp.RequestCtx, resps []jsonrpc.RpcResponse) { + data, err := jsoniter.Marshal(resps) + if err != nil { + log.WithError(err).Panic("fail to marshal response from cache") + } + status := 500 + for _, r := range resps { + c := jsonrpc.StatusCodeOfRpcError(r.Error) + if c < status { + status = c + } + } + WriteJsonRespRaw(ctx, data, status) +} + +func WriteJsonRespRaw(ctx *fasthttp.RequestCtx, body []byte, code int) { + ctx.Response.SetBody(body) + if ctx.Request.Header.ConnectionClose() { + ctx.Response.SetConnectionClose() + } + ctx.SetContentType("application/json; charset=utf-8") + ctx.SetStatusCode(code) +} + +const ( + strGzip = "gzip" + strBr = "br" + strDeflate = "deflate" +) + +var ErrUnknownContentEncoding = errors.New("Unknown Content Encoding") + +func GetResponseBody(resp *fasthttp.Response) ([]byte, error) { + encoding := string(bytes.TrimSpace(resp.Header.Peek(fasthttp.HeaderContentEncoding))) + switch encoding { + case strGzip: + return resp.BodyGunzip() + case strBr: + return resp.BodyUnbrotli() + case strDeflate: + return resp.BodyInflate() + default: + body := resp.Body() + //if !isASCII(body) { + // // give it a try + // b, err := resp.BodyGunzip() + // if err != nil { + // return body, nil + // } + // return b, nil + //} + return body, nil + } + //return resp.Body(), 
errors.Wrapf(ErrUnknownContentEncoding, "encoding: '%s'", encoding) +} + +func SetAcceptEncoding(ctx *fasthttp.RequestCtx) { + ctx.Request.Header.Del(fasthttp.HeaderAcceptEncoding) +} diff --git a/proxy_test.go b/proxy_test.go deleted file mode 100644 index 4f16874..0000000 --- a/proxy_test.go +++ /dev/null @@ -1,10 +0,0 @@ -package main - -import ( - jsoniter "github.com/json-iterator/go" - "testing" -) - -func TestProxy(t *testing.T) { - t.Log(jsoniter.MarshalToString(&RpcRequest{rpcHeader: rpcHeader{Jsonrpc: JSONRPC2, Id: 1}, Method: "", Params: jsoniter.RawMessage(`{"a":"b"}`)})) -} diff --git a/proxyctx/context.go b/proxyctx/context.go new file mode 100644 index 0000000..3a59028 --- /dev/null +++ b/proxyctx/context.go @@ -0,0 +1,40 @@ +package proxyctx + +import ( + "github.com/pkg/errors" + "github.com/valyala/fasthttp" +) + +type Context interface { + Type() string + Parent() Context +} + +var ( + //Return returning this error will skip following plugins + Return = errors.New("return to upper level context") +) + +type GeneralError struct { + Name string + Code int + Message string + Body []byte + Headers map[string]string +} + +func (g GeneralError) Error() string { + return g.Name + ": " + g.Message +} + +func (g *GeneralError) WriteHttpCtx(ctx *fasthttp.RequestCtx) { + ctx.Error(g.Error(), g.Code) + if g.Headers != nil { + for k, v := range g.Headers { + ctx.Response.Header.Set(k, v) + } + } + if g.Body != nil { + ctx.SetBody(g.Body) + } +} diff --git a/proxyctx/http.go b/proxyctx/http.go new file mode 100644 index 0000000..d9d69f9 --- /dev/null +++ b/proxyctx/http.go @@ -0,0 +1,22 @@ +package proxyctx + +import "github.com/valyala/fasthttp" + +const ContextHTTP = "http" + +type HTTPContext struct { + parent Context + Ctx *fasthttp.RequestCtx +} + +func NewHTTPContext(parent Context, ctx *fasthttp.RequestCtx) *HTTPContext { + return &HTTPContext{Ctx: ctx, parent: parent} +} + +func (h *HTTPContext) Parent() Context { + return h.parent +} + +func (h *HTTPContext) Type() string { + return ContextHTTP +} diff --git a/proxyctx/jsonrpc.go b/proxyctx/jsonrpc.go new file mode 100644 index 0000000..c0b3209 --- /dev/null +++ b/proxyctx/jsonrpc.go @@ -0,0 +1,29 @@ +package proxyctx + +import ( + "github.com/revolution1/jsonrpc-proxy/jsonrpc" +) + +const ContextJSONRPC = "jsonrpc" + +type JSONRPCContext struct { + parent Context + + FromIP string + FromPath string + IsMono bool + Requests []*jsonrpc.RpcRequest + Responses []*jsonrpc.RpcResponse +} + +func NewJSONRPCContext(parent Context, fromIP string, fromPath string, requests []*jsonrpc.RpcRequest) *JSONRPCContext { + return &JSONRPCContext{parent: parent, FromIP: fromIP, FromPath: fromPath, Requests: requests} +} + +func (j *JSONRPCContext) Parent() Context { + return j.parent +} + +func (J JSONRPCContext) Type() string { + return ContextJSONRPC +} diff --git a/server/http.go b/server/http.go new file mode 100644 index 0000000..6396a76 --- /dev/null +++ b/server/http.go @@ -0,0 +1,137 @@ +package server + +import ( + "github.com/fasthttp/router" + "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/listener" + "github.com/revolution1/jsonrpc-proxy/plugin" + "github.com/revolution1/jsonrpc-proxy/proxyctx" + "github.com/revolution1/jsonrpc-proxy/types" + "github.com/valyala/fasthttp" + "golang.org/x/sync/errgroup" + "net" +) + +type HTTPConfig struct { + ID ID `json:"iD"` + Type Type `json:"type"` + Listeners []types.ListenerID `json:"listeners"` + Plugins []*plugin.Config `json:"plugins"` + Routers []*HTTPRouterConfig 
`json:"routers"` +} + +type HTTPServer struct { + server *fasthttp.Server + config *HTTPConfig + listeners []net.Listener + plugins []plugin.Plugin + router *router.Router + httpRouters []*HTTPRouter +} + +func NewHTTPServer(config *HTTPConfig) *HTTPServer { + s := &HTTPServer{config: config} + s.init() + return s +} + +func (h *HTTPServer) init() { + // init listeners + if len(h.config.Listeners) == 0 { + panic(errors.Errorf("empty listener list of %s", h.config.ID)) + } + for _, id := range h.config.Listeners { + l, err := listener.AcquireListener(id) + if err != nil { + panic(errors.Wrapf(err, "server %s failed to acquire listener", h.config.ID)) + } + h.listeners = append(h.listeners, l) + } + // init plugins + for _, pc := range h.config.Plugins { + info, err := plugin.GetInfoOfID(pc.ID) + if err != nil { + panic(err) + } + if !info.CanHandle(proxyctx.ContextHTTP) { + panic(errors.Errorf("plugin %s cannot handle context %s of this server", pc.ID, proxyctx.ContextHTTP)) + } + plug, err := plugin.NewPlugin(pc) + if err != nil { + panic(err) + } + h.plugins = append(h.plugins, plug) + } + // init routers + h.router = router.New() + if len(h.config.Routers) == 0 { + panic(errors.Errorf("empty router list of %s", h.config.ID)) + } + for _, rc := range h.config.Routers { + rt := NewHTTPRouter(rc, h.plugins) + h.router.Handle(rt.Method(), rt.Path(), rt.Handler) + h.httpRouters = append(h.httpRouters, rt) + } + h.server = &fasthttp.Server{ + Name: string(h.config.ID), + Handler: h.Handler, + } +} + +func (h *HTTPServer) Handler(ctx *fasthttp.RequestCtx) { + //defaultHandler := plugin.DefaultTerminator + //for _, plug := range h.plugins { + // defaultHandler = plug.Handler(defaultHandler) + //} + //err := defaultHandler(&proxyctx.HTTPContext{ctx}) + //if err != nil { + // if errors.Is(err, proxyctx.Return) { + // return + // } + // if e, ok := err.(*proxyctx.GeneralError); ok { + // e.WriteHttpCtx(ctx) + // } else { + // ctx.Error(err.Error(), fasthttp.StatusInternalServerError) + // } + // return + //} + h.router.Handler(ctx) +} + +//func (HTTPServer) fastHttpHandler(h plugin.HandleFunc) fasthttp.RequestHandler { +// return func(ctx *fasthttp.RequestCtx) { +// err := h(&proxyctx.HTTPContext{ctx}) +// if err != nil { +// if errors.Is(err, proxyctx.Return) { +// return +// } +// if e, ok := err.(*proxyctx.GeneralError); ok { +// e.WriteHttpCtx(ctx) +// } else { +// ctx.Error(err.Error(), fasthttp.StatusInternalServerError) +// } +// } +// } +//} + +func (h *HTTPServer) Serve() error { + if len(h.listeners) == 1 { + return h.server.Serve(h.listeners[0]) + } + eg := errgroup.Group{} + for _, l := range h.listeners { + ln := l + eg.Go(func() error { return h.server.Serve(ln) }) + } + return eg.Wait() +} + +func (h *HTTPServer) Stop() error { + for _, r := range h.httpRouters { + r.Destroy() + } + for _, p := range h.plugins { + p.Destroy() + } + return h.server.Shutdown() +} diff --git a/server/httprouter.go b/server/httprouter.go new file mode 100644 index 0000000..7357d7f --- /dev/null +++ b/server/httprouter.go @@ -0,0 +1,102 @@ +package server + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/revolution1/jsonrpc-proxy/plugin" + "github.com/revolution1/jsonrpc-proxy/proxyctx" + "github.com/sirupsen/logrus" + "github.com/valyala/fasthttp" +) + +type HTTPRouterConfig struct { + Path string `json:"path"` + Method string `json:"method"` + Plugins []*plugin.Config `json:"plugins"` +} + +type HTTPRouter struct { + config *HTTPRouterConfig + defaultPlugins []plugin.Plugin + plugins 
[]plugin.Plugin
+
+	logger *logrus.Entry
+}
+
+func NewHTTPRouter(config *HTTPRouterConfig, defaultPlugins []plugin.Plugin) *HTTPRouter {
+	r := &HTTPRouter{config: config, defaultPlugins: defaultPlugins}
+	r.init()
+	return r
+}
+
+func (h *HTTPRouter) Name() string {
+	return fmt.Sprintf("%s(%s)", h.Method(), h.Path())
+}
+
+func (h *HTTPRouter) Method() string {
+	if h.config.Method == "" {
+		return fasthttp.MethodGet
+	}
+	return h.config.Method
+}
+
+func (h *HTTPRouter) Path() string {
+	if h.config.Path == "" {
+		return "/"
+	}
+	return h.config.Path
+}
+
+func (h *HTTPRouter) init() {
+	//init plugins
+	h.logger = logrus.WithField("router", h.Name())
+	for _, pc := range h.config.Plugins {
+		info, err := plugin.GetInfoOfID(pc.ID)
+		if err != nil {
+			panic(err)
+		}
+		if !info.CanHandle(proxyctx.ContextHTTP) {
+			panic(errors.Errorf("plugin %s cannot handle context %s of this router", pc.ID, proxyctx.ContextHTTP))
+		}
+		plug, err := plugin.NewPlugin(pc)
+		if err != nil {
+			panic(err)
+		}
+		h.plugins = append(h.plugins, plug)
+	}
+}
+
+func (h *HTTPRouter) Handler(ctx *fasthttp.RequestCtx) {
+	handler := h.handler(plugin.DefaultTerminator)
+	err := handler(proxyctx.NewHTTPContext(nil, ctx))
+	if err != nil {
+		if errors.Is(err, proxyctx.Return) {
+			return
+		}
+		if e, ok := err.(*proxyctx.GeneralError); ok {
+			e.WriteHttpCtx(ctx)
+		} else {
+			ctx.Error(err.Error(), fasthttp.StatusInternalServerError)
+		}
+		return
+	}
+}
+
+func (h *HTTPRouter) handler(next plugin.HandleFunc) plugin.HandleFunc {
+	f := next
+	for i := len(h.plugins) - 1; i >= 0; i-- {
+		f = h.plugins[i].Handler(f)
+	}
+	for i := len(h.defaultPlugins) - 1; i >= 0; i-- {
+		f = h.defaultPlugins[i].Handler(f)
+	}
+	return func(ctx proxyctx.Context) (err error) {
+		h.logger.Tracef("start handling in router %s", h.Name())
+		err = f(ctx)
+		h.logger.Tracef("done handling in router %s", h.Name())
+		return
+	}
+}
+
+func (h *HTTPRouter) Destroy() {
+	for _, p := range h.plugins {
+		p.Destroy()
+	}
+}
diff --git a/server/server.go b/server/server.go
new file mode 100644
index 0000000..6317a84
--- /dev/null
+++ b/server/server.go
@@ -0,0 +1,27 @@
+package server
+
+type ID string
+type Type string
+
+type Config map[string]interface{}
+
+func (c Config) id() ID {
+	n, ok := c["id"]
+	if !ok {
+		panic("missing required server config field 'id'")
+	}
+	return ID(n.(string))
+}
+
+func (c Config) typ() Type {
+	n, ok := c["type"]
+	if !ok {
+		panic("missing required server config field 'type'")
+	}
+	return Type(n.(string))
+}
+
+type Server interface {
+	Serve() error
+	Stop() error
+}
\ No newline at end of file
diff --git a/statistic/logger.go b/statistic/logger.go
new file mode 100644
index 0000000..8d87707
--- /dev/null
+++ b/statistic/logger.go
@@ -0,0 +1,7 @@
+package statistic
+
+import (
+	"github.com/sirupsen/logrus"
+)
+
+var log = logrus.WithField("logger", "statistic")
diff --git a/statistic/model.go b/statistic/model.go
new file mode 100644
index 0000000..951bf3b
--- /dev/null
+++ b/statistic/model.go
@@ -0,0 +1,17 @@
+package statistic
+
+import (
+	"gorm.io/gorm"
+	"time"
+)
+
+type RequestStat struct {
+	gorm.Model
+	Address   string
+	Method    string
+	Params    string
+	Hit       bool
+	Useragent string
+	Duration  time.Duration
+	Count     uint64
+}
diff --git a/statistic/stat.go b/statistic/stat.go
new file mode 100644
index 0000000..f0ec22d
--- /dev/null
+++ b/statistic/stat.go
@@ -0,0 +1,17 @@
+package statistic
+
+const (
+	statBufSize = 64 * 1024
+)
+
+type stat struct {
+	address   string
+	method    string
+	params    interface{}
+	useragent string
+}
+
+type StatCollector struct {
+	ch   chan *stat
+ stop chan struct{} +} diff --git a/types/configv1.go b/types/configv1.go new file mode 100644 index 0000000..175c7b4 --- /dev/null +++ b/types/configv1.go @@ -0,0 +1,20 @@ +package types + +type Config struct { + Version string `json:"version"` + Listeners []*ListenerConfig `json:"listeners"` + Upstreams []*UpstreamConfig `json:"upstreams"` + Servers []*RawServerConfig `json:"servers"` + Processors []*RawProcessorConfig `json:"processors"` +} + +type RawProcessorConfig struct { + ID string `json:"ID"` + Description string `json:"description"` + Context string `json:"context"` + PluginConfigs []RawPluginConfig `json:"plugins"` +} + +type RawPluginConfig map[string]interface{} + +type RawServerConfig map[string]interface{} diff --git a/types/configv1_test.go b/types/configv1_test.go new file mode 100644 index 0000000..692d5d9 --- /dev/null +++ b/types/configv1_test.go @@ -0,0 +1,154 @@ +package types + +import ( + assertion "github.com/stretchr/testify/assert" + "sigs.k8s.io/yaml" + "testing" +) + +func TestLoadConfig(t *testing.T) { + assert := assertion.New(t) + conf := &Config{} + err := yaml.Unmarshal([]byte(v1Example), conf) + assert.NoError(err) +} + +const v1Example = ` +version: 1.0 + +listeners: + - id: http + address: 0.0.0.0:4201 + - id: tcp + address: 0.0.0.0:4203 + - id: manage + address: 0.0.0.0:8088 + +upstreams: + - name: seedpub + k8s_service: + port: 4201 + namespace: default + name: xxx + - name: mainnet + endpoints: + - https://api.zilliqa.com + + +servers: + - name: zilliqa-rpc + type: http + listeners: + - http + plugins: + - id: cors + routers: + - path: / + plugins: + - id: logging + format: text # text | json + verbose: 1 # 1 2 3 + stream: stdout + - id: zilliqa-api + + - path: /ws + plugins: + - id: websocket + - id: websocket-to-simple-jsonrpc + - id: logging + format: text # text | json + verbose: 1 # 1 2 3 + stream: stdout + - id: zilliqa-api + + # https://pkg.go.dev/github.com/fasthttp/router + - path: /manage/{path:*} + plugins: + - id: basic-auth + username: admin + password: admin + - id: manage + + - name: zilliqa-tcp + type: tcp + listeners: + - tcp + KeepAlive: true + Delimiter: '\n' + plugins: + - id: tcp-to-simple-jsonrpc + - id: zilliqa-api + + +# user-defined plugin +processors: + - id: zilliqa-api + description: proxy jsonrpc request to zilliqa server and cache the responses + context: jsonrpc + plugins: + - id: jsonrpc + supportBatch: true + keepAlive: true + forward: + requestTimeout: 10s + supportBatch: true + keepAlive: true + upstreams: + - seedpub + defaultResponse: + code: 503 + content: "Service Unavaliable" + headers: null + - id: cache + maxSize: 256Mb + errFor: 1s + groups: + - methods: + # method that has no + - GetBlockchainInfo + - GetCurrentDSEpoch + - GetCurrentMiniEpoch + - GetDSBlockRate + - GetLatestDsBlock + - GetLatestTxBlock + - GetNumDSBlocks + - GetNumTransactions + - GetNumTxBlocks + - GetPrevDifficulty + - GetPrevDSDifficulty + - GetTotalCoinSupply + - GetTransactionRate + - GetTxBlockRate + - GetMinimumGasPrice + - GetNumTxnsDSEpoch + - GetNumTxnsTxEpoch + - GetPendingTxn + - GetPendingTxns + - GetRecentTransactions + # method that has param + - DSBlockListing + - TxBlockListing + - GetDsBlock + - GetTxBlock + - GetTransaction + - GetTransactionsForTxBlock + - GetTxnBodiesForTxBlock + - GetSmartContractInit + - GetSmartContracts + - GetSmartContractState + - GetSmartContractSubState + - GetBalance + for: 5s + errFor: 1s + # long term + - methods: + - GetContractAddressFromTransactionID + - GetSmartContractCode + for: 1h + 
errFor: 1s + # Permanent + - methods: + - GetNetworkId + for: 1h + errFor: 1s +` diff --git a/duration.go b/types/duration.go similarity index 96% rename from duration.go rename to types/duration.go index ac4340c..e12d706 100644 --- a/duration.go +++ b/types/duration.go @@ -1,4 +1,4 @@ -package main +package types import ( "encoding/json" @@ -20,7 +20,6 @@ func (d *Duration) UnmarshalJSON(b []byte) (err error) { var id int64 id, err = json.Number(b).Int64() d.Duration = time.Duration(id) - return } diff --git a/types/listener.go b/types/listener.go new file mode 100644 index 0000000..310f3cd --- /dev/null +++ b/types/listener.go @@ -0,0 +1,31 @@ +package types + +import ( + "fmt" + "net" + "strings" +) + +type ListenerID string + +type ListenerConfig struct { + ID ListenerID `json:"id"` + Address string `json:"address"` + Scheme string `json:"scheme"` + TLS *struct { + Enabled bool `json:"enabled"` + Cert string `json:"cert"` + Key string `json:"key"` + CommonName string `json:"commonName"` + } `json:"tls"` +} + +func (c ListenerConfig) Listen() (net.Listener, error) { + var network = strings.ToLower(c.Scheme) + switch network { + case "tcp", "unix": + return net.Listen(c.Scheme, c.Address) + default: + panic(fmt.Sprintf("listener scheme %s not supported", network)) + } +} diff --git a/types/size.go b/types/size.go new file mode 100644 index 0000000..cb12191 --- /dev/null +++ b/types/size.go @@ -0,0 +1,26 @@ +package types + +import ( + "encoding/json" + "fmt" + "github.com/dustin/go-humanize" +) + +type Size uint64 + +func (s *Size) UnmarshalJSON(b []byte) (err error) { + if b[0] == '"' { + var nBytes uint64 + sd := string(b[1 : len(b)-1]) + nBytes, err = humanize.ParseBytes(sd) + *s = Size(int(nBytes)) + return + } + nBytes, err := json.Number(b).Int64() + *s = Size(nBytes) + return +} + +func (s *Size) MarshalJSON() (b []byte, err error) { + return []byte(fmt.Sprintf(`"%s"`, humanize.IBytes(uint64(*s)))), nil +} diff --git a/types/size_test.go b/types/size_test.go new file mode 100644 index 0000000..914c952 --- /dev/null +++ b/types/size_test.go @@ -0,0 +1,32 @@ +package types + +import ( + "encoding/json" + assertion "github.com/stretchr/testify/assert" + "testing" +) + +func TestSize(t *testing.T) { + assert := assertion.New(t) + cases := []struct { + data string + bytes int + }{ + {`"1"`, 1}, + {`1`, 1}, + {`"1b"`, 1}, + {`"1kb"`, 1000}, + {`"1kib"`, 1024}, + } + + for _, c := range cases{ + var s Size + err := json.Unmarshal([]byte(c.data), &s) + assert.NoError(err) + assert.Equal(Size(c.bytes), s) + } + s := Size(1024) + data, err := json.Marshal(&s) + assert.NoError(err) + assert.Equal(`"1.0 KiB"`, string(data)) +} diff --git a/types/upstream.go b/types/upstream.go new file mode 100644 index 0000000..99aacc1 --- /dev/null +++ b/types/upstream.go @@ -0,0 +1,49 @@ +package types + +import ( + "net/url" +) + +type UpstreamID string +type UpstreamType int +type Endpoint url.URL + +const ( + TCPUpstream UpstreamType = iota + HTTPUpstream +) + +type UpstreamConfig struct { + ID UpstreamID `json:"id"` + Type UpstreamType `json:"type"` + Endpoints []*Endpoint `json:"endpoints"` + ServiceDiscovery *ServiceDiscovery `json:"serviceDiscovery"` +} + +type K8sServiceDiscover struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + Port int `json:"port"` +} + +type DNSDiscover struct { + Name string `json:"name"` + EnableIPv6 bool `json:"enableIPv6"` +} + +type ServiceDiscovery struct { + K8sService *K8sServiceDiscover `json:"k8sService"` + DNSDiscover *DNSDiscover 
`json:"dnsRecord"` +} + +//type HealthCheck struct { +// Interval Duration +// Timeout Duration +// HTTPGet struct { +// Port int +// Path string +// Codes []int +// } +// TCPSocket struct { +// } +//} diff --git a/types/upstreamtypes.go b/types/upstreamtypes.go new file mode 100644 index 0000000..2a1cb05 --- /dev/null +++ b/types/upstreamtypes.go @@ -0,0 +1,63 @@ +package types + +import ( + "fmt" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "net/url" + "strings" +) + +func (u *UpstreamType) UnmarshalJSON(bytes []byte) error { + var s string + if err := jsoniter.Unmarshal(bytes, &s); err != nil { + return err + } + switch strings.ToLower(s) { + case "http": + *u = HTTPUpstream + case "tcp": + *u = TCPUpstream + default: + return errors.Errorf("unknown upstreamType %v", s) + } + return nil +} + +func (u UpstreamType) MarshalJSON() ([]byte, error) { + switch u { + case HTTPUpstream: + return []byte(`"HTTP"`), nil + case TCPUpstream: + return []byte(`"TCP"`), nil + default: + return nil, errors.New("unknown UpstreamType") + } +} + +func (e *Endpoint) Parse(u string) error { + if u, err := url.Parse(u); err != nil { + return err + } else { + *e = Endpoint(*u) + } + return nil +} + +func (e *Endpoint) UnmarshalJSON(bytes []byte) error { + var s string + if err := jsoniter.Unmarshal(bytes, &s); err != nil { + return err + } + if u, err := url.Parse(s); err != nil { + return err + } else { + *e = Endpoint(*u) + } + return nil +} + +func (e *Endpoint) MarshalJSON() ([]byte, error) { + u := url.URL(*e) + return []byte(fmt.Sprintf(`"%s"`, u.String())), nil +} diff --git a/types/upstreamtypes_test.go b/types/upstreamtypes_test.go new file mode 100644 index 0000000..d09177b --- /dev/null +++ b/types/upstreamtypes_test.go @@ -0,0 +1,39 @@ +package types + +import ( + "encoding/json" + assertion "github.com/stretchr/testify/assert" + "testing" +) + +func TestUpstreamType(t *testing.T) { + assert := assertion.New(t) + u := UpstreamType(0) + assert.NoError(json.Unmarshal([]byte(`"http"`), &u)) + assert.Equal(HTTPUpstream, u) + + assert.Error(json.Unmarshal([]byte(`1`), &u)) + assert.Error(json.Unmarshal([]byte(`"xxx"`), &u)) + + u = TCPUpstream + d, err := json.Marshal(u) + assert.NoError(err) + assert.Equal(`"TCP"`, string(d)) +} + +func TestEndpoint(t *testing.T) { + assert := assertion.New(t) + data := `"https://admin@api.zilliqa.com"` + e := new(Endpoint) + assert.NoError(json.Unmarshal([]byte(data), e)) + assert.Equal("https", e.Scheme) + assert.Equal("api.zilliqa.com", e.Host) + assert.Equal("admin", e.User.Username()) + m, err := json.Marshal(e) + assert.NoError(err) + assert.Equal(data, string(m)) + assert.NoError(e.Parse("http://dev/")) + assert.Equal("http", e.Scheme) + assert.Equal("dev", e.Host) + assert.Equal("/", e.Path) +} diff --git a/upstream/logger.go b/upstream/logger.go new file mode 100644 index 0000000..045a6ca --- /dev/null +++ b/upstream/logger.go @@ -0,0 +1,7 @@ +package upstream + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("logger", "upstream") diff --git a/upstream.go b/upstream/upstream.go similarity index 83% rename from upstream.go rename to upstream/upstream.go index 173dbae..e69ed62 100644 --- a/upstream.go +++ b/upstream/upstream.go @@ -1,4 +1,4 @@ -package main +package upstream import ( "github.com/savsgio/gotils/nocopy" @@ -35,7 +35,7 @@ const ( // It is forbidden copying UpstreamManager instances. Create new instances instead. // // It is safe calling UpstreamManager methods from concurrently running goroutines. 
-type UpstreamManager struct { +type Manager struct { noCopy nocopy.NoCopy //nolint:unused,structcheck // HealthCheck is a callback called after each request. // @@ -66,8 +66,8 @@ type UpstreamManager struct { once sync.Once } -func NewUpstreamManager(upstreams []string) *UpstreamManager { - um := &UpstreamManager{MaxAttempts: DefaultMaxAttempts} +func NewUpstreamManager(upstreams []string) *Manager { + um := &Manager{MaxAttempts: DefaultMaxAttempts} if len(upstreams) == 0 { panic("upstreams of UpstreamManager cannot be empty") } @@ -93,17 +93,17 @@ func defaultHealthChecker(req *fasthttp.Request, resp *fasthttp.Response, err er const DefaultLBClientTimeout = time.Second // DoDeadline calls DoDeadline on the least loaded client -func (um *UpstreamManager) DoDeadline(req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error { +func (um *Manager) DoDeadline(req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error { return retry(req, resp, deadline, um.get().DoDeadline, um.maxAttempts) } // DoTimeout calculates deadline and calls DoDeadline on the least loaded client -func (um *UpstreamManager) DoTimeout(req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error { +func (um *Manager) DoTimeout(req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error { deadline := time.Now().Add(timeout) return retry(req, resp, deadline, um.get().DoDeadline, um.maxAttempts) } -func (um *UpstreamManager) setMaxAttempts() { +func (um *Manager) setMaxAttempts() { if um.MaxAttempts > len(um.upstreams) { um.maxAttempts = len(um.upstreams) } @@ -129,7 +129,7 @@ func retry(req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time, f // Do calls calculates deadline using UpstreamManager.Timeout and calls DoDeadline // on the least loaded client. 
-func (um *UpstreamManager) Do(req *fasthttp.Request, resp *fasthttp.Response) error { +func (um *Manager) Do(req *fasthttp.Request, resp *fasthttp.Response) error { timeout := um.Timeout if timeout <= 0 { timeout = DefaultLBClientTimeout @@ -137,7 +137,7 @@ func (um *UpstreamManager) Do(req *fasthttp.Request, resp *fasthttp.Response) er return um.DoTimeout(req, resp, timeout) } -func (um *UpstreamManager) get() *upstream { +func (um *Manager) get() *upstream { cs := um.upstreams minC := cs[0] minN := minC.PendingRequests() @@ -180,29 +180,6 @@ func newUpstream(hc HealthChecker, host string) *upstream { if (u.Scheme == "http" && u.Port() == "80") || (u.Scheme == "https" && u.Port() == "443") { host = u.Hostname() } - //isTLS := u.Scheme == "https" - //c := &fasthttp.Client{ - // Name: host, - // MaxConnsPerHost: 0, - // MaxIdleConnDuration: DefaultMaxConnectionLife, - // MaxIdemponentCallAttempts: DefaultMaxAttempts, - // ReadTimeout: DefaultReadTimeout, - // WriteTimeout: 0, - // MaxConnWaitTimeout: DefaultConnTimeout, - // RetryIf: nil, - //} - //c := &fasthttp.HostClient{ - // Addr: u.Host, - // Name: host, - // IsTLS: isTLS, - // TLSConfig: nil, - // ReadTimeout: DefaultReadTimeout, - // WriteTimeout: 0, - // MaxConnWaitTimeout: DefaultConnTimeout, - // RetryIf: func(req *fasthttp.Request) bool { - // return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut() - // }, - //} return &upstream{ c: defaultFastStdHttpClient, healthCheck: hc, diff --git a/upstream_test.go b/upstream/upstream_test.go similarity index 97% rename from upstream_test.go rename to upstream/upstream_test.go index 1ea7d7e..0bafd46 100644 --- a/upstream_test.go +++ b/upstream/upstream_test.go @@ -1,4 +1,4 @@ -package main +package upstream import ( assertion "github.com/stretchr/testify/assert" diff --git a/wrappeddHttpClient.go b/upstream/wrappeddHttpClient.go similarity index 91% rename from wrappeddHttpClient.go rename to upstream/wrappeddHttpClient.go index a22082a..55f7676 100644 --- a/wrappeddHttpClient.go +++ b/upstream/wrappeddHttpClient.go @@ -1,4 +1,4 @@ -package main +package upstream import ( "bytes" @@ -7,7 +7,8 @@ import ( "encoding/json" "github.com/certifi/gocertifi" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "github.com/revolution1/jsonrpc-proxy/utils" + "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" "io/ioutil" "net/http" @@ -35,7 +36,7 @@ func (f *FastStdHttpClient) DoDeadline(fastReq *fasthttp.Request, fastResp *fast fastReq.Header.VisitAll(func(key, value []byte) { req.Header.Set(string(key), string(value)) }) - if log.IsLevelEnabled(log.TraceLevel) { + if logrus.IsLevelEnabled(logrus.TraceLevel) { log.Tracef("requesting to upstream: %s\n%s\n", req.RequestURI, fastReq.String()) } resp, err := f.Do(req) @@ -60,7 +61,7 @@ func (f *FastStdHttpClient) DoDeadline(fastReq *fasthttp.Request, fastResp *fast if err != nil { log.Debug(string(body)) } - if len(encodings) == 0 && !isASCII(body) { + if len(encodings) == 0 && !utils.IsASCII(body) { log.Info(encodings) } fastResp.SetBodyRaw(body) diff --git a/utils/logger.go b/utils/logger.go new file mode 100644 index 0000000..1f6d916 --- /dev/null +++ b/utils/logger.go @@ -0,0 +1,7 @@ +package utils + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("logger", "utils") diff --git a/utils.go b/utils/utils.go similarity index 50% rename from utils.go rename to utils/utils.go index 54fe165..abb1be8 100644 --- a/utils.go +++ b/utils/utils.go @@ -1,31 +1,19 @@ -package main +package utils import ( - 
log "github.com/sirupsen/logrus" + jsoniter "github.com/json-iterator/go" "net/url" "strings" "syscall" "unicode" ) -type fnv64a struct{} - -const ( - // offset64 FNVa offset basis. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash - offset64 = 14695981039346656037 - // prime64 FNVa prime value. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash - prime64 = 1099511628211 -) - -// Sum64 gets the string and returns its uint64 hash value. -func (f fnv64a) Sum64(key string) uint64 { - var hash uint64 = offset64 - for i := 0; i < len(key); i++ { - hash ^= uint64(key[i]) - hash *= prime64 +func Blob(char byte, len int) []byte { + b := make([]byte, len) + for index := range b { + b[index] = char } - - return hash + return b } func GetHostFromUrl(u string) string { @@ -50,7 +38,7 @@ func CheckFdLimit() { } } -func isASCII(s []byte) bool { +func IsASCII(s []byte) bool { for i := 0; i < len(s); i++ { if s[i] > unicode.MaxASCII { return false @@ -58,3 +46,32 @@ func isASCII(s []byte) bool { } return true } + +func StrSliceContains(slice []string, item string) bool { + for _, i := range slice { + if i == item { + return true + } + } + return false +} + +func StrSliceContainsI(slice []string, item string) bool { + for _, i := range slice { + if strings.EqualFold(i, item) { + return true + } + } + return false +} + +func CastToStruct(from interface{}, to interface{}) (err error) { + var raw []byte + if raw, err = jsoniter.Marshal(from); err != nil { + return + } + if err = jsoniter.Unmarshal(raw, to); err != nil { + return + } + return nil +} diff --git a/utils/utils_test.go b/utils/utils_test.go new file mode 100644 index 0000000..754ccd3 --- /dev/null +++ b/utils/utils_test.go @@ -0,0 +1,28 @@ +package utils + +import ( + assertion "github.com/stretchr/testify/assert" + "testing" +) + +func TestIsAscii(t *testing.T) { + assert := assertion.New(t) + a := []byte("hello") + b := []byte("哈喽") + assert.True(IsASCII(a)) + assert.False(IsASCII(b)) +} + +func TestCastToStruct(t *testing.T) { + assert := assertion.New(t) + s := struct { + A string `json:"a"` + B string `json:"b"` + }{} + m := map[string]string{ + "A": "a", + "B": "b", + } + assert.NoError(CastToStruct(m, &s)) + assert.Equal("a", s.A) +} diff --git a/utils_test.go b/utils_test.go deleted file mode 100644 index 47fb967..0000000 --- a/utils_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -import ( - assertion "github.com/stretchr/testify/assert" - "testing" -) - -func TestIsAscii(t *testing.T) { - assert := assertion.New(t) - a := []byte("hello") - b := []byte("哈喽") - assert.True(isASCII(a)) - assert.False(isASCII(b)) -} diff --git a/version.go b/version.go index b7104b0..58a178d 100644 --- a/version.go +++ b/version.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/prometheus/client_golang/prometheus" + "github.com/revolution1/jsonrpc-proxy/metrics" ) var version = "0.0.3" @@ -20,8 +21,8 @@ func printVersion() string { } func init() { - versionGuage := prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: MetricsNs, + versionGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, Name: "version", Help: "version info of jsonrpc proxy", ConstLabels: prometheus.Labels{ @@ -32,6 +33,6 @@ func init() { "date": date, }, }) - prometheus.MustRegister(versionGuage) - versionGuage.Set(1) + prometheus.MustRegister(versionGauge) + versionGauge.Set(1) }