Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions doc/server_plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ The response can look like any of the following:
}
```

* Allow operation and modify content
* Allow operation and modify content:

```
{
"unchange": "false",
"reject": false,
"unchange": false,
"content": {
... // Replaced content
}
Expand All @@ -70,7 +71,7 @@ The response can look like any of the following:

### Operation

Currently `Login`, `NewProxy`, `CloseProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported.
Currently `Login`, `NewProxy`, `CloseProxy`, `Ping`, `NewWorkConn`, `NewUserConn` and `NewVisitorConn` operations are supported.

#### Login

Expand Down Expand Up @@ -214,6 +215,23 @@ New user connection received from proxy (support `tcp`, `stcp`, `https` and `tcp
}
```

#### NewVisitorConn

New visitor connection received for visitor-based proxies (`stcp`, `sudp`).

```
{ "content": {
"user": { "user": <string>
"metas": map<string>string
"run_id": <string>
},
"proxy_name": <string>,
"proxy_type": <string>,
"remote_addr": <string>
}
}
```

### Server Plugin Configuration

```toml
Expand All @@ -233,10 +251,10 @@ path = "/handler"
ops = ["NewProxy"]
```

- addr: the address where the external RPC service listens. Defaults to http. For https, specify the schema: `addr = "https://127.0.0.1:9001"`.
- path: http request url path for the POST request.
- ops: operations plugin needs to handle (e.g. "Login", "NewProxy", ...).
- tlsVerify: When the schema is https, we verify by default. Set this value to false if you want to skip verification.
* addr: the address where the external RPC service listens. Defaults to http. For https, specify the schema: `addr = "https://127.0.0.1:9001"`.
* path: http request url path for the POST request.
* ops: operations plugin needs to handle (e.g. "Login", "NewProxy", ...).
* tlsVerify: When the schema is https, we verify by default. Set this value to false if you want to skip verification.

### Metadata

Expand Down
63 changes: 51 additions & 12 deletions pkg/plugin/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ import (
)

type Manager struct {
loginPlugins []Plugin
newProxyPlugins []Plugin
closeProxyPlugins []Plugin
pingPlugins []Plugin
newWorkConnPlugins []Plugin
newUserConnPlugins []Plugin
loginPlugins []Plugin
newProxyPlugins []Plugin
closeProxyPlugins []Plugin
pingPlugins []Plugin
newWorkConnPlugins []Plugin
newUserConnPlugins []Plugin
newVisitorConnPlugins []Plugin
}

func NewManager() *Manager {
return &Manager{
loginPlugins: make([]Plugin, 0),
newProxyPlugins: make([]Plugin, 0),
closeProxyPlugins: make([]Plugin, 0),
pingPlugins: make([]Plugin, 0),
newWorkConnPlugins: make([]Plugin, 0),
newUserConnPlugins: make([]Plugin, 0),
loginPlugins: make([]Plugin, 0),
newProxyPlugins: make([]Plugin, 0),
closeProxyPlugins: make([]Plugin, 0),
pingPlugins: make([]Plugin, 0),
newWorkConnPlugins: make([]Plugin, 0),
newUserConnPlugins: make([]Plugin, 0),
newVisitorConnPlugins: make([]Plugin, 0),
}
}

Expand All @@ -63,6 +65,9 @@ func (m *Manager) Register(p Plugin) {
if p.IsSupport(OpNewUserConn) {
m.newUserConnPlugins = append(m.newUserConnPlugins, p)
}
if p.IsSupport(OpNewVisitorConn) {
m.newVisitorConnPlugins = append(m.newVisitorConnPlugins, p)
}
}

func (m *Manager) Login(content *LoginContent) (*LoginContent, error) {
Expand Down Expand Up @@ -259,3 +264,37 @@ func (m *Manager) NewUserConn(content *NewUserConnContent) (*NewUserConnContent,
}
return content, nil
}

func (m *Manager) NewVisitorConn(content *NewVisitorConnContent) (*NewVisitorConnContent, error) {
if len(m.newVisitorConnPlugins) == 0 {
return content, nil
}

var (
res = &Response{
Reject: false,
Unchange: true,
}
retContent any
err error
)
reqid, _ := util.RandID()
xl := xlog.New().AppendPrefix("reqid: " + reqid)
ctx := xlog.NewContext(context.Background(), xl)
ctx = NewReqidContext(ctx, reqid)

for _, p := range m.newVisitorConnPlugins {
res, retContent, err = p.Handle(ctx, OpNewVisitorConn, *content)
if err != nil {
xl.Infof("send NewVisitorConn request to plugin [%s] error: %v", p.Name(), err)
return nil, errors.New("send NewVisitorConn request to plugin error")
}
if res.Reject {
return nil, fmt.Errorf("%s", res.RejectReason)
}
if !res.Unchange {
content = retContent.(*NewVisitorConnContent)
}
}
return content, nil
}
13 changes: 7 additions & 6 deletions pkg/plugin/server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
const (
APIVersion = "0.1.0"

OpLogin = "Login"
OpNewProxy = "NewProxy"
OpCloseProxy = "CloseProxy"
OpPing = "Ping"
OpNewWorkConn = "NewWorkConn"
OpNewUserConn = "NewUserConn"
OpLogin = "Login"
OpNewProxy = "NewProxy"
OpCloseProxy = "CloseProxy"
OpPing = "Ping"
OpNewWorkConn = "NewWorkConn"
OpNewUserConn = "NewUserConn"
OpNewVisitorConn = "NewVisitorConn"
)

type Plugin interface {
Expand Down
7 changes: 7 additions & 0 deletions pkg/plugin/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,10 @@ type NewUserConnContent struct {
ProxyType string `json:"proxy_type"`
RemoteAddr string `json:"remote_addr"`
}

type NewVisitorConnContent struct {
User UserInfo `json:"user"`
ProxyName string `json:"proxy_name"`
ProxyType string `json:"proxy_type"`
RemoteAddr string `json:"remote_addr"`
}
21 changes: 20 additions & 1 deletion server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn)
}

func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error {
xl := netpkg.NewLogFromConn(visitorConn)
visitorUser := ""
// TODO(deprecation): Compatible with old versions, can be without runID, user is empty. In later versions, it will be mandatory to include runID.
// If runID is required, it is not compatible with versions prior to v0.50.0.
Expand All @@ -694,6 +695,24 @@ func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVis
}
visitorUser = ctl.sessionCtx.LoginMsg.User
}
return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,

// SERVER PLUGIN HOOK - NEW VISITOR CONNECTION
content := &plugin.NewVisitorConnContent{
User: plugin.UserInfo{
User: visitorUser,
RunID: newMsg.RunID,
},
ProxyName: newMsg.ProxyName,
ProxyType: "stcp", // Default type, could be determined from proxy config
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: ProxyType is hard-coded to "stcp", so sudp visitor connections will be reported to plugins as the wrong protocol. Any plugin that branches on proxy_type will mis-handle or reject every sudp connection even though this hook is documented for both visitor-based proxy types.

RemoteAddr: visitorConn.RemoteAddr().String(),
}

retContent, err := svr.pluginManager.NewVisitorConn(content)
if err != nil {
xl.Warnf("visitor connection [%s] rejected by plugin: %v", newMsg.ProxyName, err)
return err
}

return svr.rc.VisitorManager.NewConn(retContent.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
newMsg.UseEncryption, newMsg.UseCompression, visitorUser)
}
97 changes: 97 additions & 0 deletions test/e2e/v1/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,103 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
})
})

ginkgo.Describe("NewVisitorConn", func() {
newFunc := func() *plugin.Request {
var r plugin.Request
r.Content = &plugin.NewVisitorConnContent{}
return &r
}

ginkgo.It("Accept visitor connection", func() {
localPort := f.AllocPort()

var recordedProxyName string
handler := func(req *plugin.Request) *plugin.Response {
var ret plugin.Response
content := req.Content.(*plugin.NewVisitorConnContent)
recordedProxyName = content.ProxyName
ret.Unchange = true
return &ret
}
pluginServer := pluginpkg.NewHTTPPluginServer(localPort, newFunc, handler, nil)

f.RunServer("", pluginServer)

serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
[[httpPlugins]]
name = "test"
addr = "127.0.0.1:%d"
path = "/handler"
ops = ["NewVisitorConn"]
`, localPort)

clientConf := consts.DefaultClientConfig
clientConf += `
[[proxies]]
name = "stcp"
type = "stcp"
secretKey = "abcdefg"
localIP = "127.0.0.1"
localPort = 22
`

f.RunProcesses(serverConf, []string{clientConf})

framework.ExpectEqual(recordedProxyName, "stcp")
})

ginkgo.It("Reject visitor connection", func() {
localPort := f.AllocPort()

handler := func(req *plugin.Request) *plugin.Response {
var ret plugin.Response
ret.Reject = true
ret.RejectReason = "visitor rejected by plugin"
return &ret
}
pluginServer := pluginpkg.NewHTTPPluginServer(localPort, newFunc, handler, nil)

f.RunServer("", pluginServer)

serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
[[httpPlugins]]
name = "test"
addr = "127.0.0.1:%d"
path = "/handler"
ops = ["NewVisitorConn"]
`, localPort)

clientConf := consts.DefaultClientConfig
clientConf += `
[[proxies]]
name = "stcp"
type = "stcp"
secretKey = "abcdefg"
localIP = "127.0.0.1"
localPort = 22
`

visitorConf := consts.DefaultClientConfig
visitorConf += `
[[visitors]]
name = "stcp_visitor"
type = "stcp"
serverName = "stcp"
secretKey = "abcdefg"
bindAddr = "127.0.0.1"
bindPort = 9000
`

f.RunProcesses(serverConf, []string{clientConf, visitorConf})

// Give the visitor connection time to attempt
time.Sleep(500 * time.Millisecond)

// Connection should fail due to plugin rejection
framework.NewRequestExpect(f).Port(9000).ExpectError(true).Ensure()
})
})

ginkgo.Describe("HTTPS Protocol", func() {
newFunc := func() *plugin.Request {
var r plugin.Request
Expand Down