Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Usage of ./cli:
-producer
if true, produce messages, otherwise consume
-pulsar string
pulsar address (default "localhost:6650")
pulsar address. May start with pulsar:// or pulsar+ssl:// (default "localhost:6650")
-rate duration
rate at which to send messages (default 1s)
-shared
Expand Down
72 changes: 37 additions & 35 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"syscall"
"time"

"github.com/wolfstudy/pulsar-client-go/core/auth"
"github.com/wolfstudy/pulsar-client-go/core/manage"
"github.com/wolfstudy/pulsar-client-go/core/msg"
)
Expand Down Expand Up @@ -62,7 +63,7 @@ var args = struct {
}

func main() {
flag.StringVar(&args.pulsar, "pulsar", args.pulsar, "pulsar address")
flag.StringVar(&args.pulsar, "pulsar", args.pulsar, "pulsar address. May start with pulsar:// or pulsar+ssl://")
flag.StringVar(&args.tlsCert, "tls-cert", args.tlsCert, "(optional) path to TLS certificate")
flag.StringVar(&args.tlsKey, "tls-key", args.tlsKey, "(optional) path to TLS key")
flag.StringVar(&args.tlsCA, "tls-ca", args.tlsKey, "(optional) path to root certificate")
Expand Down Expand Up @@ -91,33 +92,18 @@ func main() {
cancel()
}()

var tlsCfg *tls.Config
if args.tlsCert != "" && args.tlsKey != "" {
tlsCfg = &tls.Config{
InsecureSkipVerify: args.tlsSkipVerify,
}
var err error
cert, err := tls.LoadX509KeyPair(args.tlsCert, args.tlsKey)
if err != nil {
fmt.Fprintln(os.Stderr, "error loading certificates:", err)
os.Exit(1)
}
tlsCfg.Certificates = []tls.Certificate{cert}
var authentication auth.Authentication
tlsCfg := &tls.Config{
InsecureSkipVerify: args.tlsSkipVerify,
}

if args.tlsCA != "" {
rootCA, err := ioutil.ReadFile(args.tlsCA)
if err != nil {
fmt.Fprintln(os.Stderr, "error loading certificate authority:", err)
os.Exit(1)
}
tlsCfg.RootCAs = x509.NewCertPool()
tlsCfg.RootCAs.AppendCertsFromPEM(rootCA)
}
if args.tlsCert != "" && args.tlsKey != "" {
authentication = auth.NewAuthenticationTLS(args.tlsCert, args.tlsKey)

// Inspect certificate and print the CommonName attribute,
// since this may be used for authorization
if len(cert.Certificate[0]) > 0 {
x509Cert, err := x509.ParseCertificate(cert.Certificate[0])
if certs := authentication.GetAuthData().GetTlsCertificates(); len(certs) > 0 && len(certs[0].Certificate) > 0 && len(certs[0].Certificate[0]) > 0 {
x509Cert, err := x509.ParseCertificate(certs[0].Certificate[0])
if err != nil {
fmt.Fprintln(os.Stderr, "error loading public certificate:", err)
os.Exit(1)
Expand All @@ -126,21 +112,35 @@ func main() {
}
}

if args.tlsCA != "" {
rootCA, err := ioutil.ReadFile(args.tlsCA)
if err != nil {
fmt.Fprintln(os.Stderr, "error loading certificate authority:", err)
os.Exit(1)
}
tlsCfg.RootCAs = x509.NewCertPool()
tlsCfg.RootCAs.AppendCertsFromPEM(rootCA)
}

mcp := manage.NewClientPool()

switch args.producer {
case true:
// Create the managed producer
mpCfg := manage.ProducerConfig{
mpCfg := manage.ManagedProducerConfig{
Name: args.name,
Topic: args.topic,
NewProducerTimeout: time.Second,
InitialReconnectDelay: time.Second,
MaxReconnectDelay: time.Minute,
ClientConfig: manage.ClientConfig{
Addr: args.pulsar,
TLSConfig: tlsCfg,
Errs: asyncErrs,
ManagedClientConfig: manage.ManagedClientConfig{
ClientConfig: manage.ClientConfig{
Addr: args.pulsar,
UseTLS: args.tlsCert != "" && args.tlsKey != "",
TLSConfig: tlsCfg,
Authentication: authentication,
Errs: asyncErrs,
},
},
}
mp := manage.NewManagedProducer(mcp, mpCfg)
Expand Down Expand Up @@ -183,7 +183,7 @@ func main() {
return
}
sctx, cancel := context.WithTimeout(ctx, time.Second)
_, err := mp.Send(sctx, payload,"")
_, err := mp.Send(sctx, payload, "")
cancel()
if err != nil {
fmt.Fprintln(os.Stderr, err)
Expand All @@ -199,16 +199,18 @@ func main() {
queue := make(chan msg.Message, 8)

// Create managed consumer
mcCfg := manage.ConsumerConfig{
mcCfg := manage.ManagedConsumerConfig{
Name: args.name,
Topic: args.topic,
NewConsumerTimeout: time.Second,
InitialReconnectDelay: time.Second,
MaxReconnectDelay: time.Minute,
ClientConfig: manage.ClientConfig{
Addr: args.pulsar,
TLSConfig: tlsCfg,
Errs: asyncErrs,
ManagedClientConfig: manage.ManagedClientConfig{
ClientConfig: manage.ClientConfig{
Addr: args.pulsar,
TLSConfig: tlsCfg,
Errs: asyncErrs,
},
},
}

Expand Down
48 changes: 48 additions & 0 deletions core/auth/basic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package auth

import (
"encoding/base64"
"net/http"
)

func NewAuthenticationBasic(userId, password string) Authentication {
return authenticationBasic{userId: userId, password: password}
}

type authenticationBasic struct {
userId, password string
}

func (a authenticationBasic) GetAuthMethodName() string {
return "basic"
}
func (a authenticationBasic) GetAuthData() AuthenticationDataProvider {
commandAuthToken := []byte(a.userId + ":" + a.password)
httpAuthToken := "Basic " + base64.StdEncoding.EncodeToString(commandAuthToken)
return authenticationDataBasic{
httpAuthToken: httpAuthToken,
commandAuthToken: commandAuthToken,
}
}

type authenticationDataBasic struct {
authenticationDataNull
httpAuthToken string
commandAuthToken []byte
}

func (adBasic authenticationDataBasic) HasDataForHttp() bool {
return true
}
func (adBasic authenticationDataBasic) GetHttpHeaders() http.Header {
return http.Header{
"Authorization": []string{adBasic.httpAuthToken},
}
}

func (adBasic authenticationDataBasic) HasDataFromCommand() bool {
return true
}
func (adBasic authenticationDataBasic) GetCommandData() []byte {
return adBasic.commandAuthToken
}
45 changes: 45 additions & 0 deletions core/auth/disabled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package auth

import (
"crypto/tls"
"net/http"
)

func NewAuthenticationDisabled() Authentication {
return authenticationDisabled{}
}

type authenticationDisabled struct{}

func (a authenticationDisabled) GetAuthMethodName() string {
return ""
}
func (a authenticationDisabled) GetAuthData() AuthenticationDataProvider {
return authenticationDataNull{}
}

type authenticationDataNull struct{}

func (adNull authenticationDataNull) HasDataForTls() bool {
return false
}
func (adNull authenticationDataNull) GetTlsCertificates() []tls.Certificate {
return nil
}

func (adNull authenticationDataNull) HasDataForHttp() bool {
return false
}
func (adNull authenticationDataNull) GetHttpAuthType() string {
return ""
}
func (adNull authenticationDataNull) GetHttpHeaders() http.Header {
return nil
}

func (adNull authenticationDataNull) HasDataFromCommand() bool {
return false
}
func (adNull authenticationDataNull) GetCommandData() []byte {
return nil
}
26 changes: 26 additions & 0 deletions core/auth/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package auth

import (
"crypto/tls"
"net/http"
)

type (
Authentication interface {
GetAuthMethodName() string
GetAuthData() AuthenticationDataProvider
}

AuthenticationDataProvider interface {
HasDataForTls() bool
GetTlsCertificates() []tls.Certificate
// GetTslPrivateKey is redundant due to Go TLS implementation

HasDataForHttp() bool
GetHttpAuthType() string
GetHttpHeaders() http.Header

HasDataFromCommand() bool
GetCommandData() []byte
}
)
34 changes: 34 additions & 0 deletions core/auth/tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package auth

import "crypto/tls"

func NewAuthenticationTLS(certFile, keyFile string) Authentication {
return authenticationTls{certFile: certFile, keyFile: keyFile}
}

type authenticationTls struct {
certFile, keyFile string
}

func (a authenticationTls) GetAuthMethodName() string {
return "tls"
}
func (a authenticationTls) GetAuthData() AuthenticationDataProvider {
if certificate, err := tls.LoadX509KeyPair(a.certFile, a.keyFile); err == nil {
return authenticationDataTls{certificates: []tls.Certificate{certificate}}
} else {
panic(err)
}
}

type authenticationDataTls struct {
authenticationDataNull
certificates []tls.Certificate
}

func (adTls authenticationDataTls) HasDataForTls() bool {
return true
}
func (adTls authenticationDataTls) GetTlsCertificates() []tls.Certificate {
return adTls.certificates
}
61 changes: 61 additions & 0 deletions core/auth/token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package auth

import (
"io/ioutil"
"net/http"
"strings"
)

type AuthenticationTokenSupplier func() []byte

func NewAuthenticationTokenFromSupplier(tokenSupplier AuthenticationTokenSupplier) Authentication {
return authenticationToken{supplier: tokenSupplier}
}
func NewAuthenticationTokenFromString(token string) Authentication {
token = strings.TrimPrefix(token, "token:")
return authenticationToken{supplier: func() []byte {
return []byte(token)
}}
}
func NewAuthenticationTokenFromFile(fileName string) Authentication {
fileName = strings.TrimPrefix(fileName, "file:")
return authenticationToken{supplier: func() []byte {
if content, err := ioutil.ReadFile(fileName); err == nil {
return content
} else {
panic(err)
}
}}
}

type authenticationToken struct {
supplier AuthenticationTokenSupplier
}

func (a authenticationToken) GetAuthMethodName() string {
return "token"
}
func (a authenticationToken) GetAuthData() AuthenticationDataProvider {
return authenticationDataToken{supplier: a.supplier}
}

type authenticationDataToken struct {
authenticationDataNull
supplier AuthenticationTokenSupplier
}

func (adToken authenticationDataToken) HasDataForHttp() bool {
return true
}
func (adToken authenticationDataToken) GetHttpHeaders() http.Header {
return http.Header{
"Authorization": []string{"Bearer " + string(adToken.supplier())},
}
}

func (adToken authenticationDataToken) HasDataFromCommand() bool {
return true
}
func (adToken authenticationDataToken) GetCommandData() []byte {
return adToken.supplier()
}
Loading