diff --git a/build/db_schemas/scd.libsonnet b/build/db_schemas/scd.libsonnet index d5544d3fb..a7e88a01f 100644 --- a/build/db_schemas/scd.libsonnet +++ b/build/db_schemas/scd.libsonnet @@ -10,5 +10,6 @@ "upto-v3.0.0-add_inverted_indices.sql": importstr "scd/upto-v3.0.0-add_inverted_indices.sql", "upto-v3.1.0-create_uss_availability.sql": importstr "scd/upto-v3.1.0-create_uss_availability.sql", "upto-v3.2.0-add_ovn_columns.sql": importstr "scd/upto-v3.2.0-add_ovn_columns.sql", + "upto-v3.3.0-add_locks.sql": importstr "scd/upto-v3.3.0-add_locks.sql", }, } diff --git a/build/db_schemas/scd/downfrom-v3.3.0-remove_locks.sql b/build/db_schemas/scd/downfrom-v3.3.0-remove_locks.sql new file mode 100644 index 000000000..6954118a6 --- /dev/null +++ b/build/db_schemas/scd/downfrom-v3.3.0-remove_locks.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS scd_locks; + +UPDATE schema_versions +SET schema_version = 'v3.2.0' +WHERE onerow_enforcer = TRUE; diff --git a/build/db_schemas/scd/upto-v3.3.0-add_locks.sql b/build/db_schemas/scd/upto-v3.3.0-add_locks.sql new file mode 100644 index 000000000..a84b66411 --- /dev/null +++ b/build/db_schemas/scd/upto-v3.3.0-add_locks.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS scd_locks ( + key INT64 PRIMARY KEY +); + +INSERT INTO scd_locks (key) VALUES (0); + +UPDATE schema_versions +SET schema_version = 'v3.3.0' +WHERE onerow_enforcer = TRUE; diff --git a/build/db_schemas/yugabyte/scd/downfrom-v1.1.0-remove_locks.sql b/build/db_schemas/yugabyte/scd/downfrom-v1.1.0-remove_locks.sql new file mode 100644 index 000000000..87fd81663 --- /dev/null +++ b/build/db_schemas/yugabyte/scd/downfrom-v1.1.0-remove_locks.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS scd_locks; + +UPDATE schema_versions set schema_version = 'v1.0.0' WHERE onerow_enforcer = TRUE; diff --git a/build/db_schemas/yugabyte/scd/upto-v1.1.0-add_locks.sql b/build/db_schemas/yugabyte/scd/upto-v1.1.0-add_locks.sql new file mode 100644 index 000000000..d0d904f00 --- /dev/null +++ b/build/db_schemas/yugabyte/scd/upto-v1.1.0-add_locks.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS scd_locks ( + key BIGINT PRIMARY KEY +); + +INSERT INTO scd_locks (key) VALUES (0); + +UPDATE schema_versions set schema_version = 'v1.1.0' WHERE onerow_enforcer = TRUE; diff --git a/build/dev/read_scd_lock_mode.sh b/build/dev/read_scd_lock_mode.sh new file mode 100755 index 000000000..fa04c243d --- /dev/null +++ b/build/dev/read_scd_lock_mode.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -eo pipefail + +# Retrieve token from dummy OAuth server +ACCESS_TOKEN=$(curl --silent \ + "http://localhost:8085/token?grant_type=client_credentials&scope=interuss.pool_status.read&intended_audience=localhost&issuer=localhost&sub=check_scd" \ +| python extract_json_field.py 'access_token') + +curl --silent -X GET \ +"http://localhost:8082/aux/v1/configuration/scd_lock_mode" \ +-H "Authorization: Bearer ${ACCESS_TOKEN}" -H "Content-Type: application/json" diff --git a/build/dev/read_version.sh b/build/dev/read_version.sh index d5f5f508d..e4ffbb423 100755 --- a/build/dev/read_version.sh +++ b/build/dev/read_version.sh @@ -10,4 +10,3 @@ ACCESS_TOKEN=$(curl --silent \ curl --silent -X GET \ "http://localhost:8082/versions/local.test.identity" \ -H "Authorization: Bearer ${ACCESS_TOKEN}" -H "Content-Type: application/json" - diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 3526618b3..597c14b3b 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -59,6 +59,8 @@ var ( jwksKeyIDs = flag.String("jwks_key_ids", "", "IDs of a set of key in a JWKS, separated by commas") keyRefreshTimeout = flag.Duration("key_refresh_timeout", 1*time.Minute, "Timeout for refreshing keys for JWT verification") jwtAudiences = flag.String("accepted_jwt_audiences", "", "comma-separated acceptable JWT `aud` claims") + + scdGlobalLock = flag.Bool("enable_scd_global_lock", false, "Experimental: Use a global lock when working with SCD subscriptions. Reduce global throughput but improve throughput with lot of subscriptions in the same areas.") ) const ( @@ -106,7 +108,7 @@ func createKeyResolver() (auth.KeyResolver, error) { } } -func createAuxServer(ctx context.Context, locality string, publicEndpoint string, logger *zap.Logger) (*aux.Server, error) { +func createAuxServer(ctx context.Context, locality string, publicEndpoint string, scdGlobalLock bool, logger *zap.Logger) (*aux.Server, error) { connectParameters := flags.ConnectParameters() connectParameters.DBName = "aux" datastore, err := datastore.Dial(ctx, connectParameters) @@ -138,7 +140,7 @@ func createAuxServer(ctx context.Context, locality string, publicEndpoint string return nil, stacktrace.Propagate(err, "Unable to store current metadata") } - return &aux.Server{Store: auxStore, Locality: locality}, nil + return &aux.Server{Store: auxStore, Locality: locality, ScdGlobalLock: scdGlobalLock}, nil } func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) (*rid_v1.Server, *rid_v2.Server, error) { @@ -200,7 +202,7 @@ func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, erro return nil, stacktrace.Propagate(err, "Failed to connect to strategic conflict detection database; verify your database configuration is current with https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas") } - scdStore, err := scdc.NewStore(ctx, datastore) + scdStore, err := scdc.NewStore(ctx, datastore, *scdGlobalLock) if err != nil { // TODO: More robustly detect failure to create SCD server is due to a problem that may be temporary if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), "database \"scd\" does not exist") { @@ -233,6 +235,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st logger.Info("version", zap.Any("version", version.Current())) logger.Info("build", zap.Any("description", build.Describe())) logger.Info("config", zap.Bool("scd", *enableSCD)) + logger.Info("config", zap.Bool("scdGlobalLock", *scdGlobalLock)) if len(*jwtAudiences) == 0 { // TODO: Make this flag required once all parties can set audiences @@ -250,7 +253,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st ) // Initialize aux - auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, logger) + auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) if err != nil { return stacktrace.Propagate(err, "Failed to create aux server") } diff --git a/cmds/db-manager/cleanup/evict.go b/cmds/db-manager/cleanup/evict.go index 2487f9ea8..3e85919bf 100644 --- a/cmds/db-manager/cleanup/evict.go +++ b/cmds/db-manager/cleanup/evict.go @@ -176,7 +176,7 @@ func getSCDStore(ctx context.Context) (*scdc.Store, error) { return nil, fmt.Errorf("failed to connect to SCD database with %+v: %w", logParams, err) } - scdStore, err := scdc.NewStore(ctx, datastore) + scdStore, err := scdc.NewStore(ctx, datastore, false) if err != nil { return nil, fmt.Errorf("failed to create strategic conflict detection store with %+v: %w", connectParameters, err) } diff --git a/deploy/infrastructure/dependencies/terraform-commons-dss/default_latest.tf b/deploy/infrastructure/dependencies/terraform-commons-dss/default_latest.tf index 50b2af8af..d06d11647 100644 --- a/deploy/infrastructure/dependencies/terraform-commons-dss/default_latest.tf +++ b/deploy/infrastructure/dependencies/terraform-commons-dss/default_latest.tf @@ -1,5 +1,5 @@ locals { rid_db_schema = var.desired_rid_db_version == "latest" ? (var.datastore_type == "cockroachdb" ? "4.0.0" : "1.0.1") : var.desired_rid_db_version - scd_db_schema = var.desired_scd_db_version == "latest" ? (var.datastore_type == "cockroachdb" ? "3.2.0" : "1.0.1") : var.desired_scd_db_version + scd_db_schema = var.desired_scd_db_version == "latest" ? (var.datastore_type == "cockroachdb" ? "3.3.0" : "1.1.0") : var.desired_scd_db_version aux_db_schema = var.desired_aux_db_version == "latest" ? "1.1.0" : var.desired_aux_db_version } diff --git a/deploy/services/helm-charts/dss/templates/schema-manager.yaml b/deploy/services/helm-charts/dss/templates/schema-manager.yaml index 4d5998a63..5cd48cb43 100644 --- a/deploy/services/helm-charts/dss/templates/schema-manager.yaml +++ b/deploy/services/helm-charts/dss/templates/schema-manager.yaml @@ -5,11 +5,11 @@ {{- $jobVersion := .Release.Revision -}} {{/* Jobs template definition is immutable, using the revision in the name forces the job to be recreated at each helm upgrade. */}} {{- $waitForDatastore := include "init-container-wait-for-http" (dict "serviceName" "cockroachdb" "url" (printf "http://%s:8080/health" $datastoreHost)) -}} -{{- $schemas := dict "rid" "4.0.0" "scd" "3.2.0" "aux_" "1.1.0" }} +{{- $schemas := dict "rid" "4.0.0" "scd" "3.3.0" "aux_" "1.1.0" }} {{- if .Values.yugabyte.enabled }} {{- $waitForDatastore = include "init-container-wait-for-http" (dict "serviceName" "yb-tserver" "url" (printf "http://%s:9000/status" $datastoreHost)) -}} -{{- $schemas = dict "rid" "1.0.1" "scd" "1.0.1" "aux_" "1.1.0" }} +{{- $schemas = dict "rid" "1.0.1" "scd" "1.1.0" "aux_" "1.1.0" }} {{- end -}} {{- range $service, $schemaVersion := $schemas }} diff --git a/deploy/services/tanka/examples/minikube/main.jsonnet b/deploy/services/tanka/examples/minikube/main.jsonnet index 27c607bdf..220f975de 100644 --- a/deploy/services/tanka/examples/minikube/main.jsonnet +++ b/deploy/services/tanka/examples/minikube/main.jsonnet @@ -38,7 +38,7 @@ local metadata = metadataBase { enable: true, image: 'docker.io/interuss-local/dss:latest', desired_rid_db_version: '1.0.1', - desired_scd_db_version: '1.0.1', + desired_scd_db_version: '1.1.0', desired_aux_db_version: '1.1.0', }, evict+: { diff --git a/deploy/services/tanka/examples/minimum/main.jsonnet b/deploy/services/tanka/examples/minimum/main.jsonnet index ffa2f612d..61c1ef359 100644 --- a/deploy/services/tanka/examples/minimum/main.jsonnet +++ b/deploy/services/tanka/examples/minimum/main.jsonnet @@ -58,7 +58,7 @@ local metadata = metadataBase { enable: false, // <-- this boolean value is VAR_ENABLE_SCHEMA_MANAGER image: 'VAR_DOCKER_IMAGE_NAME', desired_rid_db_version: '4.0.0', - desired_scd_db_version: '3.2.0', + desired_scd_db_version: '3.3.0', desired_aux_db_version: '1.1.0', }, prometheus+: { diff --git a/deploy/services/tanka/examples/schema_manager/main.jsonnet b/deploy/services/tanka/examples/schema_manager/main.jsonnet index b1edb2d96..9d9799c75 100644 --- a/deploy/services/tanka/examples/schema_manager/main.jsonnet +++ b/deploy/services/tanka/examples/schema_manager/main.jsonnet @@ -18,7 +18,7 @@ local metadata = metadataBase { enable: true, // <-- this boolean value is VAR_ENABLE_SCHEMA_MANAGER image: 'VAR_DOCKER_IMAGE_NAME', desired_rid_db_version: '4.0.0', - desired_scd_db_version: '3.2.0', + desired_scd_db_version: '3.3.0', desired_aux_db_version: '1.1.0', }, }; diff --git a/deploy/services/tanka/metadata_base.libsonnet b/deploy/services/tanka/metadata_base.libsonnet index cd8e1a57e..e904e5fa2 100644 --- a/deploy/services/tanka/metadata_base.libsonnet +++ b/deploy/services/tanka/metadata_base.libsonnet @@ -87,7 +87,7 @@ enable: false, // NB: Automatically enabled if should_init is set to true. image: error 'must specify image', desired_rid_db_version: '4.0.0', - desired_scd_db_version: '3.2.0', + desired_scd_db_version: '3.3.0', desired_aux_db_version: '1.1.0', }, evict: { diff --git a/docs/assets/perfs_scd_lock_notoverlapping.png b/docs/assets/perfs_scd_lock_notoverlapping.png new file mode 100644 index 000000000..2d11cd751 Binary files /dev/null and b/docs/assets/perfs_scd_lock_notoverlapping.png differ diff --git a/docs/assets/perfs_scd_lock_overlapping.png b/docs/assets/perfs_scd_lock_overlapping.png new file mode 100644 index 000000000..51983ea4c Binary files /dev/null and b/docs/assets/perfs_scd_lock_overlapping.png differ diff --git a/docs/operations/.nav.yml b/docs/operations/.nav.yml index 3cfbe09c4..df3eaa39b 100644 --- a/docs/operations/.nav.yml +++ b/docs/operations/.nav.yml @@ -5,4 +5,5 @@ nav: - "Pooling (CockroachDB)": pooling-crdb.md - "Monitoring": monitoring.md - "Migrations": migrations.md + - "Performances": performances.md - "Troubleshooting": troubleshooting.md diff --git a/docs/operations/index.md b/docs/operations/index.md index 90e58b666..6f49c4035 100644 --- a/docs/operations/index.md +++ b/docs/operations/index.md @@ -24,6 +24,10 @@ See [Leaving a pool](pooling.md#leaving-a-pool) See [Monitoring](monitoring.md) +## Performances + +See [Performances](performances.md) + ## Troubleshooting See [Troubleshooting](troubleshooting.md) diff --git a/docs/operations/performances.md b/docs/operations/performances.md new file mode 100644 index 000000000..4f1d9b791 --- /dev/null +++ b/docs/operations/performances.md @@ -0,0 +1,30 @@ +# Performances + +## The SCD global lock option + +!!! danger + All DSS instances in a DSS pool must use the same value for this option. Mixing will result in dramatically lower performance. + + You can use the `/aux/v1/configuration/scd_lock_mode` endpoint to retrive the current value for a specifc DSS instance. + +It has been reported in issue [#1311](https://github.com/interuss/dss/issues/1311) that creating a lot of overlapping operational intents may increase the datastore load in a way that creates timeouts. + +By default, the code will try to lock on required subscriptions when working on operational intents, and having too many of them may lead to issues. + +A solution to that is to switch to a global lock, that is just globally locking operational intents operations, regardless of subscriptions. + +This will result in lower general throughput for operational intents that don't overlap, as only one of them can be processed at a time, but better performance in the issue's case as lock acquisition is simpler. + +You should enable this option depending on your DSS usage/use case and what you want to maximize: +* If you have non-overlapping traffic and maximum global throughput, don't enable this flag +* If you have overlapping traffic and don't need high global throughput, enable this flag + +The following graphs show example throughput without (on the left) and with the flag (on the right). This has been run on a local machine; on a real deployment you can expect lower performance (due to various latency), but similar relative numbers. + +All graphs have been generated with the [loadtest present in the monitoring repository](https://github.com/interuss/monitoring/blob/main/monitoring/loadtest/README.md) using `SCD.py`. + +![](../assets/perfs_scd_lock_overlapping.png) +*Overlapping requests. Notice the huge spikes on the left, as the datastore struggles to acquire locks.* + +![](../assets/perfs_scd_lock_notoverlapping.png) +*Non-overlapping requests. Notice the reduction of performance on the right, with a single lock.* diff --git a/interfaces/aux_/aux_.yaml b/interfaces/aux_/aux_.yaml index e0b90f0ff..14e114ad7 100644 --- a/interfaces/aux_/aux_.yaml +++ b/interfaces/aux_/aux_.yaml @@ -96,6 +96,13 @@ components: items: type: string + SCDLockModeResponse: + type: object + properties: + global_lock: + description: The value of the 'enable_scd_global_lock' option for this DSS instance + type: boolean + paths: /aux/v1/version: get: @@ -324,6 +331,28 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' + /aux/v1/configuration/scd_lock_mode: + get: + summary: Return the value of the 'enable_scd_global_lock' lock option. May be used to ensure all participants in a pool are using the same value. + operationId: getScdLockMode + tags: [ dss ] + security: + - Auth: + - interuss.pool_status.read + responses: + '200': + description: The information is successfully returned. + content: + application/json: + schema: + $ref: '#/components/schemas/SCDLockModeResponse' + '501': + description: >- + The server has not implemented this operation. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' security: - Auth: - dss.read.identification_service_areas diff --git a/pkg/api/auxv1/interface.gen.go b/pkg/api/auxv1/interface.gen.go index b1f3a610c..5852fb182 100644 --- a/pkg/api/auxv1/interface.gen.go +++ b/pkg/api/auxv1/interface.gen.go @@ -7,9 +7,9 @@ import ( ) var ( - InterussPoolStatusHeartbeatWriteScope = api.RequiredScope("interuss.pool_status.heartbeat.write") InterussPoolStatusReadScope = api.RequiredScope("interuss.pool_status.read") DssWriteIdentificationServiceAreasScope = api.RequiredScope("dss.write.identification_service_areas") + InterussPoolStatusHeartbeatWriteScope = api.RequiredScope("interuss.pool_status.heartbeat.write") DssReadIdentificationServiceAreasScope = api.RequiredScope("dss.read.identification_service_areas") GetVersionSecurity = []api.AuthorizationOption{} ValidateOauthSecurity = []api.AuthorizationOption{ @@ -37,6 +37,11 @@ var ( } GetAcceptedCAsSecurity = []api.AuthorizationOption{} GetInstanceCAsSecurity = []api.AuthorizationOption{} + GetScdLockModeSecurity = []api.AuthorizationOption{ + { + "Auth": {InterussPoolStatusReadScope}, + }, + } ) type GetVersionRequest struct { @@ -177,6 +182,21 @@ type GetInstanceCAsResponseSet struct { Response500 *api.InternalServerErrorBody } +type GetScdLockModeRequest struct { + // The result of attempting to authorize this request + Auth api.AuthorizationResult +} +type GetScdLockModeResponseSet struct { + // The information is successfully returned. + Response200 *SCDLockModeResponse + + // The server has not implemented this operation. + Response501 *ErrorResponse + + // Auto-generated internal server error response + Response500 *api.InternalServerErrorBody +} + type Implementation interface { // Queries the version of the DSS. GetVersion(ctx context.Context, req *GetVersionRequest) GetVersionResponseSet @@ -198,4 +218,7 @@ type Implementation interface { // Current certificates of certificate authorities (CAs) that signed the node certificates for this DSS instance. May return more that one certificate (e.g. for rotations). Other DSS instances in the pool should accept node certificates signed by these CAs. GetInstanceCAs(ctx context.Context, req *GetInstanceCAsRequest) GetInstanceCAsResponseSet + + // Return the value of the 'enable_scd_global_lock' lock option. May be used to ensure all participants in a pool are using the same value. + GetScdLockMode(ctx context.Context, req *GetScdLockModeRequest) GetScdLockModeResponseSet } diff --git a/pkg/api/auxv1/server.gen.go b/pkg/api/auxv1/server.gen.go index fc029ef61..8517b7c71 100644 --- a/pkg/api/auxv1/server.gen.go +++ b/pkg/api/auxv1/server.gen.go @@ -266,8 +266,35 @@ func (s *APIRouter) GetInstanceCAs(exp *regexp.Regexp, w http.ResponseWriter, r api.WriteJSON(w, 500, api.InternalServerErrorBody{ErrorMessage: "Handler implementation did not set a response"}) } +func (s *APIRouter) GetScdLockMode(exp *regexp.Regexp, w http.ResponseWriter, r *http.Request) { + var req GetScdLockModeRequest + + // Authorize request + req.Auth = s.Authorizer.Authorize(w, r, GetScdLockModeSecurity) + + // Call implementation + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + response := s.Implementation.GetScdLockMode(ctx, &req) + + // Write response to client + if response.Response200 != nil { + api.WriteJSON(w, 200, response.Response200) + return + } + if response.Response501 != nil { + api.WriteJSON(w, 501, response.Response501) + return + } + if response.Response500 != nil { + api.WriteJSON(w, 500, response.Response500) + return + } + api.WriteJSON(w, 500, api.InternalServerErrorBody{ErrorMessage: "Handler implementation did not set a response"}) +} + func MakeAPIRouter(impl Implementation, auth api.Authorizer) APIRouter { - router := APIRouter{Implementation: impl, Authorizer: auth, Routes: make([]*api.Route, 7)} + router := APIRouter{Implementation: impl, Authorizer: auth, Routes: make([]*api.Route, 8)} pattern := regexp.MustCompile("^/aux/v1/version$") router.Routes[0] = &api.Route{Method: http.MethodGet, Pattern: pattern, Handler: router.GetVersion} @@ -290,5 +317,8 @@ func MakeAPIRouter(impl Implementation, auth api.Authorizer) APIRouter { pattern = regexp.MustCompile("^/aux/v1/configuration/ca_certs$") router.Routes[6] = &api.Route{Method: http.MethodGet, Pattern: pattern, Handler: router.GetInstanceCAs} + pattern = regexp.MustCompile("^/aux/v1/configuration/scd_lock_mode$") + router.Routes[7] = &api.Route{Method: http.MethodGet, Pattern: pattern, Handler: router.GetScdLockMode} + return router } diff --git a/pkg/api/auxv1/types.gen.go b/pkg/api/auxv1/types.gen.go index 4b679a2ad..3bc58cfec 100644 --- a/pkg/api/auxv1/types.gen.go +++ b/pkg/api/auxv1/types.gen.go @@ -52,3 +52,8 @@ type CAsResponse struct { // A list of certificates, each in PEM format. Cas *[]string `json:"CAs,omitempty"` } + +type SCDLockModeResponse struct { + // The value of the 'enable_scd_global_lock' option for this DSS instance + GlobalLock *bool `json:"global_lock,omitempty"` +} diff --git a/pkg/api/scdv1/interface.gen.go b/pkg/api/scdv1/interface.gen.go index 1a60c456a..3cf5b1749 100644 --- a/pkg/api/scdv1/interface.gen.go +++ b/pkg/api/scdv1/interface.gen.go @@ -7,10 +7,10 @@ import ( ) var ( + UtmStrategicCoordinationScope = api.RequiredScope("utm.strategic_coordination") + UtmConformanceMonitoringSaScope = api.RequiredScope("utm.conformance_monitoring_sa") UtmAvailabilityArbitrationScope = api.RequiredScope("utm.availability_arbitration") UtmConstraintManagementScope = api.RequiredScope("utm.constraint_management") - UtmConformanceMonitoringSaScope = api.RequiredScope("utm.conformance_monitoring_sa") - UtmStrategicCoordinationScope = api.RequiredScope("utm.strategic_coordination") UtmConstraintProcessingScope = api.RequiredScope("utm.constraint_processing") QueryOperationalIntentReferencesSecurity = []api.AuthorizationOption{ { diff --git a/pkg/aux_/scd_lock_mode.go b/pkg/aux_/scd_lock_mode.go new file mode 100644 index 000000000..181e94316 --- /dev/null +++ b/pkg/aux_/scd_lock_mode.go @@ -0,0 +1,11 @@ +package aux + +import ( + "context" + + restapi "github.com/interuss/dss/pkg/api/auxv1" +) + +func (a *Server) GetScdLockMode(ctx context.Context, req *restapi.GetScdLockModeRequest) restapi.GetScdLockModeResponseSet { + return restapi.GetScdLockModeResponseSet{Response200: &restapi.SCDLockModeResponse{GlobalLock: &a.ScdGlobalLock}} +} diff --git a/pkg/aux_/server.go b/pkg/aux_/server.go index 3bdf32420..90d69eb3f 100644 --- a/pkg/aux_/server.go +++ b/pkg/aux_/server.go @@ -14,8 +14,9 @@ import ( // Server implements auxv1.Implementation. type Server struct { - Store auxstore.Store - Locality string + Store auxstore.Store + Locality string + ScdGlobalLock bool } func setAuthError(ctx context.Context, authErr error, resp401, resp403 **restapi.ErrorResponse, resp500 **api.InternalServerErrorBody) { diff --git a/pkg/scd/store/datastore/store.go b/pkg/scd/store/datastore/store.go index 5e8ea0ab0..a9fa230b4 100644 --- a/pkg/scd/store/datastore/store.go +++ b/pkg/scd/store/datastore/store.go @@ -32,22 +32,25 @@ var ( // repo is an implementation of repos.Repo using // a CockroachDB/Yugabyte transaction. type repo struct { - q dsssql.Queryable - clock clockwork.Clock + q dsssql.Queryable + clock clockwork.Clock + globalLock bool } // Store is an implementation of an scd.Store using // a CockroachDB or Yugabyte database. type Store struct { - db *datastore.Datastore - clock clockwork.Clock + db *datastore.Datastore + clock clockwork.Clock + globalLock bool } // NewStore returns a Store instance connected to a cockroach or Yugabyte instance via db. -func NewStore(ctx context.Context, db *datastore.Datastore) (*Store, error) { +func NewStore(ctx context.Context, db *datastore.Datastore, globalLock bool) (*Store, error) { store := &Store{ - db: db, - clock: DefaultClock, + db: db, + clock: DefaultClock, + globalLock: globalLock, } if err := store.CheckCurrentMajorSchemaVersion(ctx); err != nil { @@ -81,8 +84,9 @@ func (s *Store) CheckCurrentMajorSchemaVersion(ctx context.Context) error { // Interact implements store.Interactor interface. func (s *Store) Interact(_ context.Context) (repos.Repository, error) { return &repo{ - q: s.db.Pool, - clock: s.clock, + q: s.db.Pool, + clock: s.clock, + globalLock: s.globalLock, }, nil } @@ -91,8 +95,9 @@ func (s *Store) Transact(ctx context.Context, f func(context.Context, repos.Repo ctx = crdb.WithMaxRetries(ctx, flags.ConnectParameters().MaxRetries) return crdbpgx.ExecuteTx(ctx, s.db.Pool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { return f(ctx, &repo{ - q: tx, - clock: s.clock, + q: tx, + clock: s.clock, + globalLock: s.globalLock, }) }) } diff --git a/pkg/scd/store/datastore/subscriptions.go b/pkg/scd/store/datastore/subscriptions.go index 144fe0451..00d96e3b0 100644 --- a/pkg/scd/store/datastore/subscriptions.go +++ b/pkg/scd/store/datastore/subscriptions.go @@ -380,6 +380,22 @@ func (c *repo) IncrementNotificationIndices(ctx context.Context, subscriptionIds func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion, subscriptionIds []dssmodels.ID) error { + if c.globalLock { + + // Remark: We use a FOR UPDATE, as this works with CockroachDB and Yugabyte. + // In the future without CockroachDB, switching to pg_advisory_xact_lock would remove + // the need for a dedicated table. + const query = `SELECT key FROM scd_locks WHERE key = 0 FOR UPDATE` + + _, err := c.q.Exec(ctx, query) + if err != nil { + return stacktrace.Propagate(err, "Error in query: %s", query) + } + + return nil + + } + const query = ` SELECT id