diff --git a/chart/distworker/values.yaml b/chart/distworker/values.yaml
index cfe55cb..6548194 100644
--- a/chart/distworker/values.yaml
+++ b/chart/distworker/values.yaml
@@ -118,7 +118,7 @@ controller:
- infinity
containerPorts:
- api: 8081
+ api: 8080
worker: 8081
## @param notificationService.replicaCount Number of main replicas to deploy
diff --git a/go.mod b/go.mod
index 2125872..7923998 100644
--- a/go.mod
+++ b/go.mod
@@ -1,14 +1,18 @@
module github.com/jc-lab/distworker
-go 1.24
+go 1.24.0
+
+toolchain go1.24.3
require (
github.com/AmrSaber/go-blocking-dequeue v1.0.1
github.com/aws/aws-sdk-go-v2 v1.36.3
github.com/aws/aws-sdk-go-v2/config v1.29.14
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.4
+ github.com/gin-contrib/cors v1.7.5
+ github.com/gin-gonic/gin v1.10.0
+ github.com/go-swagger/go-swagger v0.32.3
github.com/google/uuid v1.6.0
- github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.3
github.com/knadh/koanf/parsers/yaml v1.0.0
@@ -17,16 +21,22 @@ require (
github.com/knadh/koanf/v2 v2.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.22.0
+ github.com/swaggo/files v1.0.1
+ github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.4
go.mongodb.org/mongo-driver v1.16.1
go.uber.org/zap v1.27.0
golang.org/x/sys v0.33.0
- google.golang.org/protobuf v1.36.5
+ google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/KyleBanks/depth v1.2.1 // indirect
+ github.com/Masterminds/goutils v1.1.1 // indirect
+ github.com/Masterminds/semver/v3 v3.2.1 // indirect
+ github.com/Masterminds/sprig/v3 v3.2.3 // indirect
+ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
@@ -43,38 +53,85 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
github.com/aws/smithy-go v1.22.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
+ github.com/bytedance/sonic v1.13.2 // indirect
+ github.com/bytedance/sonic/loader v0.2.4 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
- github.com/felixge/httpsnoop v1.0.3 // indirect
+ github.com/cloudwego/base64x v0.1.5 // indirect
+ github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
+ github.com/gabriel-vasile/mimetype v1.4.8 // indirect
+ github.com/gin-contrib/sse v1.0.0 // indirect
+ github.com/go-openapi/analysis v0.23.0 // indirect
+ github.com/go-openapi/errors v0.22.0 // indirect
+ github.com/go-openapi/inflect v0.21.0 // indirect
github.com/go-openapi/jsonpointer v0.21.1 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
+ github.com/go-openapi/loads v0.22.0 // indirect
+ github.com/go-openapi/runtime v0.28.0 // indirect
github.com/go-openapi/spec v0.21.0 // indirect
+ github.com/go-openapi/strfmt v0.23.0 // indirect
github.com/go-openapi/swag v0.23.1 // indirect
+ github.com/go-openapi/validate v0.24.0 // indirect
+ github.com/go-playground/locales v0.14.1 // indirect
+ github.com/go-playground/universal-translator v0.18.1 // indirect
+ github.com/go-playground/validator/v10 v10.26.0 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
+ github.com/goccy/go-json v0.10.5 // indirect
github.com/golang/snappy v0.0.4 // indirect
+ github.com/gorilla/handlers v1.5.2 // indirect
+ github.com/hashicorp/hcl v1.0.0 // indirect
+ github.com/huandu/xstrings v1.4.0 // indirect
+ github.com/imdario/mergo v0.3.16 // indirect
+ github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
+ github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.0 // indirect
+ github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
+ github.com/kr/pretty v0.3.1 // indirect
+ github.com/kr/text v0.2.0 // indirect
+ github.com/leodido/go-urn v1.4.0 // indirect
+ github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
+ github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
+ github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+ github.com/oklog/ulid v1.3.1 // indirect
+ github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
- github.com/swaggo/files v1.0.1 // indirect
- github.com/swaggo/files/v2 v2.0.2 // indirect
- github.com/swaggo/http-swagger v1.3.4 // indirect
- github.com/swaggo/http-swagger/v2 v2.0.2 // indirect
+ github.com/rogpeppe/go-internal v1.12.0 // indirect
+ github.com/sagikazarmark/locafero v0.4.0 // indirect
+ github.com/sagikazarmark/slog-shim v0.1.0 // indirect
+ github.com/shopspring/decimal v1.3.1 // indirect
+ github.com/sourcegraph/conc v0.3.0 // indirect
+ github.com/spf13/afero v1.11.0 // indirect
+ github.com/spf13/cast v1.6.0 // indirect
+ github.com/spf13/pflag v1.0.5 // indirect
+ github.com/spf13/viper v1.18.2 // indirect
+ github.com/subosito/gotenv v1.6.0 // indirect
+ github.com/toqueteos/webbrowser v1.2.0 // indirect
+ github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+ github.com/ugorji/go/codec v1.2.12 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
- go.uber.org/multierr v1.10.0 // indirect
+ go.uber.org/multierr v1.11.0 // indirect
+ golang.org/x/arch v0.15.0 // indirect
golang.org/x/crypto v0.39.0 // indirect
+ golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa // indirect
+ golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/tools v0.34.0 // indirect
+ gopkg.in/ini.v1 v1.67.0 // indirect
+ gopkg.in/yaml.v2 v2.4.0 // indirect
)
diff --git a/go.sum b/go.sum
index 1638431..226ee8f 100644
--- a/go.sum
+++ b/go.sum
@@ -2,6 +2,15 @@ github.com/AmrSaber/go-blocking-dequeue v1.0.1 h1:u5rGaj9VNa/2QDLB+NezMZvzZXoALJ
github.com/AmrSaber/go-blocking-dequeue v1.0.1/go.mod h1:GXUgB8Z+qxCmeb6GC60gmRLMf+fC3dY+sF8zcCxAJDE=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
+github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
+github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
+github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
+github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
+github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
+github.com/Masterminds/sprig/v3 v3.2.3 h1:eL2fZNezLomi0uOLqjQoN6BfsDD+fyLtgbJMAj9n6YA=
+github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM=
+github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
+github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
@@ -40,28 +49,79 @@ github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ=
+github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4=
+github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
+github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY=
+github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4=
+github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
+github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
-github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
+github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
+github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
+github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
+github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
+github.com/gin-contrib/cors v1.7.5 h1:cXC9SmofOrRg0w9PigwGlHG3ztswH6bqq4vJVXnvYMk=
+github.com/gin-contrib/cors v1.7.5/go.mod h1:4q3yi7xBEDDWKapjT2o1V7mScKDDr8k+jZ0fSquGoy0=
+github.com/gin-contrib/gzip v0.0.6 h1:NjcunTcGAj5CO1gn4N8jHOSIeRFHIbn51z6K+xaN4d4=
+github.com/gin-contrib/gzip v0.0.6/go.mod h1:QOJlmV2xmayAjkNS2Y8NQsMneuRShOU/kjovCXNuzzk=
+github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E=
+github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0=
+github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
+github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
+github.com/go-openapi/analysis v0.23.0 h1:aGday7OWupfMs+LbmLZG4k0MYXIANxcuBTYUC03zFCU=
+github.com/go-openapi/analysis v0.23.0/go.mod h1:9mz9ZWaSlV8TvjQHLl2mUW2PbZtemkE8yA5v22ohupo=
+github.com/go-openapi/errors v0.22.0 h1:c4xY/OLxUBSTiepAg3j/MHuAv5mJhnf53LLMWFB+u/w=
+github.com/go-openapi/errors v0.22.0/go.mod h1:J3DmZScxCDufmIMsdOuDHxJbdOGC0xtUynjIx092vXE=
+github.com/go-openapi/inflect v0.21.0 h1:FoBjBTQEcbg2cJUWX6uwL9OyIW8eqc9k4KhN4lfbeYk=
+github.com/go-openapi/inflect v0.21.0/go.mod h1:INezMuUu7SJQc2AyR3WO0DqqYUJSj8Kb4hBd7WtjlAw=
github.com/go-openapi/jsonpointer v0.21.1 h1:whnzv/pNXtK2FbX/W9yJfRmE2gsmkfahjMKB0fZvcic=
github.com/go-openapi/jsonpointer v0.21.1/go.mod h1:50I1STOfbY1ycR8jGz8DaMeLCdXiI6aDteEdRNNzpdk=
github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ=
github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4=
+github.com/go-openapi/loads v0.22.0 h1:ECPGd4jX1U6NApCGG1We+uEozOAvXvJSF4nnwHZ8Aco=
+github.com/go-openapi/loads v0.22.0/go.mod h1:yLsaTCS92mnSAZX5WWoxszLj0u+Ojl+Zs5Stn1oF+rs=
+github.com/go-openapi/runtime v0.28.0 h1:gpPPmWSNGo214l6n8hzdXYhPuJcGtziTOgUpvsFWGIQ=
+github.com/go-openapi/runtime v0.28.0/go.mod h1:QN7OzcS+XuYmkQLw05akXk0jRH/eZ3kb18+1KwW9gyc=
github.com/go-openapi/spec v0.21.0 h1:LTVzPc3p/RzRnkQqLRndbAzjY0d0BCL72A6j3CdL9ZY=
github.com/go-openapi/spec v0.21.0/go.mod h1:78u6VdPw81XU44qEWGhtr982gJ5BWg2c0I5XwVMotYk=
+github.com/go-openapi/strfmt v0.23.0 h1:nlUS6BCqcnAk0pyhi9Y+kdDVZdZMHfEKQiS4HaMgO/c=
+github.com/go-openapi/strfmt v0.23.0/go.mod h1:NrtIpfKtWIygRkKVsxh7XQMDQW5HKQl6S5ik2elW+K4=
github.com/go-openapi/swag v0.23.1 h1:lpsStH0n2ittzTnbaSloVZLuB5+fvSY/+hnagBjSNZU=
github.com/go-openapi/swag v0.23.1/go.mod h1:STZs8TbRvEQQKUA+JZNAm3EWlgaOBGpyFDqQnDHMef0=
+github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58=
+github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ=
+github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
+github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
+github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
+github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
+github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
+github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
+github.com/go-playground/validator/v10 v10.26.0 h1:SP05Nqhjcvz81uJaRfEV0YBSSSGMc/iMaVtFbr3Sw2k=
+github.com/go-playground/validator/v10 v10.26.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
+github.com/go-swagger/go-swagger v0.32.3 h1:bhAfZ4WaFXyPuw2OrXg34rOcUBR++fpVdonRRYzBK1c=
+github.com/go-swagger/go-swagger v0.32.3/go.mod h1:lAwO1nKff3qNRJYVQeTCl1am5pcNiiA2VyDf8TqzS24=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
+github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
+github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
@@ -70,10 +130,25 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
+github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
+github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
+github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
+github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
+github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
+github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc=
+github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
+github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
+github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo=
github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/parsers/yaml v1.0.0 h1:PXyeHCRhAMKyfLJaoTWsqUTxIFeDMmdAKz3XVEslZV4=
@@ -84,26 +159,48 @@ github.com/knadh/koanf/providers/file v1.2.0 h1:hrUJ6Y9YOA49aNu/RSYzOTFlqzXSCpmY
github.com/knadh/koanf/providers/file v1.2.0/go.mod h1:bp1PM5f83Q+TOUu10J/0ApLBd9uIzg+n9UgthfY+nRA=
github.com/knadh/koanf/v2 v2.2.0 h1:FZFwd9bUjpb8DyCWARUBy5ovuhDs1lI87dOEn2K8UVU=
github.com/knadh/koanf/v2 v2.2.0/go.mod h1:PSFru3ufQgTsI7IF+95rf9s8XA1+aHxKuO/W+dPoHEY=
+github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
+github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
+github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
+github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
+github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4=
github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
+github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
+github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
+github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
+github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
@@ -112,20 +209,55 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
-github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
-github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
+github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
+github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
+github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
+github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
+github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
+github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
+github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
+github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
+github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
+github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
+github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
+github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
+github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
+github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ=
+github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
+github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE=
github.com/swaggo/files v1.0.1/go.mod h1:0qXmMNH6sXNf+73t65aKeB+ApmgxdnkQzVTAj2uaMUg=
-github.com/swaggo/files/v2 v2.0.2 h1:Bq4tgS/yxLB/3nwOMcul5oLEUKa877Ykgz3CJMVbQKU=
-github.com/swaggo/files/v2 v2.0.2/go.mod h1:TVqetIzZsO9OhHX1Am9sRf9LdrFZqoK49N37KON/jr0=
-github.com/swaggo/http-swagger v1.3.4 h1:q7t/XLx0n15H1Q9/tk3Y9L4n210XzJF5WtnDX64a5ww=
-github.com/swaggo/http-swagger v1.3.4/go.mod h1:9dAh0unqMBAlbp1uE2Uc2mQTxNMU/ha4UbucIg1MFkQ=
-github.com/swaggo/http-swagger/v2 v2.0.2 h1:FKCdLsl+sFCx60KFsyM0rDarwiUSZ8DqbfSyIKC9OBg=
-github.com/swaggo/http-swagger/v2 v2.0.2/go.mod h1:r7/GBkAWIfK6E/OLnE8fXnviHiDeAHmgIyooa4xm3AQ=
+github.com/swaggo/gin-swagger v1.6.0 h1:y8sxvQ3E20/RCyrXeFfg60r6H0Z+SwpTjMYsMm+zy8M=
+github.com/swaggo/gin-swagger v1.6.0/go.mod h1:BG00cCEy294xtVpyIAHG6+e2Qzj/xKlRdOqDkvq0uzo=
github.com/swaggo/swag v1.16.4 h1:clWJtd9LStiG3VeijiCfOVODP6VpHtKdQy9ELFG3s1A=
github.com/swaggo/swag v1.16.4/go.mod h1:VBsHJRsDvfYvqoiMKnsdwhNV9LEMHgEDZcyVYX0sxPg=
+github.com/toqueteos/webbrowser v1.2.0 h1:tVP/gpK69Fx+qMJKsLE7TD8LuGWPnEV71wBN9rrstGQ=
+github.com/toqueteos/webbrowser v1.2.0/go.mod h1:XWoZq4cyp9WeUeak7w7LXRUQf1F1ATJMir8RTqb4ayM=
+github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
+github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
+github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
+github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
@@ -139,20 +271,26 @@ go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4
go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
-go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
-go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
+go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
+go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
+golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw=
+golang.org/x/arch v0.15.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
+golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa h1:t2QcU6V556bFjYgu4L6C+6VrCPyJZ+eyRsABUPs1mz4=
+golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa/go.mod h1:BHOTPb3L19zxehTsLoJXVaTktb06DFgmdW6Wb9s8jqk=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
@@ -162,19 +300,24 @@ golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
@@ -184,10 +327,18 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
-google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
+google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
+google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
+gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
diff --git a/go/internal/eventstream/writer.go b/go/internal/eventstream/writer.go
new file mode 100644
index 0000000..c2e6bfb
--- /dev/null
+++ b/go/internal/eventstream/writer.go
@@ -0,0 +1,45 @@
+package eventstream
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+)
+
+type Writer interface {
+ io.Writer
+ io.Closer
+ JSON(v interface{}) error
+}
+
+type WriterImpl struct {
+ http.ResponseWriter
+}
+
+var _ Writer = (*WriterImpl)(nil)
+
+func NewWriter(w http.ResponseWriter) Writer {
+ writer := &WriterImpl{
+ ResponseWriter: w,
+ }
+ contentType := w.Header().Get("Content-Type")
+ if contentType == "" {
+ w.Header().Set("Content-Type", "text/event-stream")
+ }
+ return writer
+}
+
+func (w *WriterImpl) JSON(v interface{}) error {
+ d, err := json.Marshal(v)
+ if err != nil {
+ return err
+ }
+ _, err = w.ResponseWriter.Write([]byte(fmt.Sprintf("data: %s\n\n", d)))
+ return err
+}
+
+func (w *WriterImpl) Close() error {
+ _, err := w.ResponseWriter.Write([]byte("data: [DONE]\n\n"))
+ return err
+}
diff --git a/go/internal/openai/openai.go b/go/internal/openai/openai.go
new file mode 100644
index 0000000..91edd71
--- /dev/null
+++ b/go/internal/openai/openai.go
@@ -0,0 +1,231 @@
+// openai package provides middleware for partial compatibility with the OpenAI REST API
+package openai
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+// PropertyType can be either a string or an array of strings
+type PropertyType []string
+
+// UnmarshalJSON implements the json.Unmarshaler interface
+func (pt *PropertyType) UnmarshalJSON(data []byte) error {
+ // Try to unmarshal as a string first
+ var s string
+ if err := json.Unmarshal(data, &s); err == nil {
+ *pt = []string{s}
+ return nil
+ }
+
+ // If that fails, try to unmarshal as an array of strings
+ var a []string
+ if err := json.Unmarshal(data, &a); err != nil {
+ return err
+ }
+ *pt = a
+ return nil
+}
+
+// MarshalJSON implements the json.Marshaler interface
+func (pt PropertyType) MarshalJSON() ([]byte, error) {
+ if len(pt) == 1 {
+ // If there's only one type, marshal as a string
+ return json.Marshal(pt[0])
+ }
+ // Otherwise marshal as an array
+ return json.Marshal([]string(pt))
+}
+
+// String returns a string representation of the PropertyType
+func (pt PropertyType) String() string {
+ if len(pt) == 0 {
+ return ""
+ }
+ if len(pt) == 1 {
+ return pt[0]
+ }
+ return fmt.Sprintf("%v", []string(pt))
+}
+
+type ToolFunction struct {
+ Name string `json:"name"`
+ Description string `json:"description"`
+ Parameters struct {
+ Type string `json:"type"`
+ Defs any `json:"$defs,omitempty"`
+ Items any `json:"items,omitempty"`
+ Required []string `json:"required"`
+ Properties map[string]struct {
+ Type PropertyType `json:"type"`
+ Items any `json:"items,omitempty"`
+ Description string `json:"description"`
+ Enum []any `json:"enum,omitempty"`
+ } `json:"properties"`
+ } `json:"parameters"`
+}
+
+type Tool struct {
+ Type string `json:"type"`
+ Items any `json:"items,omitempty"`
+ Function ToolFunction `json:"function"`
+}
+
+type Error struct {
+ Message string `json:"message"`
+ Type string `json:"type"`
+ Param any `json:"param"`
+ Code *string `json:"code"`
+}
+
+type ErrorResponse struct {
+ Error Error `json:"error"`
+}
+
+type Message struct {
+ Role string `json:"role"`
+ Content any `json:"content"`
+ ToolCalls []ToolCall `json:"tool_calls,omitempty"`
+}
+
+type Choice struct {
+ Index int `json:"index"`
+ Message Message `json:"message"`
+ FinishReason *string `json:"finish_reason"`
+}
+
+type ChunkChoice struct {
+ Index int `json:"index"`
+ Delta Message `json:"delta"`
+ FinishReason *string `json:"finish_reason"`
+}
+
+type CompleteChunkChoice struct {
+ Text string `json:"text"`
+ Index int `json:"index"`
+ FinishReason *string `json:"finish_reason"`
+}
+
+type Usage struct {
+ PromptTokens int `json:"prompt_tokens"`
+ CompletionTokens int `json:"completion_tokens"`
+ TotalTokens int `json:"total_tokens"`
+}
+
+type ResponseFormat struct {
+ Type string `json:"type"`
+ JsonSchema *JsonSchema `json:"json_schema,omitempty"`
+}
+
+type JsonSchema struct {
+ Schema json.RawMessage `json:"schema"`
+}
+
+type EmbedRequest struct {
+ Input any `json:"input"`
+ Model string `json:"model"`
+}
+
+type StreamOptions struct {
+ IncludeUsage bool `json:"include_usage"`
+}
+
+type ChatCompletionRequest struct {
+ Model string `json:"model"`
+ Messages []Message `json:"messages"`
+ Stream bool `json:"stream"`
+ StreamOptions *StreamOptions `json:"stream_options"`
+ MaxTokens *int `json:"max_tokens"`
+ Seed *int `json:"seed"`
+ Stop any `json:"stop"`
+ Temperature *float64 `json:"temperature"`
+ FrequencyPenalty *float64 `json:"frequency_penalty"`
+ PresencePenalty *float64 `json:"presence_penalty"`
+ TopP *float64 `json:"top_p"`
+ ResponseFormat *ResponseFormat `json:"response_format"`
+ Tools []Tool `json:"tools"`
+}
+
+type ChatCompletion struct {
+ Id string `json:"id"`
+ Object string `json:"object"`
+ Created int64 `json:"created"`
+ Model string `json:"model"`
+ SystemFingerprint string `json:"system_fingerprint"`
+ Choices []Choice `json:"choices"`
+ Usage Usage `json:"usage,omitempty"`
+}
+
+type ChatCompletionChunk struct {
+ Id string `json:"id"`
+ Object string `json:"object"`
+ Created int64 `json:"created"`
+ Model string `json:"model"`
+ SystemFingerprint string `json:"system_fingerprint"`
+ Choices []ChunkChoice `json:"choices"`
+ Usage *Usage `json:"usage,omitempty"`
+}
+
+// TODO (https://github.com/ollama/ollama/issues/5259): support []string, []int and [][]int
+type CompletionRequest struct {
+ Model string `json:"model"`
+ Prompt string `json:"prompt"`
+ FrequencyPenalty float32 `json:"frequency_penalty"`
+ MaxTokens *int `json:"max_tokens"`
+ PresencePenalty float32 `json:"presence_penalty"`
+ Seed *int `json:"seed"`
+ Stop any `json:"stop"`
+ Stream bool `json:"stream"`
+ StreamOptions *StreamOptions `json:"stream_options"`
+ Temperature *float32 `json:"temperature"`
+ TopP float32 `json:"top_p"`
+ Suffix string `json:"suffix"`
+}
+
+type Completion struct {
+ Id string `json:"id"`
+ Object string `json:"object"`
+ Created int64 `json:"created"`
+ Model string `json:"model"`
+ SystemFingerprint string `json:"system_fingerprint"`
+ Choices []CompleteChunkChoice `json:"choices"`
+ Usage Usage `json:"usage,omitempty"`
+}
+
+type CompletionChunk struct {
+ Id string `json:"id"`
+ Object string `json:"object"`
+ Created int64 `json:"created"`
+ Choices []CompleteChunkChoice `json:"choices"`
+ Model string `json:"model"`
+ SystemFingerprint string `json:"system_fingerprint"`
+ Usage *Usage `json:"usage,omitempty"`
+}
+
+type ToolCall struct {
+ ID string `json:"id"`
+ Index int `json:"index"`
+ Type string `json:"type"`
+ Function struct {
+ Name string `json:"name"`
+ Arguments string `json:"arguments"`
+ } `json:"function"`
+}
+
+type Embedding struct {
+ Object string `json:"object"`
+ Embedding []float32 `json:"embedding"`
+ Index int `json:"index"`
+}
+
+type EmbeddingList struct {
+ Object string `json:"object"`
+ Data []Embedding `json:"data"`
+ Model string `json:"model"`
+ Usage EmbeddingUsage `json:"usage,omitempty"`
+}
+
+type EmbeddingUsage struct {
+ PromptTokens int `json:"prompt_tokens"`
+ TotalTokens int `json:"total_tokens"`
+}
diff --git a/go/pkg/controller/config/config.go b/go/pkg/controller/config/config.go
index dd3a052..0d0f463 100644
--- a/go/pkg/controller/config/config.go
+++ b/go/pkg/controller/config/config.go
@@ -68,7 +68,18 @@ type ServerConfig struct {
// APIConfig represents API server configuration
type APIConfig struct {
- Port int `koanf:"port"`
+ Port int `koanf:"port"`
+ ReadTimeout int `koanf:"read_timeout"`
+ WriteTimeout int `koanf:"write_timeout"`
+ IdleTimeout int `koanf:"idle_timeout"`
+ OpenAi OpenAiApiConfig `koanf:"openai"`
+}
+
+type OpenAiApiConfig struct {
+ Enabled bool `koanf:"enabled"`
+ ChatCompletionsQueue string `koanf:"chat_completions_queue"`
+ CompletionsQueue string `koanf:"completions_queue"`
+ EmbeddingsQueue string `koanf:"embeddings_queue"`
}
// WorkerConfig represents worker server configuration
diff --git a/go/pkg/controller/eventbus/eventbus.go b/go/pkg/controller/eventbus/eventbus.go
deleted file mode 100644
index 2a7994a..0000000
--- a/go/pkg/controller/eventbus/eventbus.go
+++ /dev/null
@@ -1,94 +0,0 @@
-// distworker
-// Copyright (C) 2025 JC-Lab
-//
-// SPDX-License-Identifier: AGPL-3.0-only
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see .
-
-package eventbus
-
-import (
- "context"
- "sync"
-)
-
-type Bus[T interface{}] interface {
- Publish(id string, data T)
- Listen(ctx context.Context, id string) (T, error)
-}
-
-type eventTopic[T interface{}] struct {
- done chan struct{}
- data T
-}
-
-type busImpl[T interface{}] struct {
- mu sync.Mutex
- topics map[string]*eventTopic[T]
-}
-
-var _ Bus[any] = (*busImpl[any])(nil)
-
-func New[T interface{}]() Bus[T] {
- return &busImpl[T]{
- topics: make(map[string]*eventTopic[T]),
- }
-}
-
-func (b *busImpl[T]) Publish(id string, data T) {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- topic := b.topics[id]
- if topic != nil {
- topic.data = data
- delete(b.topics, id)
- close(topic.done)
- }
-}
-
-func (b *busImpl[T]) Listen(ctx context.Context, id string) (T, error) {
- topic := b.getOrCreateTopic(id)
- return topic.waitForData(ctx)
-}
-
-func newEventTopic[T interface{}]() *eventTopic[T] {
- topic := &eventTopic[T]{
- done: make(chan struct{}),
- }
- return topic
-}
-
-func (t *eventTopic[T]) waitForData(ctx context.Context) (T, error) {
- select {
- case <-ctx.Done():
- var empty T
- return empty, ctx.Err()
- case _, _ = <-t.done:
- return t.data, nil
- }
-}
-
-func (b *busImpl[T]) getOrCreateTopic(id string) *eventTopic[T] {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if topic, exists := b.topics[id]; exists {
- return topic
- }
-
- topic := newEventTopic[T]()
- b.topics[id] = topic
- return topic
-}
diff --git a/go/pkg/controller/handlers.go b/go/pkg/controller/handlers.go
index 0973528..8c95f02 100644
--- a/go/pkg/controller/handlers.go
+++ b/go/pkg/controller/handlers.go
@@ -20,14 +20,17 @@ package controller
import (
"context"
- "encoding/json"
+ "errors"
+ "github.com/gin-gonic/gin"
"github.com/jc-lab/distworker/go/internal/protocol"
"github.com/jc-lab/distworker/go/internal/version"
"github.com/jc-lab/distworker/go/pkg/api"
"github.com/jc-lab/distworker/go/pkg/controller/database"
+ "github.com/jc-lab/distworker/go/pkg/controller/subscriber"
"github.com/jc-lab/distworker/go/pkg/healthchecker"
models2 "github.com/jc-lab/distworker/go/pkg/models"
"github.com/jc-lab/distworker/go/pkg/types"
+ errors2 "github.com/pkg/errors"
"io"
"log"
"mime/multipart"
@@ -37,8 +40,6 @@ import (
"net/http"
"strconv"
"time"
-
- "github.com/gorilla/mux"
)
// handleCreateTask handles POST /api/v1/tasks
@@ -54,38 +55,71 @@ import (
// @Failure 400 {object} api.ErrorResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /tasks [post]
-func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) {
- contentType := r.Header.Get("Content-Type")
+func (s *Server) handleCreateTask(c *gin.Context) {
+ contentType := c.GetHeader("Content-Type")
// Check if it's multipart form data (file upload)
if strings.HasPrefix(contentType, "multipart/form-data") {
- s.handleCreateTaskWithFiles(w, r)
+ s.handleCreateTaskWithFiles(c)
return
}
var wait int = 0
- waitParam := r.URL.Query().Get("wait")
+ waitParam := c.Request.URL.Query().Get("wait")
if waitParam != "" {
var err error
wait, err = strconv.Atoi(waitParam)
if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
}
// Handle JSON request
var request api.CreateTaskRequest
+ if err := c.ShouldBindJSON(&request); err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+ return
+ }
- if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
- http.Error(w, "Invalid JSON", http.StatusBadRequest)
+ task, listener, err := s.CreateTask(c.Request.Context(), &request, wait != 0)
+ if err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
+ if listener != nil {
+ var err error
+ var waitCtx context.Context
+ var waitCancel context.CancelFunc
+ if wait > 0 {
+ waitCtx, waitCancel = context.WithTimeout(c.Request.Context(), time.Duration(wait)*time.Millisecond)
+ } else {
+ waitCtx, waitCancel = context.WithCancel(c.Request.Context())
+ }
+ defer waitCancel()
+
+ newTask, err := listener.Wait(waitCtx)
+ if err != nil {
+ log.Printf("Task[%s] wait failed: %+v", task.Id, err)
+
+ newTask, err = s.db.GetTaskRepository().GetById(c.Request.Context(), task.Id)
+ if err != nil {
+ log.Printf("Task[%s] get failed: %+v", task.Id, err)
+ } else {
+ task = newTask
+ }
+ } else {
+ task = newTask
+ }
+ }
+
+ c.JSON(http.StatusOK, task)
+}
+func (s *Server) CreateTask(ctx context.Context, request *api.CreateTaskRequest, wait bool) (*models2.Task, subscriber.Listener[*models2.Task, *models2.TaskProgress], error) {
// Validate required fields
if request.Queue == "" {
- http.Error(w, "queue is required", http.StatusBadRequest)
- return
+ return nil, nil, errors.New("queue is required")
}
// Parse timeout
@@ -93,8 +127,7 @@ func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) {
if request.Timeout != "" {
duration, err := time.ParseDuration(request.Timeout)
if err != nil {
- http.Error(w, "Invalid timeout format", http.StatusBadRequest)
- return
+ return nil, nil, errors2.Wrap(err, "invalid timeout format")
}
timeoutMS = duration.Milliseconds()
}
@@ -111,73 +144,54 @@ func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) {
MaxRetry: request.Retry,
}
- if err := s.db.GetTaskRepository().Create(r.Context(), task); err != nil {
- http.Error(w, "Failed to create task", http.StatusInternalServerError)
- return
- }
-
- s.workerManager.EnqueueTask(task)
+ var taskListener subscriber.Listener[*models2.Task, *models2.TaskProgress]
- if wait < 0 || wait > 0 {
+ if wait {
var err error
- var waitCtx context.Context
- var waitCancel context.CancelFunc
- if wait > 0 {
- waitCtx, waitCancel = context.WithTimeout(r.Context(), time.Duration(wait)*time.Millisecond)
- } else {
- waitCtx, waitCancel = context.WithCancel(r.Context())
- }
- defer waitCancel()
-
- newTask, err := s.workerManager.WaitTask(waitCtx, task.Id)
+ taskListener, err = s.workerManager.GetTaskListener(task.Id)
if err != nil {
- log.Printf("Task[%s] wait failed: %+v", task.Id, err)
-
- task, err = s.db.GetTaskRepository().GetById(r.Context(), task.Id)
- if err != nil {
- log.Printf("get task[%s] failed: %+v", task.Id, err)
- http.Error(w, "Task not found", http.StatusNotFound)
- return
- }
- } else {
- task = newTask
+ return nil, nil, err
}
}
- writeJson(w, http.StatusOK, task)
+ if err := s.db.GetTaskRepository().Create(ctx, task); err != nil {
+ return nil, nil, errors2.Wrap(err, "failed to create task")
+ }
+ s.workerManager.EnqueueTask(task)
+
+ return task, taskListener, nil
}
// handleCreateTaskWithFiles handles POST /api/v1/tasks with file uploads
-func (s *Server) handleCreateTaskWithFiles(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleCreateTaskWithFiles(c *gin.Context) {
// Parse multipart form
- err := r.ParseMultipartForm(32 << 20) // 32MB max memory
+ err := c.Request.ParseMultipartForm(32 << 20) // 32MB max memory
if err != nil {
- http.Error(w, "Failed to parse multipart form", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "Failed to parse multipart form"})
return
}
defer func() {
- _ = r.MultipartForm.RemoveAll()
+ _ = c.Request.MultipartForm.RemoveAll()
}()
// Get task data from form
- taskDataStr := r.FormValue("task")
+ taskDataStr := c.Request.FormValue("task")
if taskDataStr == "" {
- http.Error(w, "task field is required", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "task field is required"})
return
}
// Parse task JSON
var request api.CreateTaskRequest
-
- if err := json.Unmarshal([]byte(taskDataStr), &request); err != nil {
- http.Error(w, "Invalid task JSON", http.StatusBadRequest)
+ if err := c.ShouldBindJSON(&request); err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Validate required fields
if request.Queue == "" {
- http.Error(w, "queue is required", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "queue is required"})
return
}
@@ -186,7 +200,7 @@ func (s *Server) handleCreateTaskWithFiles(w http.ResponseWriter, r *http.Reques
if request.Timeout != "" {
duration, err := time.ParseDuration(request.Timeout)
if err != nil {
- http.Error(w, "Invalid timeout format", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "Invalid timeout format"})
return
}
timeoutMS = duration.Milliseconds()
@@ -194,12 +208,12 @@ func (s *Server) handleCreateTaskWithFiles(w http.ResponseWriter, r *http.Reques
// Handle file uploads
var fileInfos []models2.FileInfo
- if r.MultipartForm != nil && r.MultipartForm.File != nil {
- for fieldName, fileHeaders := range r.MultipartForm.File {
+ if c.Request.MultipartForm != nil && c.Request.MultipartForm.File != nil {
+ for fieldName, fileHeaders := range c.Request.MultipartForm.File {
for _, fileHeader := range fileHeaders {
- fileInfo, err := s.uploadFile(r, fieldName, fileHeader)
+ fileInfo, err := s.uploadFile(c.Request, fieldName, fileHeader)
if err != nil {
- http.Error(w, "Failed to upload file: "+err.Error(), http.StatusInternalServerError)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to upload file: " + err.Error()})
return
}
fileInfos = append(fileInfos, *fileInfo)
@@ -219,15 +233,15 @@ func (s *Server) handleCreateTaskWithFiles(w http.ResponseWriter, r *http.Reques
CreatedAt: models2.Now(),
}
- if err := s.db.GetTaskRepository().Create(r.Context(), task); err != nil {
- http.Error(w, "Failed to create task", http.StatusInternalServerError)
+ if err := s.db.GetTaskRepository().Create(c.Request.Context(), task); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to create task"})
return
}
// Try to assign the task to an available worker
s.workerManager.EnqueueTask(task)
- writeJson(w, http.StatusOK, task)
+ c.JSON(http.StatusOK, task)
}
// uploadFile uploads a single file to storage
@@ -289,17 +303,16 @@ func (s *Server) uploadFile(r *http.Request, fieldName string, fileHeader *multi
// @Success 200 {object} models.Task
// @Failure 404 {object} api.ErrorResponse
// @Router /tasks/{task_id} [get]
-func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- taskId := vars["task_id"]
+func (s *Server) handleGetTask(c *gin.Context) {
+ taskId := c.Param("task_id")
- task, err := s.db.GetTaskRepository().GetById(r.Context(), taskId)
+ task, err := s.db.GetTaskRepository().GetById(c.Request.Context(), taskId)
if err != nil {
- http.Error(w, "Task not found", http.StatusNotFound)
+ c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"error": "Task not found"})
return
}
- writeJson(w, http.StatusOK, task)
+ c.JSON(http.StatusOK, task)
}
// handleListTasks handles GET /api/v1/tasks
@@ -316,12 +329,12 @@ func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) {
// @Success 200 {object} api.ListTasksResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /tasks [get]
-func (s *Server) handleListTasks(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleListTasks(c *gin.Context) {
// Parse query parameters
- queue := r.URL.Query().Get("queue")
- status := r.URL.Query().Get("status")
- pageStr := r.URL.Query().Get("page")
- limitStr := r.URL.Query().Get("limit")
+ queue := c.Request.URL.Query().Get("queue")
+ status := c.Request.URL.Query().Get("status")
+ pageStr := c.Request.URL.Query().Get("page")
+ limitStr := c.Request.URL.Query().Get("limit")
page := 1
if pageStr != "" {
@@ -343,9 +356,9 @@ func (s *Server) handleListTasks(w http.ResponseWriter, r *http.Request) {
}
_ = status
- tasks, total, err := s.db.GetTaskRepository().List(r.Context(), filter, page, limit)
+ tasks, total, err := s.db.GetTaskRepository().List(c.Request.Context(), filter, page, limit)
if err != nil {
- http.Error(w, "Failed to list tasks", http.StatusInternalServerError)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to list tasks"})
return
}
@@ -361,7 +374,7 @@ func (s *Server) handleListTasks(w http.ResponseWriter, r *http.Request) {
},
}
- writeJson(w, http.StatusOK, response)
+ c.JSON(http.StatusOK, response)
}
// handleDeleteTask handles DELETE /api/v1/tasks/{task_id}
@@ -376,20 +389,19 @@ func (s *Server) handleListTasks(w http.ResponseWriter, r *http.Request) {
// @Failure 400 {object} api.ErrorResponse
// @Failure 404 {object} api.ErrorResponse
// @Router /tasks/{task_id} [delete]
-func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- taskId := vars["task_id"]
+func (s *Server) handleDeleteTask(c *gin.Context) {
+ taskId := c.Param("task_id")
// Get task first to check if it exists and can be cancelled
- task, err := s.db.GetTaskRepository().GetById(r.Context(), taskId)
+ task, err := s.db.GetTaskRepository().GetById(c.Request.Context(), taskId)
if err != nil {
- http.Error(w, "Task not found", http.StatusNotFound)
+ c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"error": "Task not found"})
return
}
// Only allow cancellation of pending or processing tasks
if task.Status != types.TaskStatusPending && task.Status != types.TaskStatusProcessing {
- http.Error(w, "Task cannot be cancelled", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "Task cannot be cancelled"})
return
}
@@ -401,8 +413,8 @@ func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
}
task.CompletedAt = models2.NowPtr()
- if err := s.db.GetTaskRepository().Update(r.Context(), task); err != nil {
- http.Error(w, "Failed to cancel task", http.StatusInternalServerError)
+ if err := s.db.GetTaskRepository().Update(c.Request.Context(), task); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to cancel task"})
return
}
@@ -411,7 +423,7 @@ func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
Status: task.Status,
}
- writeJson(w, http.StatusOK, response)
+ c.JSON(http.StatusOK, response)
}
// handleCreateQueue handles POST /api/v1/queues
@@ -427,16 +439,15 @@ func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
// @Failure 400 {object} api.ErrorResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /queues [post]
-func (s *Server) handleCreateQueue(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleCreateQueue(c *gin.Context) {
var request api.CreateQueueRequest
-
- if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
- http.Error(w, "Invalid JSON", http.StatusBadRequest)
+ if err := c.ShouldBindJSON(&request); err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if request.Name == "" {
- http.Error(w, "name is required", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "name is required"})
return
}
@@ -448,12 +459,12 @@ func (s *Server) handleCreateQueue(w http.ResponseWriter, r *http.Request) {
UpdatedAt: models2.Now(),
}
- if err := s.db.GetQueueRepository().Create(r.Context(), queue); err != nil {
- http.Error(w, "Failed to create queue", http.StatusInternalServerError)
+ if err := s.db.GetQueueRepository().Create(c.Request.Context(), queue); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to create queue"})
return
}
- writeJson(w, http.StatusOK, queue)
+ c.JSON(http.StatusOK, queue)
}
// handleListQueues handles GET /api/v1/queues
@@ -466,10 +477,10 @@ func (s *Server) handleCreateQueue(w http.ResponseWriter, r *http.Request) {
// @Success 200 {object} api.ListQueuesResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /queues [get]
-func (s *Server) handleListQueues(w http.ResponseWriter, r *http.Request) {
- queues, err := s.db.GetQueueRepository().List(r.Context())
+func (s *Server) handleListQueues(c *gin.Context) {
+ queues, err := s.db.GetQueueRepository().List(c.Request.Context())
if err != nil {
- http.Error(w, "Failed to list queues", http.StatusInternalServerError)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to list queues"})
return
}
@@ -477,7 +488,7 @@ func (s *Server) handleListQueues(w http.ResponseWriter, r *http.Request) {
Queues: queues,
}
- writeJson(w, http.StatusOK, response)
+ c.JSON(http.StatusOK, response)
}
// handleGetQueue handles GET /api/v1/queues/{queue_name}
@@ -491,17 +502,16 @@ func (s *Server) handleListQueues(w http.ResponseWriter, r *http.Request) {
// @Success 200 {object} models.Queue
// @Failure 404 {object} api.ErrorResponse
// @Router /queues/{queue_name} [get]
-func (s *Server) handleGetQueue(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- queueName := vars["queue_name"]
+func (s *Server) handleGetQueue(c *gin.Context) {
+ queueName := c.Param("queue_name")
- queue, err := s.db.GetQueueRepository().GetByName(r.Context(), queueName)
+ queue, err := s.db.GetQueueRepository().GetByName(c.Request.Context(), queueName)
if err != nil {
- http.Error(w, "Queue not found", http.StatusNotFound)
+ c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"error": "Queue not found"})
return
}
- writeJson(w, http.StatusOK, queue)
+ c.JSON(http.StatusOK, queue)
}
// handleUpdateQueue handles PUT /api/v1/queues/{queue_name}
@@ -518,32 +528,30 @@ func (s *Server) handleGetQueue(w http.ResponseWriter, r *http.Request) {
// @Failure 404 {object} api.ErrorResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /queues/{queue_name} [put]
-func (s *Server) handleUpdateQueue(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- queueName := vars["queue_name"]
+func (s *Server) handleUpdateQueue(c *gin.Context) {
+ queueName := c.Param("queue_name")
- queue, err := s.db.GetQueueRepository().GetByName(r.Context(), queueName)
+ queue, err := s.db.GetQueueRepository().GetByName(c.Request.Context(), queueName)
if err != nil {
- http.Error(w, "Queue not found", http.StatusNotFound)
+ c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"error": "Queue not found"})
return
}
var request api.UpdateQueueRequest
-
- if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
- http.Error(w, "Invalid JSON", http.StatusBadRequest)
+ if err := c.ShouldBindJSON(&request); err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
queue.Description = request.Description
queue.UpdatedAt = models2.Now()
- if err := s.db.GetQueueRepository().Update(r.Context(), queue); err != nil {
- http.Error(w, "Failed to update queue", http.StatusInternalServerError)
+ if err := s.db.GetQueueRepository().Update(c.Request.Context(), queue); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to update queue"})
return
}
- writeJson(w, http.StatusOK, queue)
+ c.JSON(http.StatusOK, queue)
}
// handleDeleteQueue handles DELETE /api/v1/queues/{queue_name}
@@ -557,12 +565,11 @@ func (s *Server) handleUpdateQueue(w http.ResponseWriter, r *http.Request) {
// @Success 200 {object} api.DeleteQueueResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /queues/{queue_name} [delete]
-func (s *Server) handleDeleteQueue(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- queueName := vars["queue_name"]
+func (s *Server) handleDeleteQueue(c *gin.Context) {
+ queueName := c.Param("queue_name")
- if err := s.db.GetQueueRepository().Delete(r.Context(), queueName); err != nil {
- http.Error(w, "Failed to delete queue", http.StatusInternalServerError)
+ if err := s.db.GetQueueRepository().Delete(c.Request.Context(), queueName); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to delete queue"})
return
}
@@ -570,7 +577,7 @@ func (s *Server) handleDeleteQueue(w http.ResponseWriter, r *http.Request) {
Status: "deleted",
}
- writeJson(w, http.StatusOK, response)
+ c.JSON(http.StatusOK, response)
}
// handleGetQueueStats handles GET /api/v1/queues/{queue_name}/stats
@@ -584,9 +591,9 @@ func (s *Server) handleDeleteQueue(w http.ResponseWriter, r *http.Request) {
// @Success 200 {object} models.QueueStats
// @Failure 501 {object} api.ErrorResponse
// @Router /queues/{queue_name}/stats [get]
-func (s *Server) handleGetQueueStats(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleGetQueueStats(c *gin.Context) {
// TODO: Implement queue statistics
- http.Error(w, "Not implemented", http.StatusNotImplemented)
+ c.AbortWithStatusJSON(http.StatusNotImplemented, gin.H{"error": "Not implemented"})
}
// handleListWorkers handles GET /api/v1/workers
@@ -599,10 +606,10 @@ func (s *Server) handleGetQueueStats(w http.ResponseWriter, r *http.Request) {
// @Success 200 {object} api.ListWorkersResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /workers [get]
-func (s *Server) handleListWorkers(w http.ResponseWriter, r *http.Request) {
- sessions, err := s.db.GetWorkerSessionRepository().List(r.Context())
+func (s *Server) handleListWorkers(c *gin.Context) {
+ sessions, err := s.db.GetWorkerSessionRepository().List(c.Request.Context())
if err != nil {
- http.Error(w, "Failed to list workers", http.StatusInternalServerError)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to list workers"})
return
}
@@ -615,7 +622,7 @@ func (s *Server) handleListWorkers(w http.ResponseWriter, r *http.Request) {
Workers: workers,
}
- writeJson(w, http.StatusOK, response)
+ c.JSON(http.StatusOK, response)
}
// handleDeleteWorker handles DELETE /api/v1/workers/{worker_id}
@@ -629,9 +636,9 @@ func (s *Server) handleListWorkers(w http.ResponseWriter, r *http.Request) {
// @Success 200 {object} api.DeleteWorkerResponse
// @Failure 501 {object} api.ErrorResponse
// @Router /workers/{worker_id} [delete]
-func (s *Server) handleDeleteWorker(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleDeleteWorker(c *gin.Context) {
// TODO: Implement worker disconnection
- http.Error(w, "Not implemented", http.StatusNotImplemented)
+ c.AbortWithStatusJSON(http.StatusNotImplemented, gin.H{"error": "Not implemented"})
}
// handleListProvisioners handles GET /api/v1/provisioners
@@ -644,9 +651,9 @@ func (s *Server) handleDeleteWorker(w http.ResponseWriter, r *http.Request) {
// @Success 200 {array} models.Provisioner
// @Failure 501 {object} api.ErrorResponse
// @Router /provisioners [get]
-func (s *Server) handleListProvisioners(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleListProvisioners(c *gin.Context) {
// TODO: Implement provisioner listing
- http.Error(w, "Not implemented", http.StatusNotImplemented)
+ c.AbortWithStatusJSON(http.StatusNotImplemented, gin.H{"error": "Not implemented"})
}
// handleHealth handles GET /health
@@ -659,14 +666,14 @@ func (s *Server) handleListProvisioners(w http.ResponseWriter, r *http.Request)
// @Success 200 {object} api.HealthResponse
// @Failure 503 {object} api.HealthResponse
// @Router /health [get]
-func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleHealth(c *gin.Context) {
response := &api.HealthResponse{
Timestamp: time.Now().UnixMilli(),
Version: version.Version,
Details: make(map[string]*api.HealthDetail),
}
- response.Details, response.Status = healthchecker.Check(r.Context(), []healthchecker.Checkable{
+ response.Details, response.Status = healthchecker.Check(c.Request.Context(), []healthchecker.Checkable{
&healthchecker.Feature{
Name: "mongodb",
HealthFunc: s.db.Health,
@@ -679,9 +686,9 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
},
}, time.Second)
if response.Status == types.HealthStatusDown {
- writeJson(w, http.StatusServiceUnavailable, response)
+ c.AbortWithStatusJSON(http.StatusServiceUnavailable, response)
} else {
- writeJson(w, http.StatusOK, response)
+ c.JSON(http.StatusOK, response)
}
}
@@ -694,23 +701,23 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
// @Produce text/plain
// @Success 200 {string} string "Prometheus metrics"
// @Router /metrics [get]
-func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleMetrics(c *gin.Context) {
// Collect latest metrics from database before serving
if s.metrics != nil {
- s.metrics.CollectDatabaseStats(r.Context(), s.db)
- s.metrics.CollectWorkerStats(r.Context(), s.workerManager)
+ s.metrics.CollectDatabaseStats(c.Request.Context(), s.db)
+ s.metrics.CollectWorkerStats(c.Request.Context(), s.workerManager)
}
// Serve Prometheus metrics
- s.metricsHandler.ServeHTTP(w, r)
+ s.metricsHandler.ServeHTTP(c.Writer, c.Request)
}
// handleWorkerWebSocket handles WebSocket connections from workers
-func (s *Server) handleWorkerWebSocket(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handleWorkerWebSocket(c *gin.Context) {
// Upgrade HTTP connection to WebSocket
- conn, err := s.upgrader.Upgrade(w, r, nil)
+ conn, err := s.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
- http.Error(w, "Failed to upgrade to WebSocket", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "Failed to upgrade to WebSocket"})
return
}
@@ -719,61 +726,51 @@ func (s *Server) handleWorkerWebSocket(w http.ResponseWriter, r *http.Request) {
}
// handleFileDownload handles GET /worker/v1/file/{file_id}
-func (s *Server) handleFileDownload(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- fileId := vars["file_id"]
+func (s *Server) handleFileDownload(c *gin.Context) {
+ fileId := c.Param("file_id")
- vctx, err := protocol.NewValidateContext(r)
+ vctx, err := protocol.NewValidateContext(c.Request)
if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
- workerInfo, err := s.db.GetWorkerSessionRepository().GetById(r.Context(), vctx.WorkerId)
+ workerInfo, err := s.db.GetWorkerSessionRepository().GetById(c.Request.Context(), vctx.WorkerId)
if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
if err = vctx.ValidateSignature(workerInfo.WorkerToken); err != nil {
- http.Error(w, err.Error(), http.StatusForbidden)
+ c.AbortWithStatusJSON(http.StatusForbidden, gin.H{"error": err.Error()})
return
}
if fileId == "" {
- http.Error(w, "file_id is required", http.StatusBadRequest)
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "file_id is required"})
return
}
// Download file from storage
- reader, fileInfo, err := s.storage.Download(r.Context(), fileId)
+ reader, fileInfo, err := s.storage.Download(c.Request.Context(), fileId)
if err != nil {
if strings.Contains(err.Error(), "not found") {
- http.Error(w, "File not found", http.StatusNotFound)
+ c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"error": "File not found"})
} else {
- http.Error(w, "Failed to download file: "+err.Error(), http.StatusInternalServerError)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "Failed to download file: " + err.Error()})
}
return
}
defer reader.Close()
// Set response headers
- w.Header().Set("Content-Type", fileInfo.ContentType)
- w.Header().Set("Content-Length", strconv.FormatInt(fileInfo.Size, 10))
- w.Header().Set("Content-Disposition", "attachment; filename=\""+fileInfo.Filename+"\"")
-
- w.WriteHeader(http.StatusOK)
+ c.Header("Content-Type", fileInfo.ContentType)
+ c.Header("Content-Length", strconv.FormatInt(fileInfo.Size, 10))
+ c.Header("Content-Disposition", "attachment; filename=\""+fileInfo.Filename+"\"")
+ c.Status(http.StatusOK)
// Stream file content
- _, err = io.Copy(w, reader)
+ _, err = io.Copy(c.Writer, reader)
if err != nil {
// Log error but don't send HTTP error as headers are already sent
// log.Printf("Error streaming file: %v", err)
return
}
}
-
-func writeJson(w http.ResponseWriter, statusCode int, data interface{}) {
- bytes, _ := json.Marshal(data)
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Content-Length", strconv.FormatInt(int64(len(bytes)), 10))
- w.WriteHeader(statusCode)
- _, _ = w.Write(bytes)
-}
diff --git a/go/pkg/controller/openai.go b/go/pkg/controller/openai.go
new file mode 100644
index 0000000..76a426b
--- /dev/null
+++ b/go/pkg/controller/openai.go
@@ -0,0 +1,152 @@
+package controller
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/gin-gonic/gin"
+ "github.com/jc-lab/distworker/go/internal/eventstream"
+ "github.com/jc-lab/distworker/go/internal/openai"
+ "github.com/jc-lab/distworker/go/pkg/api"
+ "io"
+ "log/slog"
+ "net/http"
+)
+
+func (s *Server) openaiGenerateHandler(c *gin.Context) {
+ var req openai.ChatCompletionRequest
+
+ createTaskRequest := &api.CreateTaskRequest{
+ Queue: s.config.Server.API.OpenAi.CompletionsQueue,
+ }
+
+ if _, err := doubleDecode(c.Request.Body, &req, &createTaskRequest.Input); err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+ return
+ }
+
+ task, listener, err := s.CreateTask(c.Request.Context(), createTaskRequest, true)
+ if err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
+
+ if req.Stream {
+ progressCh := listener.Progress(c.Request.Context())
+ sseCh := make(chan any)
+
+ go func() {
+ defer close(sseCh)
+ for progress := range progressCh {
+ sseCh <- progress.Data
+ }
+ }()
+
+ openaiStreamResponse(c, sseCh)
+ } else {
+ task, err = listener.Wait(c.Request.Context())
+ if err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
+ c.JSON(http.StatusOK, task.Result)
+ }
+}
+
+func (s *Server) openaiEmbedHandler(c *gin.Context) {
+ var req openai.EmbedRequest
+
+ createTaskRequest := &api.CreateTaskRequest{
+ Queue: s.config.Server.API.OpenAi.EmbeddingsQueue,
+ }
+
+ if _, err := doubleDecode(c.Request.Body, &req, &createTaskRequest.Input); err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+ return
+ }
+
+ task, listener, err := s.CreateTask(c.Request.Context(), createTaskRequest, true)
+ if err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
+
+ task, err = listener.Wait(c.Request.Context())
+ if err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
+ c.JSON(http.StatusOK, task.Result)
+}
+
+func (s *Server) openaiChatHandler(c *gin.Context) {
+ var req openai.ChatCompletionRequest
+
+ createTaskRequest := &api.CreateTaskRequest{
+ Queue: s.config.Server.API.OpenAi.ChatCompletionsQueue,
+ }
+
+ if _, err := doubleDecode(c.Request.Body, &req, &createTaskRequest.Input); err != nil {
+ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+ return
+ }
+
+ task, listener, err := s.CreateTask(c.Request.Context(), createTaskRequest, true)
+ if err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
+
+ if req.Stream {
+ progressCh := listener.Progress(c.Request.Context())
+ sseCh := make(chan any)
+
+ go func() {
+ defer close(sseCh)
+ for progress := range progressCh {
+ sseCh <- progress.Data
+ }
+ }()
+
+ openaiStreamResponse(c, sseCh)
+ } else {
+ task, err = listener.Wait(c.Request.Context())
+ if err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
+ c.JSON(http.StatusOK, task.Result)
+ }
+}
+
+func doubleDecode(r io.Reader, out1 interface{}, out2 interface{}) ([]byte, error) {
+ raw, err := io.ReadAll(r)
+ if err != nil {
+ return nil, err
+ }
+ if err = json.Unmarshal(raw, out1); err != nil {
+ return nil, err
+ }
+ if err = json.Unmarshal(raw, out2); err != nil {
+ return nil, err
+ }
+ return raw, nil
+}
+
+func openaiStreamResponse(c *gin.Context, ch chan any) {
+ c.Header("Content-Type", "application/x-ndjson")
+ sw := eventstream.NewWriter(c.Writer)
+
+ c.Stream(func(w io.Writer) bool {
+ val, ok := <-ch
+ if !ok {
+ _ = sw.Close()
+ return false
+ }
+ if err := sw.JSON(val); err != nil {
+ slog.Info(fmt.Sprintf("openaiStreamResponse: json.Marshal failed with %s", err))
+ return false
+ }
+
+ return true
+ })
+}
diff --git a/go/pkg/controller/server.go b/go/pkg/controller/server.go
index 1c6c4e9..b219a63 100644
--- a/go/pkg/controller/server.go
+++ b/go/pkg/controller/server.go
@@ -35,7 +35,8 @@ import (
"context"
"errors"
"fmt"
- "github.com/gorilla/handlers"
+ "github.com/gin-contrib/cors"
+ "github.com/gin-gonic/gin"
"github.com/jc-lab/distworker/go/internal/provisioner"
"github.com/jc-lab/distworker/go/internal/version"
config2 "github.com/jc-lab/distworker/go/pkg/controller/config"
@@ -48,7 +49,8 @@ import (
"github.com/jc-lab/distworker/go/pkg/types"
errors2 "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
- httpSwagger "github.com/swaggo/http-swagger/v2"
+ swaggerFiles "github.com/swaggo/files"
+ ginSwagger "github.com/swaggo/gin-swagger"
"go.uber.org/zap"
"log"
"net"
@@ -57,7 +59,6 @@ import (
"sync"
"time"
- "github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
@@ -72,9 +73,9 @@ type Server struct {
rootLogger *zap.Logger
logger *zap.SugaredLogger
- router *mux.Router
- wsRouter *mux.Router
- upgrader websocket.Upgrader
+ apiServer *gin.Engine
+ workerServer *gin.Engine
+ upgrader websocket.Upgrader
// Metrics
metrics *metrics.Metrics
@@ -108,10 +109,10 @@ func NewServer(config *config2.Config, options ...Option) (*Server, error) {
// Create server
server := &Server{
- config: config,
- router: mux.NewRouter(),
- wsRouter: mux.NewRouter(),
- metrics: appMetrics,
+ config: config,
+ apiServer: gin.Default(),
+ workerServer: gin.Default(),
+ metrics: appMetrics,
metricsHandler: promhttp.InstrumentMetricHandler(
promRegistry, promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}),
),
@@ -186,19 +187,16 @@ func (s *Server) Start() error {
// Start API server
apiServer := &http.Server{
Addr: fmt.Sprintf(":%d", s.config.Server.API.Port),
- Handler: s.router,
- ReadTimeout: 15 * time.Second,
- WriteTimeout: 15 * time.Second,
- IdleTimeout: 60 * time.Second,
+ Handler: s.apiServer.Handler(),
+ ReadTimeout: time.Duration(s.config.Server.API.ReadTimeout) * time.Second,
+ WriteTimeout: time.Duration(s.config.Server.API.WriteTimeout) * time.Second,
+ IdleTimeout: time.Duration(s.config.Server.API.IdleTimeout) * time.Second,
}
// Start Worker server
workerServer := &http.Server{
- Addr: fmt.Sprintf(":%d", s.config.Server.Worker.Port),
- Handler: s.wsRouter,
- ReadTimeout: 15 * time.Second,
- WriteTimeout: 15 * time.Second,
- IdleTimeout: 60 * time.Second,
+ Addr: fmt.Sprintf(":%d", s.config.Server.Worker.Port),
+ Handler: s.workerServer.Handler(),
}
if s.config.ControllerSetting.WorkerAccessibleBaseUrl == "" {
@@ -222,13 +220,6 @@ func (s *Server) Start() error {
go func() {
defer apiListener.Close()
- s.router.PathPrefix("/swagger/").Handler(httpSwagger.Handler(
- //httpSwagger.URL("/swagger/doc.json"),
- httpSwagger.DeepLinking(true),
- httpSwagger.DocExpansion("none"),
- httpSwagger.DomID("swagger-ui"),
- )).Methods(http.MethodGet)
-
log.Printf("Starting API server on port %d", s.config.Server.API.Port)
if err := apiServer.Serve(apiListener); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Printf("API server error: %v", err)
@@ -297,33 +288,45 @@ func (s *Server) Stop() {
// setupAPIRoutes sets up HTTP API routes
func (s *Server) setupAPIRoutes() {
- api := s.router.PathPrefix("/api/v1").Subrouter()
- api.Use(handlers.CORS(
- handlers.AllowedMethods([]string{"GET"}),
- ))
-
- api.HandleFunc("/tasks", s.handleCreateTask).Methods("POST")
- api.HandleFunc("/tasks", s.handleListTasks).Methods("GET")
- api.HandleFunc("/tasks/{task_id}", s.handleGetTask).Methods("GET")
- api.HandleFunc("/tasks/{task_id}", s.handleDeleteTask).Methods("DELETE")
-
- api.HandleFunc("/queues", s.handleCreateQueue).Methods("POST")
- api.HandleFunc("/queues", s.handleListQueues).Methods("GET")
- api.HandleFunc("/queues/{queue_name}", s.handleGetQueue).Methods("GET")
- api.HandleFunc("/queues/{queue_name}", s.handleUpdateQueue).Methods("PUT")
- api.HandleFunc("/queues/{queue_name}", s.handleDeleteQueue).Methods("DELETE")
- api.HandleFunc("/queues/{queue_name}/stats", s.handleGetQueueStats).Methods("GET")
+ api := s.apiServer.Group("/api/v1")
+ api.Use(cors.New(cors.Config{
+ AllowMethods: []string{"GET"},
+ AllowAllOrigins: true,
+ }))
+
+ api.POST("/tasks", s.handleCreateTask)
+ api.GET("/tasks", s.handleListTasks)
+ api.GET("/tasks/{task_id}", s.handleGetTask)
+ api.DELETE("/tasks/{task_id}", s.handleDeleteTask)
+
+ api.POST("/queues", s.handleCreateQueue)
+ api.GET("/queues", s.handleListQueues)
+ api.GET("/queues/{queue_name}", s.handleGetQueue)
+ api.PUT("/queues/{queue_name}", s.handleUpdateQueue)
+ api.DELETE("/queues/{queue_name}", s.handleDeleteQueue)
+ api.GET("/queues/{queue_name}/stats", s.handleGetQueueStats)
// Worker routes
- api.HandleFunc("/workers", s.handleListWorkers).Methods("GET")
- api.HandleFunc("/workers/{worker_id}", s.handleDeleteWorker).Methods("DELETE")
+ api.GET("/workers", s.handleListWorkers)
+ api.DELETE("/workers/{worker_id}", s.handleDeleteWorker)
// Provisioner routes
- api.HandleFunc("/provisioners", s.handleListProvisioners).Methods("GET")
+ api.GET("/provisioners", s.handleListProvisioners)
+
+ // OpenAI Compatibility routes
+ if s.config.Server.API.OpenAi.Enabled {
+ openaiRouter := s.apiServer.Group("/openai")
+ openaiRouter.POST("/v1/chat/completions", s.openaiChatHandler)
+ openaiRouter.POST("/v1/completions", s.openaiGenerateHandler)
+ openaiRouter.POST("/v1/embeddings", s.openaiEmbedHandler)
+ }
// Health routes
- s.router.HandleFunc("/health", s.handleHealth).Methods("GET")
- s.router.HandleFunc("/metrics", s.handleMetrics).Methods("GET")
+ s.apiServer.GET("/health", s.handleHealth)
+ s.apiServer.GET("/metrics", s.handleMetrics)
+
+ // Swagger routes
+ s.apiServer.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
}
// setupWorkerRoutes sets up WebSocket and worker-specific routes
@@ -337,7 +340,7 @@ func (s *Server) setupWorkerRoutes() {
// @Success 101 {string} string "Switching Protocols"
// @Failure 400 {object} api.ErrorResponse
// @Router /worker/v1/ws [get]
- s.wsRouter.HandleFunc("/worker/v1/ws", s.handleWorkerWebSocket)
+ s.workerServer.GET("/worker/v1/ws", s.handleWorkerWebSocket)
// File download endpoint
// @Summary Download file
@@ -352,7 +355,7 @@ func (s *Server) setupWorkerRoutes() {
// @Failure 404 {object} api.ErrorResponse
// @Failure 500 {object} api.ErrorResponse
// @Router /worker/v1/file/{file_id} [get]
- s.wsRouter.HandleFunc("/worker/v1/file/{file_id}", s.handleFileDownload).Methods("GET")
+ s.workerServer.GET("/worker/v1/file/{file_id}", s.handleFileDownload)
}
// initializeQueues creates predefined queues from configuration
diff --git a/go/pkg/controller/subscriber/locker.go b/go/pkg/controller/subscriber/locker.go
new file mode 100644
index 0000000..c634123
--- /dev/null
+++ b/go/pkg/controller/subscriber/locker.go
@@ -0,0 +1,7 @@
+package subscriber
+
+// notLocker is a sync.Locker whose Lock and Unlock methods are nops.
+type notLocker struct{}
+
+func (n notLocker) Lock() {}
+func (n notLocker) Unlock() {}
diff --git a/go/pkg/controller/subscriber/subscriber.go b/go/pkg/controller/subscriber/subscriber.go
new file mode 100644
index 0000000..3aea500
--- /dev/null
+++ b/go/pkg/controller/subscriber/subscriber.go
@@ -0,0 +1,138 @@
+// distworker
+// Copyright (C) 2025 JC-Lab
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package subscriber
+
+import (
+ "context"
+ "sync"
+)
+
+type Listener[T interface{}, P interface{}] interface {
+ Progress(ctx context.Context) chan P
+ // Wait for finish
+ Wait(ctx context.Context) (T, error)
+}
+
+type Bus[T interface{}, P interface{}] interface {
+ EmitProgress(id string, data P) bool
+ Finish(id string, data T) bool
+ Listener(id string) (Listener[T, P], error)
+}
+
+type eventTopic[T interface{}, P interface{}] struct {
+ mu sync.Mutex
+ cond *sync.Cond
+ doneCh chan struct{}
+ done bool
+ progress []P
+ data T
+}
+
+type busImpl[T interface{}, P interface{}] struct {
+ mu sync.Mutex
+ topics map[string]*eventTopic[T, P]
+}
+
+var _ Bus[any, any] = (*busImpl[any, any])(nil)
+
+func New[T interface{}, P interface{}]() Bus[T, P] {
+ return &busImpl[T, P]{
+ topics: make(map[string]*eventTopic[T, P]),
+ }
+}
+
+func (b *busImpl[T, P]) EmitProgress(id string, data P) bool {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ topic := b.topics[id]
+ if topic != nil {
+ topic.progress = append(topic.progress, data)
+ topic.cond.Broadcast()
+ }
+ return topic != nil
+}
+
+func (b *busImpl[T, P]) Finish(id string, data T) bool {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ topic := b.topics[id]
+ if topic != nil {
+ topic.done = true
+ topic.data = data
+ delete(b.topics, id)
+ close(topic.doneCh)
+ topic.cond.Broadcast()
+ }
+ return topic != nil
+}
+
+func (b *busImpl[T, P]) Listener(id string) (Listener[T, P], error) {
+ return b.getOrCreateTopic(id), nil
+}
+
+func newEventTopic[T interface{}, P interface{}]() *eventTopic[T, P] {
+ topic := &eventTopic[T, P]{
+ doneCh: make(chan struct{}),
+ }
+ topic.cond = sync.NewCond(¬Locker{})
+ return topic
+}
+
+func (t *eventTopic[T, P]) Wait(ctx context.Context) (T, error) {
+ select {
+ case <-ctx.Done():
+ var empty T
+ return empty, ctx.Err()
+ case _, _ = <-t.doneCh:
+ return t.data, nil
+ }
+}
+
+func (t *eventTopic[T, P]) Progress(ctx context.Context) chan P {
+ ch := make(chan P)
+ go func() {
+ defer close(ch)
+
+ progressIndex := 0
+ for ctx.Err() == nil && !t.done {
+ progressSize := len(t.progress)
+ for progressIndex < progressSize {
+ ch <- t.progress[progressIndex]
+ progressIndex++
+ }
+ t.cond.Wait()
+ }
+ }()
+ return ch
+}
+
+func (b *busImpl[T, P]) getOrCreateTopic(id string) *eventTopic[T, P] {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if topic, exists := b.topics[id]; exists {
+ return topic
+ }
+
+ topic := newEventTopic[T, P]()
+ b.topics[id] = topic
+ return topic
+}
diff --git a/go/pkg/controller/worker/manager.go b/go/pkg/controller/worker/manager.go
index 61abe82..6a9e302 100644
--- a/go/pkg/controller/worker/manager.go
+++ b/go/pkg/controller/worker/manager.go
@@ -30,7 +30,7 @@ import (
"github.com/jc-lab/distworker/go/internal/worker"
"github.com/jc-lab/distworker/go/pkg/controller/config"
"github.com/jc-lab/distworker/go/pkg/controller/database"
- "github.com/jc-lab/distworker/go/pkg/controller/eventbus"
+ "github.com/jc-lab/distworker/go/pkg/controller/subscriber"
models2 "github.com/jc-lab/distworker/go/pkg/models"
"github.com/jc-lab/distworker/go/pkg/types"
"github.com/pkg/errors"
@@ -63,7 +63,7 @@ type Manager struct {
queueConsuming bool
taskAssignQueues [queueBuckets]*blocking_dequeue.BlockingDequeue[*models2.Task]
- taskEventBus eventbus.Bus[*models2.Task]
+ taskEventBus subscriber.Bus[*models2.Task, *models2.TaskProgress]
processingTasks map[string]*models2.Task
@@ -86,7 +86,7 @@ func NewManager(rootCtx context.Context, db database.Database, provisionerManage
connections: make(map[string]Connection),
taskAssignChan: make(chan *models2.Task, 100),
- taskEventBus: eventbus.New[*models2.Task](),
+ taskEventBus: subscriber.New[*models2.Task, *models2.TaskProgress](),
processingTasks: make(map[string]*models2.Task),
httpClient: &http.Client{
@@ -379,8 +379,8 @@ func (wm *Manager) WaitForWorkerReady(ctx context.Context, workerId string) (wor
}
}
-func (wm *Manager) WaitTask(ctx context.Context, taskId string) (*models2.Task, error) {
- return wm.taskEventBus.Listen(ctx, taskId)
+func (wm *Manager) GetTaskListener(taskId string) (subscriber.Listener[*models2.Task, *models2.TaskProgress], error) {
+ return wm.taskEventBus.Listener(taskId)
}
func (wm *Manager) workerIsIdle(conn Connection) bool {
@@ -489,11 +489,13 @@ func (wm *Manager) handleTaskProgress(conn Connection, progress *protocol2.TaskP
if task.Metadata == nil {
task.Metadata = make(map[string]interface{})
}
- task.Metadata["progress"] = progress.Progress
- task.Metadata["progress_message"] = progress.Message
if progress.Data != nil {
- task.Metadata["progress_data"] = progress.Data.AsMap()
+ taskProgress := &models2.TaskProgress{
+ Message: progress.Message,
+ Data: progress.GetData().AsMap(),
+ }
+ wm.taskEventBus.EmitProgress(task.Id, taskProgress)
}
return wm.db.GetTaskRepository().Update(wm.ctx, task)
@@ -695,7 +697,7 @@ func (wm *Manager) taskComplete(task *models2.Task) {
_ = taskRepository.Update(wm.ctx, task)
- wm.taskEventBus.Publish(task.Id, task)
+ wm.taskEventBus.Finish(task.Id, task)
if task.Status == types.TaskStatusPending && wm.queueConsuming {
wm.EnqueueTask(task)
diff --git a/go/pkg/controller/worker/websocket/listener.go b/go/pkg/controller/worker/websocket/listener.go
index faea339..d92cbb1 100644
--- a/go/pkg/controller/worker/websocket/listener.go
+++ b/go/pkg/controller/worker/websocket/listener.go
@@ -51,7 +51,7 @@ func NewListener(
}
func (l *Listener) HandleConnection(conn *websocket.Conn) {
- go l.serveConnection(conn)
+ l.serveConnection(conn)
}
func (l *Listener) serveConnection(conn *websocket.Conn) {
diff --git a/go/pkg/models/models.go b/go/pkg/models/models.go
index ee3d443..7e97e3b 100644
--- a/go/pkg/models/models.go
+++ b/go/pkg/models/models.go
@@ -49,6 +49,11 @@ type Task struct {
WebhookUrl string `bson:"webhook_url" json:"webhook_url"`
}
+type TaskProgress struct {
+ Message string
+ Data map[string]interface{}
+}
+
// TaskError represents an error that occurred during task execution
type TaskError struct {
Code types.TaskErrorCode `bson:"code" json:"code"`
diff --git a/python/distworker/__init__.py b/python/distworker/__init__.py
index a50c895..2d96da0 100644
--- a/python/distworker/__init__.py
+++ b/python/distworker/__init__.py
@@ -7,18 +7,20 @@
__version__ = "1.0.0"
__author__ = "JC-Lab"
-from .client.worker import Worker
-from .client.task import Task
from .client.exceptions import (
DistWorkerError,
ConnectionError,
AuthenticationError,
TaskError,
)
+from .client.request import Request
+from .client.task import Task
+from .client.worker import Worker
__all__ = [
"Worker",
- "Task",
+ "Task",
+ "Request",
"DistWorkerError",
"ConnectionError",
"AuthenticationError",
diff --git a/python/distworker/client/request.py b/python/distworker/client/request.py
new file mode 100644
index 0000000..2cbd1e9
--- /dev/null
+++ b/python/distworker/client/request.py
@@ -0,0 +1,15 @@
+from typing import Callable, Optional, Dict
+
+from .task import Task
+
+
+class Request:
+ _send_progress: Callable[[float, str, Optional[Dict]], None]
+
+ task: Task
+
+ def __init__(self, task: Task):
+ self.task = task
+
+ def progress(self, progress: float = 0, message: str = "", data: Optional[Dict] = None):
+ return self._send_progress(progress, message, data)
\ No newline at end of file
diff --git a/python/distworker/client/worker.py b/python/distworker/client/worker.py
index 78cde8c..b3d7aac 100644
--- a/python/distworker/client/worker.py
+++ b/python/distworker/client/worker.py
@@ -16,7 +16,8 @@
from websockets.exceptions import ConnectionClosed, InvalidStatusCode
from .auth import generate_websocket_signature, DATE_ONLY_FORMAT, DATE_FORMAT
-from .exceptions import ConnectionError, AuthenticationError, TaskError, ProtocolError
+from .exceptions import ConnectionError, AuthenticationError, ProtocolError
+from .request import Request
from .task import Task
from ..protocol import protocol_pb2
@@ -30,7 +31,7 @@ class Worker:
Connects to the DistWorker controller and processes assigned tasks.
"""
- task_handler: Callable[[Task], Awaitable[Dict[str, Any]]]
+ task_handler: Callable[[Request], Awaitable[Dict[str, Any]]]
def __init__(
self,
@@ -81,7 +82,7 @@ def __init__(
self._heartbeat_task: Optional[asyncio.Task] = None
self._connection_task: Optional[asyncio.Task] = None
- async def _default_task_handler(self, task: Task) -> Dict[str, Any]:
+ async def _default_task_handler(self, req: Request) -> Dict[str, Any]:
raise NotImplementedError()
async def run(self):
@@ -334,13 +335,15 @@ async def _handle_task_assignment(self, task_assign: protocol_pb2.TaskAssign):
'storage_url': f.storage_url
} for f in task_assign.files]
)
-
+
+ req = Request(task = task)
+
self.current_task = task
logger.info(f"Processing task {task.task_id} from queue {task.queue}")
# Process task in background
- asyncio.create_task(self._process_task(task))
+ asyncio.create_task(self._process_task(req))
except Exception as e:
logger.error(f"Task assignment error: {e}")
@@ -376,19 +379,46 @@ def _match_parts(self, pattern_parts: List[str], queue_parts: List[str]) -> bool
else:
return False
- async def _process_task(self, task: Task):
+ async def _process_task(self, req: Request):
"""Process task with handler"""
try:
+ task_id = req.task.task_id
+
+ progress_queue = asyncio.Queue()
+
+ async def progress_worker():
+ while True:
+ try:
+ item = await progress_queue.get()
+ if item is None: # Exit signal
+ break
+
+ progress, message, data = item
+ await self._send_task_progress(task_id, progress, message, data)
+ except Exception as e:
+ logger.error(f"Error processing progress for task {task_id}: {e}")
+ finally:
+ progress_queue.task_done()
+
+ def send_progress(progress: float, message: str = "", data: Optional[Dict] = None):
+ progress_queue.put_nowait((progress, message, data))
+
+ req._send_progress = send_progress
+
# Call task handler
- result = await self.task_handler(task)
+ asyncio.create_task(progress_worker())
+ result = await self.task_handler(req)
+
+ await progress_queue.join()
+ await progress_queue.put(None) # Exit signal
# Send completion
- await self._send_task_complete(task.task_id, result)
- logger.info(f"Task {task.task_id} completed successfully")
+ await self._send_task_complete(task_id, result)
+ logger.info(f"Task {task_id} completed successfully")
except Exception as e:
- logger.error(f"Task {task.task_id} failed: {e}")
- await self._send_task_failed(task.task_id, "HANDLER_ERROR", str(e))
+ logger.error(f"Task {task_id} failed: {e}")
+ await self._send_task_failed(task_id, "HANDLER_ERROR", str(e))
finally:
self.current_task = None
self.last_processed = time.monotonic()
@@ -476,17 +506,3 @@ async def _send_message(self, message: protocol_pb2.WebSocketMessage):
await self.websocket.send(data)
except Exception as e:
raise ConnectionError(f"Failed to send message: {e}")
-
- async def send_task_progress(self, progress: float, message: str = "", data: Optional[Dict] = None):
- """
- Send progress update for current task
-
- Args:
- progress: Progress percentage (0.0 to 100.0)
- message: Optional progress message
- data: Optional progress data
- """
- if not self.current_task:
- raise TaskError("No current task to update progress for")
-
- await self._send_task_progress(self.current_task.task_id, progress, message, data)
\ No newline at end of file
diff --git a/python/examples/modal-worker.py b/python/examples/modal-worker.py
index 2da3e4a..e558960 100644
--- a/python/examples/modal-worker.py
+++ b/python/examples/modal-worker.py
@@ -1,21 +1,275 @@
import os
+import time
+import uuid
+from typing import List, Optional
import modal
app = modal.App("distworker-sample-worker")
worker_image = modal.Image.debian_slim().pip_install(
- "distworker-sdk==0.0.2rc1",
- "protobuf>=6.31.1",
+ "distworker-sdk==0.0.3rc3",
+ "protobuf>=4.31.1",
+ "transformers>=4.35.0",
+ "torch>=2.1.0",
+ "accelerate>=0.24.0",
+ "sentencepiece>=0.1.99",
+ "tiktoken>=0.5.0",
)
@app.function(
image=worker_image,
- timeout=1800
+ timeout=1800,
+ gpu="T4",
+ memory=8192
)
async def worker(controller_url: str, provisioner: str, worker_id: str, worker_token: str):
- import asyncio
from typing import Dict, Any
- from distworker import Worker, Task
+ from distworker import Worker, Request
+ import torch
+ from transformers import AutoTokenizer, AutoModelForCausalLM
+ import tiktoken
+
+ # 가벼운 모델 로드 (GPT-2 또는 다른 소형 모델)
+ print("Loading model...")
+ model_name = "microsoft/DialoGPT-small" # 대화형 모델
+ tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side='left')
+ model = AutoModelForCausalLM.from_pretrained(
+ model_name,
+ torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
+ device_map="auto" if torch.cuda.is_available() else None
+ )
+
+ # pad token 설정
+ if tokenizer.pad_token is None:
+ tokenizer.pad_token = tokenizer.eos_token
+
+ print("Model loaded successfully!")
+
+ def count_tokens(text: str) -> int:
+ """토큰 수 계산"""
+ try:
+ # tiktoken을 사용해 대략적인 토큰 수 계산
+ encoding = tiktoken.get_encoding("cl100k_base")
+ return len(encoding.encode(text))
+ except:
+ # fallback: 단어 수 기반 추정
+ return len(text.split()) * 1.3
+
+ def generate_text(prompt: str, max_tokens: int = 100, temperature: float = 0.7,
+ stop_sequences: Optional[List[str]] = None) -> str:
+ """텍스트 생성"""
+ try:
+ inputs = tokenizer.encode(prompt, return_tensors="pt")
+ if torch.cuda.is_available():
+ inputs = inputs.cuda()
+
+ with torch.no_grad():
+ outputs = model.generate(
+ inputs,
+ max_new_tokens=min(max_tokens, 512), # 최대 토큰 제한
+ temperature=max(temperature, 0.1),
+ do_sample=True,
+ pad_token_id=tokenizer.pad_token_id,
+ eos_token_id=tokenizer.eos_token_id,
+ attention_mask=torch.ones_like(inputs)
+ )
+
+ generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
+
+ # 원본 프롬프트 제거
+ if generated_text.startswith(prompt):
+ generated_text = generated_text[len(prompt):].strip()
+
+ # stop sequences 처리
+ if stop_sequences:
+ for stop_seq in stop_sequences:
+ if stop_seq in generated_text:
+ generated_text = generated_text.split(stop_seq)[0]
+
+ return generated_text
+
+ except Exception as e:
+ print(f"Generation error: {e}")
+ return "I apologize, but I encountered an error while generating a response."
+
+ def handle_chat_completions(req: Request, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Chat completions API 처리"""
+ messages = data.get("messages", [])
+ max_tokens = data.get("max_tokens", 100)
+ temperature = data.get("temperature", 0.7)
+ stream = data.get("stream", False)
+ model_name = data.get("model", "dialogpt-small")
+ stop = data.get("stop", [])
+
+ # 메시지를 프롬프트로 변환
+ prompt = ""
+ for message in messages:
+ role = message.get("role", "user")
+ content = message.get("content", "")
+
+ if role == "system":
+ prompt += f"System: {content}\n"
+ elif role == "user":
+ prompt += f"Human: {content}\n"
+ elif role == "assistant":
+ prompt += f"Assistant: {content}\n"
+
+ prompt += "Assistant: "
+
+ # 텍스트 생성
+ generated_text = generate_text(
+ prompt,
+ max_tokens=max_tokens,
+ temperature=temperature,
+ stop_sequences=stop if isinstance(stop, list) else ([stop] if stop else [])
+ )
+
+ # 응답 생성
+ response_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
+ created = int(time.time())
+
+ prompt_tokens = count_tokens(prompt)
+ completion_tokens = count_tokens(generated_text)
+ total_tokens = prompt_tokens + completion_tokens
+
+ if stream:
+ # 스트리밍 응답
+ words = generated_text.split()
+ for i, word in enumerate(words):
+ chunk = {
+ "id": response_id,
+ "object": "chat.completion.chunk",
+ "created": created,
+ "model": model_name,
+ "choices": [{
+ "index": 0,
+ "delta": {
+ "content": word + (" " if i < len(words) - 1 else "")
+ },
+ "finish_reason": None
+ }]
+ }
+ req.progress(data = chunk)
+
+ # 마지막 청크
+ final_chunk = {
+ "id": response_id,
+ "object": "chat.completion.chunk",
+ "created": created,
+ "model": model_name,
+ "choices": [{
+ "index": 0,
+ "delta": {},
+ "finish_reason": "stop"
+ }]
+ }
+ req.progress(data = final_chunk)
+
+ return {}
+ else:
+ # 일반 응답
+ return {
+ "id": response_id,
+ "object": "chat.completion",
+ "created": created,
+ "model": model_name,
+ "choices": [{
+ "index": 0,
+ "message": {
+ "role": "assistant",
+ "content": generated_text
+ },
+ "finish_reason": "stop"
+ }],
+ "usage": {
+ "prompt_tokens": prompt_tokens,
+ "completion_tokens": completion_tokens,
+ "total_tokens": total_tokens
+ }
+ }
+
+ def handle_completions(req: Request, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Text completions API 처리"""
+ prompt = data.get("prompt", "")
+ max_tokens = data.get("max_tokens", 100)
+ temperature = data.get("temperature", 0.7)
+ stream = data.get("stream", False)
+ model_name = data.get("model", "dialogpt-small")
+ stop = data.get("stop", [])
+ n = data.get("n", 1)
+
+ response_id = f"cmpl-{uuid.uuid4().hex[:29]}"
+ created = int(time.time())
+
+ choices = []
+
+ for i in range(n):
+ generated_text = generate_text(
+ prompt,
+ max_tokens=max_tokens,
+ temperature=temperature,
+ stop_sequences=stop if isinstance(stop, list) else ([stop] if stop else [])
+ )
+
+ if stream:
+ # 스트리밍 응답
+ words = generated_text.split()
+ for j, word in enumerate(words):
+ chunk = {
+ "id": response_id,
+ "object": "text_completion",
+ "created": created,
+ "model": model_name,
+ "choices": [{
+ "text": word + (" " if j < len(words) - 1 else ""),
+ "index": i,
+ "logprobs": None,
+ "finish_reason": None
+ }]
+ }
+ req.progress(data = chunk)
+
+ # 마지막 청크
+ final_chunk = {
+ "id": response_id,
+ "object": "text_completion",
+ "created": created,
+ "model": model_name,
+ "choices": [{
+ "text": "",
+ "index": i,
+ "logprobs": None,
+ "finish_reason": "stop"
+ }]
+ }
+ req.progress(data = final_chunk)
+ else:
+ choices.append({
+ "text": generated_text,
+ "index": i,
+ "logprobs": None,
+ "finish_reason": "stop"
+ })
+
+ if stream:
+ return {}
+ else:
+ prompt_tokens = count_tokens(prompt)
+ completion_tokens = sum(count_tokens(choice["text"]) for choice in choices)
+ total_tokens = prompt_tokens + completion_tokens
+
+ return {
+ "id": response_id,
+ "object": "text_completion",
+ "created": created,
+ "model": model_name,
+ "choices": choices,
+ "usage": {
+ "prompt_tokens": prompt_tokens,
+ "completion_tokens": completion_tokens,
+ "total_tokens": total_tokens
+ }
+ }
worker = Worker(
controller_url=controller_url,
@@ -26,27 +280,39 @@ async def worker(controller_url: str, provisioner: str, worker_id: str, worker_t
heartbeat_interval=5.0
)
- async def task_handler(task: Task) -> Dict[str, Any]:
- print("task_handler: ", task)
- output_message = task.get_input("message") * 10
-
- # await worker.send_task_progress(30.0, "processing")
- #
- # # Simulate some processing time
- # await asyncio.sleep(1)
- #
- # await worker.send_task_progress(90.0, "finishing")
- #
- # # Simulate some processing time
- # await asyncio.sleep(1)
+ async def task_handler(req: Request) -> Dict[str, Any]:
+ task = req.task
+ print(f"Processing task: queue={task.queue}, task_id={task.task_id}")
- return {
- "message": output_message,
- }
+ try:
+ if task.queue == "example/chat":
+ return handle_chat_completions(req, task.input_data)
+ elif task.queue == "example/completions":
+ return handle_completions(req, task.input_data)
+ else:
+ error_msg = f"Unknown queue: {task.queue}"
+ print(error_msg)
+ return {
+ "error": {
+ "message": error_msg,
+ "type": "invalid_request_error",
+ "code": "unknown_queue"
+ }
+ }
+ except Exception as e:
+ error_msg = f"Task processing error: {str(e)}"
+ print(error_msg)
+ return {
+ "error": {
+ "message": error_msg,
+ "type": "internal_server_error",
+ "code": "processing_error"
+ }
+ }
worker.task_handler = task_handler
- print(f"worker start: controller_url={controller_url}, worker_id={worker_id}")
+ print(f"Worker started: controller_url={controller_url}, worker_id={worker_id}")
await worker.run()
@app.local_entrypoint()
@@ -55,4 +321,4 @@ def main():
provisioner = os.getenv('DISTWORKER_PROVISIONER', 'modal')
worker_id = os.environ['DISTWORKER_WORKER_ID']
worker_token = os.environ['DISTWORKER_WORKER_TOKEN']
- worker.remote(controller_url, provisioner, worker_id, worker_token)
+ worker.remote(controller_url, provisioner, worker_id, worker_token)
\ No newline at end of file
diff --git a/tools.go b/tools.go
index e65cc30..f26e910 100644
--- a/tools.go
+++ b/tools.go
@@ -21,4 +21,9 @@
package distworker
+import (
+ _ "github.com/go-swagger/go-swagger/cmd/swagger"
+ _ "github.com/swaggo/swag"
+)
+
//go:generate swag i -d ./go/pkg/controller/ -g server.go -o ./go/cmd/controller/docs/ --parseDependency --parseInternal