Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ issues:
- path: _test\.go
linters:
- funlen
- gosec
- lll
- wrapcheck
- path: tests/.*\.go
Expand Down
38 changes: 28 additions & 10 deletions pkg/opensearch/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,29 @@ package opensearch

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"

"github.com/golang-migrate/migrate/v4/database"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"go.uber.org/atomic"
)

const (
nullVersion = -1
versionIndexName = ".migrations"
errTemplateUnsupportedOperation = "unsupported operation '%s'"
driverName = "opensearch"
nullVersion = -1
versionIndexName = ".migrations"
)

var ErrUnsupportedOperationDrop = errors.New("unsupported operation 'drop'")

func init() {
database.Register(driverName, &OpenSearch{})
}

type OpenSearch struct {
transport opensearchapi.Transport
manager MigrationsIndexManagerInterface
Expand All @@ -38,6 +46,21 @@ func NewDriver(
}
}

//nolint:ireturn
func (o *OpenSearch) Open(url string) (database.Driver, error) {
config, err := NewTransportConfigFromURL(url)
if err != nil {
return nil, err
}

transport, err := opensearch.NewClient(config)
if err != nil {
return nil, fmt.Errorf("failed to initialize OpenSearch client: %w", err)
}

return NewDriver(transport, NewMigrationsIndexManager(transport)), nil
}

func (o *OpenSearch) Lock() error {
if !o.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
Expand Down Expand Up @@ -153,15 +176,10 @@ func (o *OpenSearch) Version() (version int, dirty bool, err error) {
return parsedResp.Source.Version, parsedResp.Source.Dirty, nil
}

//nolint:ireturn
func (o *OpenSearch) Open(_ string) (database.Driver, error) {
return nil, fmt.Errorf(errTemplateUnsupportedOperation, "open")
}

func (o *OpenSearch) Close() error {
return fmt.Errorf(errTemplateUnsupportedOperation, "close")
return nil
}

func (o *OpenSearch) Drop() error {
return fmt.Errorf(errTemplateUnsupportedOperation, "drop")
return ErrUnsupportedOperationDrop
}
56 changes: 56 additions & 0 deletions pkg/opensearch/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package opensearch

import (
"crypto/tls"
"fmt"
"net/http"
neturl "net/url"

"github.com/opensearch-project/opensearch-go/v2"
)

const (
DefaultScheme = "https"
DefaultPort = "9200"
QueryNameInsecureSkipVerify = "insecure-skip-verify"
)

func NewTransportConfigFromURL(url string) (opensearch.Config, error) {
parsedURL, err := neturl.Parse(url)
if err != nil {
return opensearch.Config{}, fmt.Errorf("failed to parse url: %w", err)
}

scheme := parsedURL.Scheme

if scheme == "" {
scheme = DefaultScheme
}

baseURL := scheme + "://" + parsedURL.Hostname()
port := parsedURL.Port()

if port == "" {
port = DefaultPort
}

baseURL += ":" + port

config := opensearch.Config{
Addresses: []string{baseURL},
Username: parsedURL.User.Username(),
}

passwd, isSet := parsedURL.User.Password()
if isSet {
config.Password = passwd
}

if parsedURL.Query().Get(QueryNameInsecureSkipVerify) == "true" {
config.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec
}
}

return config, nil
}
82 changes: 82 additions & 0 deletions pkg/opensearch/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package opensearch_test

import (
"crypto/tls"
"fmt"
"net/http"
"testing"

"github.com/opensearch-project/opensearch-go/v2"
"github.com/stretchr/testify/assert"

opensearchdriver "github.com/limangotech/opensearch-driver/pkg/opensearch"
)

func TestItParsesURL(t *testing.T) {
t.Parallel()

testCases := []struct {
url string
expected opensearch.Config
}{
{
// https connection
url: "https://admin:password@opensearch:1234",
expected: opensearch.Config{
Addresses: []string{"https://opensearch:1234"},
Username: "admin",
Password: "password",
},
},
{
// http connection
url: "http://admin:password@opensearch:1234",
expected: opensearch.Config{
Addresses: []string{"http://opensearch:1234"},
Username: "admin",
Password: "password",
},
},
{
// connection with no password
url: "http://admin@opensearch:1234",
expected: opensearch.Config{
Addresses: []string{"http://opensearch:1234"},
Username: "admin",
},
},
{
// connection with no auth
url: "https://opensearch:1234",
expected: opensearch.Config{Addresses: []string{"https://opensearch:1234"}},
},
{
// connection with no port
url: "https://admin:password@opensearch",
expected: opensearch.Config{
Addresses: []string{"https://opensearch:9200"},
Username: "admin",
Password: "password",
},
},
{
// connection with no TLS check
url: "https://admin:password@opensearch?insecure-skip-verify=true",
expected: opensearch.Config{
Addresses: []string{"https://opensearch:9200"},
Username: "admin",
Password: "password",
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
},
}

for i, testCase := range testCases {
actual, err := opensearchdriver.NewTransportConfigFromURL(testCase.url)

assert.NoError(t, err, fmt.Sprintf("Case %d", i))
assert.Equal(t, testCase.expected, actual, fmt.Sprintf("Case %d", i))
}
}