diff --git a/.github/workflows/prod.yml b/.github/workflows/prod.yml index 5c43d58..9e8a4b6 100644 --- a/.github/workflows/prod.yml +++ b/.github/workflows/prod.yml @@ -1,154 +1,18 @@ -name: MT Deploy Production +name: Deploy Production on: - push: - branches: - - release - + pull_request: + types: [closed] + branches: + - 'releases/latest' + jobs: - release: - runs-on: ubuntu-20.04 - outputs: - published: ${{ steps.semantic.outputs.new_release_published }} - version: ${{ steps.semantic.outputs.new_release_version }} - steps: - - uses: actions/checkout@v3 - - id: semantic - uses: cycjimmy/semantic-release-action@v3 - with: - semantic_version: 18 - env: - GITHUB_TOKEN: ${{ secrets.GH_TOKEN_PROJECT_ACTION }} - deploy: - needs: release + if: github.event.pull_request.merged == true runs-on: ubuntu-20.04 - if: needs.release.outputs.published == 'true' steps: - uses: actions/checkout@v2 - uses: superfly/flyctl-actions@1.1 with: args: "-c deploy/fly/prod.toml deploy --build-arg SLOT_NAME_SUFFIX=${GITHUB_SHA::7}" env: - FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} - - docker_x86_release: - needs: release - runs-on: ubuntu-20.04 - if: needs.release.outputs.published == 'true' - timeout-minutes: 120 - env: - arch: amd64 - outputs: - image_digest: ${{ steps.build.outputs.digest }} - steps: - - id: meta - uses: docker/metadata-action@v4 - with: - images: | - supabase/realtime - tags: | - type=raw,value=v${{ needs.release.outputs.version }}_${{ env.arch }} - - - uses: docker/setup-buildx-action@v2 - - - uses: docker/login-action@v2 - with: - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - - - id: build - uses: docker/build-push-action@v3 - with: - push: true - tags: ${{ steps.meta.outputs.tags }} - platforms: linux/${{ env.arch }} - cache-from: type=gha - cache-to: type=gha,mode=max - - docker_arm_release: - needs: release - runs-on: arm-runner - if: needs.release.outputs.published == 'true' - timeout-minutes: 120 - env: - arch: arm64 - outputs: - image_digest: ${{ steps.build.outputs.digest }} - steps: - - uses: actions/checkout@v3 - - - id: meta - uses: docker/metadata-action@v4 - with: - images: | - supabase/realtime - tags: | - type=raw,value=v${{ needs.release.outputs.version }}_${{ env.arch }} - - - uses: docker/login-action@v2 - with: - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - - - uses: docker/setup-buildx-action@v2 - with: - driver: docker - driver-opts: | - image=moby/buildkit:master - network=host - - - id: build - uses: docker/build-push-action@v3 - with: - context: . 
- push: true - tags: ${{ steps.meta.outputs.tags }} - platforms: linux/${{ env.arch }} - no-cache: true - - merge_manifest: - needs: [release, docker_x86_release, docker_arm_release] - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - id-token: write - steps: - - uses: docker/setup-buildx-action@v2 - - - uses: docker/login-action@v2 - with: - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - - - name: Merge multi-arch manifests - run: | - docker buildx imagetools create -t supabase/realtime:v${{ needs.release.outputs.version }} \ - supabase/realtime@${{ needs.docker_x86_release.outputs.image_digest }} \ - supabase/realtime@${{ needs.docker_arm_release.outputs.image_digest }} - - - name: configure aws credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - role-to-assume: ${{ secrets.PROD_AWS_ROLE }} - aws-region: us-east-1 - - - name: Login to ECR - uses: docker/login-action@v2 - with: - registry: public.ecr.aws - - - name: Login to GHCR - uses: docker/login-action@v2 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Mirror to ECR - uses: akhilerm/tag-push-action@v2.0.0 - with: - src: docker.io/supabase/realtime:v${{ needs.release.outputs.version }} - dst: | - public.ecr.aws/supabase/realtime:v${{ needs.release.outputs.version }} - ghcr.io/supabase/realtime:v${{ needs.release.outputs.version }} + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/prod_build.yml b/.github/workflows/prod_build.yml new file mode 100644 index 0000000..ea17709 --- /dev/null +++ b/.github/workflows/prod_build.yml @@ -0,0 +1,154 @@ +name: Build Production +on: + push: + branches: + - 'main' +jobs: + release: + runs-on: ubuntu-20.04 + outputs: + published: ${{ steps.semantic.outputs.new_release_published }} + version: ${{ steps.semantic.outputs.new_release_version }} + steps: + - uses: actions/checkout@v3 + - id: semantic + uses: cycjimmy/semantic-release-action@v3 + with: + semantic_version: 18 + env: + GITHUB_TOKEN: ${{ secrets.GH_TOKEN_PROJECT_ACTION }} + + docker_x86_release: + needs: release + runs-on: ubuntu-20.04 + if: needs.release.outputs.published == 'true' + timeout-minutes: 120 + env: + arch: amd64 + outputs: + image_digest: ${{ steps.build.outputs.digest }} + steps: + - id: meta + uses: docker/metadata-action@v4 + with: + images: | + supabase/realtime + tags: | + type=raw,value=v${{ needs.release.outputs.version }}_${{ env.arch }} + + - uses: docker/setup-buildx-action@v2 + + - uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - id: build + uses: docker/build-push-action@v3 + with: + push: true + tags: ${{ steps.meta.outputs.tags }} + platforms: linux/${{ env.arch }} + cache-from: type=gha + cache-to: type=gha,mode=max + + docker_arm_release: + needs: release + runs-on: arm-runner + if: needs.release.outputs.published == 'true' + timeout-minutes: 120 + env: + arch: arm64 + outputs: + image_digest: ${{ steps.build.outputs.digest }} + steps: + - uses: actions/checkout@v3 + + - id: meta + uses: docker/metadata-action@v4 + with: + images: | + supabase/realtime + tags: | + type=raw,value=v${{ needs.release.outputs.version }}_${{ env.arch }} + + - uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - uses: docker/setup-buildx-action@v2 + with: + driver: docker + 
driver-opts: | + image=moby/buildkit:master + network=host + + - id: build + uses: docker/build-push-action@v3 + with: + context: . + push: true + tags: ${{ steps.meta.outputs.tags }} + platforms: linux/${{ env.arch }} + no-cache: true + + merge_manifest: + needs: [release, docker_x86_release, docker_arm_release] + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + id-token: write + steps: + - uses: docker/setup-buildx-action@v2 + + - uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Merge multi-arch manifests + run: | + docker buildx imagetools create -t supabase/realtime:v${{ needs.release.outputs.version }} \ + supabase/realtime@${{ needs.docker_x86_release.outputs.image_digest }} \ + supabase/realtime@${{ needs.docker_arm_release.outputs.image_digest }} + + - name: configure aws credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + role-to-assume: ${{ secrets.PROD_AWS_ROLE }} + aws-region: us-east-1 + + - name: Login to ECR + uses: docker/login-action@v2 + with: + registry: public.ecr.aws + + - name: Login to GHCR + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Mirror to ECR + uses: akhilerm/tag-push-action@v2.0.0 + with: + src: docker.io/supabase/realtime:v${{ needs.release.outputs.version }} + dst: | + public.ecr.aws/supabase/realtime:v${{ needs.release.outputs.version }} + ghcr.io/supabase/realtime:v${{ needs.release.outputs.version }} + + update-branch-name: + needs: [release, docker_x86_release, docker_arm_release, merge_manifest] + runs-on: ubuntu-latest + steps: + - name: Checkout branch + uses: actions/checkout@v2 + with: + ref: refs/heads/main + - name: Update branch name + run: | + git branch -m main releases/v${{ needs.release.outputs.version }} + git push origin HEAD:releases/v${{ needs.release.outputs.version }} \ No newline at end of file diff --git a/.github/workflows/prod_linter.yml b/.github/workflows/prod_linter.yml index fc8b5d6..9feacd0 100644 --- a/.github/workflows/prod_linter.yml +++ b/.github/workflows/prod_linter.yml @@ -1,4 +1,4 @@ -name: MT Production Formatting Checks +name: Production Formatting Checks on: pull_request: branches: diff --git a/.github/workflows/staging.yml b/.github/workflows/staging.yml index b26613d..5f0f1b1 100644 --- a/.github/workflows/staging.yml +++ b/.github/workflows/staging.yml @@ -1,4 +1,4 @@ -name: MT Deploy Staging +name: Deploy Staging on: push: branches: diff --git a/.github/workflows/staging_linter.yml b/.github/workflows/staging_linter.yml index 26e5095..6ae20c7 100644 --- a/.github/workflows/staging_linter.yml +++ b/.github/workflows/staging_linter.yml @@ -1,4 +1,4 @@ -name: MT Staging Formatting Checks +name: Staging Formatting Checks on: pull_request: branches: @@ -31,7 +31,7 @@ jobs: - name: Run main database migrations run: mix ecto.migrate --log-migrator-sql - name: Run database tenant migrations - run: mix ecto.migrate --migrations-path priv/repo/postgres/migrations + run: mix ecto.migrate --migrations-path lib/extensions/postgres_cdc_rls/repo/migrations - name: Run format check run: mix format --check-formatted - name: Credo checks diff --git a/.github/workflows/staging_tealbench.yml b/.github/workflows/staging_tealbench.yml index 81ac4cc..5646381 100644 --- a/.github/workflows/staging_tealbench.yml +++ b/.github/workflows/staging_tealbench.yml @@ -1,4 +1,4 @@ -name: MT Tealbench +name: Tealbench on: 
push: branches: @@ -28,4 +28,4 @@ jobs: curl --request POST \ --url https://Tealbench.fly.dev/api/runs \ --header 'Authorization: User ${{ secrets.BENCHMARK_JWT }}' \ - --data '{"benchmark_id":"SyMKxU3TOiWcAvU","name":"PR ${{ env.PR_NUM }}","comment": "https://github.com/supabase/realtime/pull/${{ env.PR_NUM }}", "origin":"main"}' + --data '{"benchmark_id":"SyMKxU3TOiWcAvU","name":"PR ${{ env.PR_NUM }}","comment": "https://github.com/Tealbase/realtime/pull/${{ env.PR_NUM }}", "origin":"main"}' diff --git a/.github/workflows/version_updated.yml b/.github/workflows/version_updated.yml new file mode 100644 index 0000000..99e9547 --- /dev/null +++ b/.github/workflows/version_updated.yml @@ -0,0 +1,29 @@ +on: + pull_request: + types: [opened, reopened, synchronize] + branches: + - 'main' + +name: Default Checks + +jobs: + versions_updated: + name: Versions Updated + runs-on: ubuntu-20.04 + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Verify Versions Updated + uses: tj-actions/changed-files@v35 + id: verify_changed_files + with: + files: | + mix.exs + + - name: Fail Unless Versions Updated + id: fail_unless_changed + if: steps.verify_changed_files.outputs.any_changed == 'false' + run: | + echo "::error ::Please update the mix.exs version" + exit 1 diff --git a/.releaserc b/.releaserc index 66b8fc1..15f87c2 100644 --- a/.releaserc +++ b/.releaserc @@ -1,10 +1,10 @@ { "branches": [ - "release" + "main" ], "plugins": [ "@semantic-release/commit-analyzer", "@semantic-release/release-notes-generator", "@semantic-release/github" ] -} +} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index ca7eb6c..41169f0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,9 +12,9 @@ # - https://pkgs.org/ - resource for finding needed packages # - Ex: hexpm/elixir:1.14.0-erlang-25.0.4-debian-bullseye-20220801-slim # -ARG ELIXIR_VERSION=1.14.0 -ARG OTP_VERSION=25.0.4 -ARG DEBIAN_VERSION=bullseye-20220801-slim +ARG ELIXIR_VERSION=1.14.3 +ARG OTP_VERSION=25.3 +ARG DEBIAN_VERSION=bullseye-20230227-slim ARG BUILDER_IMAGE="hexpm/elixir:${ELIXIR_VERSION}-erlang-${OTP_VERSION}-debian-${DEBIAN_VERSION}" ARG RUNNER_IMAGE="debian:${DEBIAN_VERSION}" @@ -34,9 +34,8 @@ RUN apt-get update -y \ && apt-get install -y build-essential git \ && apt-get clean \ && rm -f /var/lib/apt/lists/*_* \ - && curl -sL https://deb.nodesource.com/setup_10.x | bash - \ - && apt-get install -y nodejs \ - && apt-get install -y npm + && curl -sL https://deb.nodesource.com/setup_18.x | bash - \ + && apt-get install -y nodejs # prepare build dir WORKDIR /app diff --git a/TAGS.md b/TAGS.md index 39d6b4c..e8ca742 100644 --- a/TAGS.md +++ b/TAGS.md @@ -33,7 +33,3 @@ Here are the tags that exist in my fork but not in the upstream repository: | v1.0.0 | b6f9301cbf50faf55c929fe00733dcca74092664 | v1.0.0 | | v2.0.0 | b4d9b0dbb18f05ed9d665b17ad31675263bc5ea3 | Merge branch 'Tealbase:main' into main | | v2.1.0 | 1290bd40f7abdf89fb5146438c7c7e8f53733758 | v2.1.0 | -| v2.2.0 | 6b50011b0518d2e469d5ec17dcca504e40acecde | v2.2.0 | -| v2.3.0 | 68fa68738e5d8d556d09dc6e1e99cdcc62df8967 | v2.3.0 | -| v2.4.0 | 7622c95761f9a63c79a769aa33fc4b2e3273d76b | v2.4.0 | -| v2.5.0 | 49453d2b93eac0b80e335f667cc17f10c5760ca3 | feat: v2.5.0 | diff --git a/deploy/fly/prod.toml b/deploy/fly/prod.toml index 0adb642..f098ce6 100644 --- a/deploy/fly/prod.toml +++ b/deploy/fly/prod.toml @@ -14,6 +14,10 @@ processes = [] allowed_public_ports = [] auto_rollback = true +[mounts] + source="data_vol" + destination="/data" + [[services]] internal_port = 4000 
processes = ["app"] diff --git a/deploy/fly/staging.toml b/deploy/fly/staging.toml index 895b928..f111500 100644 --- a/deploy/fly/staging.toml +++ b/deploy/fly/staging.toml @@ -14,6 +14,10 @@ processes = [] allowed_public_ports = [] auto_rollback = true +[mounts] + source="data_vol" + destination="/data" + [[services]] internal_port = 4000 processes = ["app"] diff --git a/lib/extensions/postgres/adapters/postgres/decoder/decoder.ex b/lib/extensions/postgres/adapters/postgres/decoder/decoder.ex index 96e5683..7b90127 100644 --- a/lib/extensions/postgres/adapters/postgres/decoder/decoder.ex +++ b/lib/extensions/postgres/adapters/postgres/decoder/decoder.ex @@ -9,33 +9,139 @@ defmodule Realtime.Adapters.Postgres.Decoder do @moduledoc """ Different types of logical replication messages from Postgres """ - defmodule(Begin, do: defstruct([:final_lsn, :commit_timestamp, :xid])) - defmodule(Commit, do: defstruct([:flags, :lsn, :end_lsn, :commit_timestamp])) - defmodule(Origin, do: defstruct([:origin_commit_lsn, :name])) - defmodule(Relation, do: defstruct([:id, :namespace, :name, :replica_identity, :columns])) - defmodule(Insert, do: defstruct([:relation_id, :tuple_data])) + defmodule Begin do + @moduledoc """ + Struct representing the BEGIN message in PostgreSQL's logical decoding output. + + * `final_lsn` - The LSN of the commit that this transaction ended at. + * `commit_timestamp` - The timestamp of the commit that this transaction ended at. + * `xid` - The transaction ID of this transaction. + """ + defstruct [:final_lsn, :commit_timestamp, :xid] + end - defmodule(Update, - do: defstruct([:relation_id, :changed_key_tuple_data, :old_tuple_data, :tuple_data]) - ) + defmodule Commit do + @moduledoc """ + Struct representing the COMMIT message in PostgreSQL's logical decoding output. - defmodule(Delete, - do: defstruct([:relation_id, :changed_key_tuple_data, :old_tuple_data]) - ) + * `flags` - Bitmask of flags associated with this commit. + * `lsn` - The LSN of the commit. + * `end_lsn` - The LSN of the next record in the WAL stream. + * `commit_timestamp` - The timestamp of the commit. + """ + defstruct [:flags, :lsn, :end_lsn, :commit_timestamp] + end - defmodule(Truncate, - do: defstruct([:number_of_relations, :options, :truncated_relations]) - ) + defmodule Origin do + @moduledoc """ + Struct representing the ORIGIN message in PostgreSQL's logical decoding output. - defmodule(Type, - do: defstruct([:id, :namespace, :name]) - ) + * `origin_commit_lsn` - The LSN of the commit in the database that the change originated from. + * `name` - The name of the origin. + """ + defstruct [:origin_commit_lsn, :name] + end - defmodule(Unsupported, do: defstruct([:data])) + defmodule Relation do + @moduledoc """ + Struct representing the RELATION message in PostgreSQL's logical decoding output. + + * `id` - The OID of the relation. + * `namespace` - The OID of the namespace that the relation belongs to. + * `name` - The name of the relation. + * `replica_identity` - The replica identity setting of the relation. + * `columns` - A list of columns in the relation. + """ + defstruct [:id, :namespace, :name, :replica_identity, :columns] + + defmodule Column do + @moduledoc """ + Struct representing a column in a relation. + + * `flags` - Bitmask of flags associated with this column. + * `name` - The name of the column. + * `type` - The OID of the data type of the column. + * `type_modifier` - The type modifier of the column. 
+ """ + defstruct [:flags, :name, :type, :type_modifier] + end + end + + defmodule Insert do + @moduledoc """ + Struct representing the INSERT message in PostgreSQL's logical decoding output. - defmodule(Relation.Column, - do: defstruct([:flags, :name, :type, :type_modifier]) - ) + * `relation_id` - The OID of the relation that the tuple was inserted into. + * `tuple_data` - The data of the inserted tuple. + """ + defstruct [:relation_id, :tuple_data] + end + + defmodule Update do + @moduledoc """ + Struct representing the UPDATE message in PostgreSQL's logical decoding output. + + * `relation_id` - The OID of the relation that the tuple was updated in. + * `changed_key_tuple_data` - The data of the tuple with the old key values. + * `old_tuple_data` - The data of the tuple before the update. + * `tuple_data` - The data of the tuple after the update. + """ + defstruct [:relation_id, :changed_key_tuple_data, :old_tuple_data, :tuple_data] + end + + defmodule Delete do + @moduledoc """ + Struct representing the DELETE message in PostgreSQL's logical decoding output. + + * `relation_id` - The OID of the relation that the tuple was deleted from. + * `changed_key_tuple_data` - The data of the tuple with the old key values. + * `old_tuple_data` - The data of the tuple before the delete. + """ + defstruct [:relation_id, :changed_key_tuple_data, :old_tuple_data] + end + + defmodule Truncate do + @moduledoc """ + Struct representing the TRUNCATE message in PostgreSQL's logical decoding output. + + * `number_of_relations` - The number of truncated relations. + * `options` - Additional options provided when truncating the relations. + * `truncated_relations` - List of relations that have been truncated. + """ + defstruct [:number_of_relations, :options, :truncated_relations] + end + + defmodule Type do + @moduledoc """ + Struct representing the TYPE message in PostgreSQL's logical decoding output. + + * `id` - The OID of the type. + * `namespace` - The namespace of the type. + * `name` - The name of the type. + """ + defstruct [:id, :namespace, :name] + end + + defmodule Unsupported do + @moduledoc """ + Struct representing an unsupported message in PostgreSQL's logical decoding output. + + * `data` - The raw data of the unsupported message. + """ + defstruct [:data] + end + + defmodule Relation.Column do + @moduledoc """ + Struct representing a column in a relation in PostgreSQL's logical decoding output. + + * `flags` - Column flags. + * `name` - The name of the column. + * `type` - The OID of the column type. + * `type_modifier` - The type modifier of the column. + """ + defstruct [:flags, :name, :type, :type_modifier] + end end require Logger diff --git a/lib/extensions/postgres/adapters/postgres/decoder/oid_database.ex b/lib/extensions/postgres/adapters/postgres/decoder/oid_database.ex index c067022..73f621b 100644 --- a/lib/extensions/postgres/adapters/postgres/decoder/oid_database.ex +++ b/lib/extensions/postgres/adapters/postgres/decoder/oid_database.ex @@ -16,6 +16,8 @@ # AND typisdefined -- Ignore undefined types defmodule Realtime.Adapters.Postgres.OidDatabase do + @moduledoc "This module maps a numeric PostgreSQL type ID to a descriptive string." + @doc """ Maps a numeric PostgreSQL type ID to a descriptive string. 
diff --git a/lib/extensions/postgres_cdc_rls/migrations.ex b/lib/extensions/postgres_cdc_rls/migrations.ex index 4fd633c..a3b5df9 100644 --- a/lib/extensions/postgres_cdc_rls/migrations.ex +++ b/lib/extensions/postgres_cdc_rls/migrations.ex @@ -1,10 +1,79 @@ defmodule Extensions.PostgresCdcRls.Migrations do - @moduledoc false + @moduledoc """ + Run Realtime database migrations for tenant's database. + """ + use GenServer alias Realtime.Repo + + alias Realtime.Extensions.Rls.Repo.Migrations.{ + CreateRealtimeSubscriptionTable, + CreateRealtimeCheckFiltersTrigger, + CreateRealtimeQuoteWal2jsonFunction, + CreateRealtimeCheckEqualityOpFunction, + CreateRealtimeBuildPreparedStatementSqlFunction, + CreateRealtimeCastFunction, + CreateRealtimeIsVisibleThroughFiltersFunction, + CreateRealtimeApplyRlsFunction, + GrantRealtimeUsageToAuthenticatedRole, + EnableRealtimeApplyRlsFunctionPostgrest9Compatibility, + UpdateRealtimeSubscriptionCheckFiltersFunctionSecurity, + UpdateRealtimeBuildPreparedStatementSqlFunctionForCompatibilityWithAllTypes, + EnableGenericSubscriptionClaims, + AddWalPayloadOnErrorsInApplyRlsFunction, + UpdateChangeTimestampToIso8601ZuluFormat, + UpdateSubscriptionCheckFiltersFunctionDynamicTableName, + UpdateApplyRlsFunctionToApplyIso8601, + AddQuotedRegtypesSupport, + AddOutputForDataLessThanEqual64BytesWhenPayloadTooLarge, + AddQuotedRegtypesBackwardCompatibilitySupport, + RecreateRealtimeBuildPreparedStatementSqlFunction, + NullPassesFiltersRecreateIsVisibleThroughFilters, + UpdateApplyRlsFunctionToPassThroughDeleteEventsOnFilter, + MillisecondPrecisionForWalrus, + AddInOpToFilters, + EnableFilteringOnDeleteRecord, + UpdateSubscriptionCheckFiltersForInFilterNonTextTypes, + ConvertCommitTimestampToUtc, + OutputFullRecordWhenUnchangedToast + } + alias Realtime.Helpers, as: H + @migrations [ + {20_211_116_024_918, CreateRealtimeSubscriptionTable}, + {20_211_116_045_059, CreateRealtimeCheckFiltersTrigger}, + {20_211_116_050_929, CreateRealtimeQuoteWal2jsonFunction}, + {20_211_116_051_442, CreateRealtimeCheckEqualityOpFunction}, + {20_211_116_212_300, CreateRealtimeBuildPreparedStatementSqlFunction}, + {20_211_116_213_355, CreateRealtimeCastFunction}, + {20_211_116_213_934, CreateRealtimeIsVisibleThroughFiltersFunction}, + {20_211_116_214_523, CreateRealtimeApplyRlsFunction}, + {20_211_122_062_447, GrantRealtimeUsageToAuthenticatedRole}, + {20_211_124_070_109, EnableRealtimeApplyRlsFunctionPostgrest9Compatibility}, + {20_211_202_204_204, UpdateRealtimeSubscriptionCheckFiltersFunctionSecurity}, + {20_211_202_204_605, + UpdateRealtimeBuildPreparedStatementSqlFunctionForCompatibilityWithAllTypes}, + {20_211_210_212_804, EnableGenericSubscriptionClaims}, + {20_211_228_014_915, AddWalPayloadOnErrorsInApplyRlsFunction}, + {20_220_107_221_237, UpdateChangeTimestampToIso8601ZuluFormat}, + {20_220_228_202_821, UpdateSubscriptionCheckFiltersFunctionDynamicTableName}, + {20_220_312_004_840, UpdateApplyRlsFunctionToApplyIso8601}, + {20_220_603_231_003, AddQuotedRegtypesSupport}, + {20_220_603_232_444, AddOutputForDataLessThanEqual64BytesWhenPayloadTooLarge}, + {20_220_615_214_548, AddQuotedRegtypesBackwardCompatibilitySupport}, + {20_220_712_093_339, RecreateRealtimeBuildPreparedStatementSqlFunction}, + {20_220_908_172_859, NullPassesFiltersRecreateIsVisibleThroughFilters}, + {20_220_916_233_421, UpdateApplyRlsFunctionToPassThroughDeleteEventsOnFilter}, + {20_230_119_133_233, MillisecondPrecisionForWalrus}, + {20_230_128_025_114, AddInOpToFilters}, + {20_230_128_025_212, 
EnableFilteringOnDeleteRecord}, + {20_230_227_211_149, UpdateSubscriptionCheckFiltersForInFilterNonTextTypes}, + {20_230_228_184_745, ConvertCommitTimestampToUtc}, + {20_230_308_225_145, OutputFullRecordWhenUnchangedToast} + ] + @spec start_link(GenServer.options()) :: GenServer.on_start() def start_link(opts) do GenServer.start_link(__MODULE__, opts) @@ -13,7 +82,8 @@ defmodule Extensions.PostgresCdcRls.Migrations do ## Callbacks @impl true - def init(args) do + def init(%{"id" => id} = args) do + Logger.metadata(external_id: id, project: id) # applying tenant's migrations apply_migrations(args) # need try to stop this PID @@ -27,14 +97,23 @@ defmodule Extensions.PostgresCdcRls.Migrations do end @spec apply_migrations(map()) :: [integer()] - defp apply_migrations(args) do + defp apply_migrations( + %{ + "db_host" => db_host, + "db_port" => db_port, + "db_name" => db_name, + "db_user" => db_user, + "db_password" => db_password, + "db_socket_opts" => db_socket_opts + } = _args + ) do {host, port, name, user, pass} = H.decrypt_creds( - args["db_host"], - args["db_port"], - args["db_name"], - args["db_user"], - args["db_password"] + db_host, + db_port, + db_name, + db_user, + db_password ) Repo.with_dynamic_repo( @@ -45,12 +124,12 @@ defmodule Extensions.PostgresCdcRls.Migrations do password: pass, username: user, pool_size: 2, - socket_options: args["db_socket_opts"] + socket_options: db_socket_opts ], fn repo -> Ecto.Migrator.run( Repo, - [Ecto.Migrator.migrations_path(Repo, "postgres/migrations")], + @migrations, :up, all: true, prefix: "realtime", diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index d7f0a0d..4b8734c 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -12,8 +12,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher} alias DBConnection.Backoff - alias Realtime.PubSub + alias Realtime.GenCounter + alias Realtime.Tenants alias Realtime.Adapters.Changes.{ DeletedRecord, @@ -151,10 +152,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {:noreply, %{state | backoff: backoff, poll_ref: poll_ref}} - {:error, - %Postgrex.Error{ - postgres: %{code: :object_in_use, routine: "ReplicationSlotAcquire", message: msg} - }} -> + {:error, %Postgrex.Error{postgres: %{code: :object_in_use, message: msg}}} -> Logger.error("Error polling replication: :object_in_use") [_, db_pid] = Regex.run(~r/PID\s(\d*)$/, msg) diff --git a/lib/extensions/postgres_cdc_rls/replications.ex b/lib/extensions/postgres_cdc_rls/replications.ex index b143d0d..351a071 100644 --- a/lib/extensions/postgres_cdc_rls/replications.ex +++ b/lib/extensions/postgres_cdc_rls/replications.ex @@ -84,7 +84,7 @@ defmodule Extensions.PostgresCdcRls.Replications do string_agg( realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), ',' - ) filter (where ppt.tablename is not null), + ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'), '' ) w2j_add_tables from diff --git a/priv/repo/postgres/migrations/20211116024918_create_realtime_subscription_table.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116024918_create_realtime_subscription_table.ex similarity index 61% rename from priv/repo/postgres/migrations/20211116024918_create_realtime_subscription_table.exs rename to 
lib/extensions/postgres_cdc_rls/repo/migrations/20211116024918_create_realtime_subscription_table.ex index 583f23a..48846ab 100644 --- a/priv/repo/postgres/migrations/20211116024918_create_realtime_subscription_table.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116024918_create_realtime_subscription_table.ex @@ -1,18 +1,20 @@ -defmodule Realtime.Repo.Migrations.CreateRealtimeSubscriptionTable do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeSubscriptionTable do + @moduledoc false + use Ecto.Migration def change do - execute "create type realtime.equality_op as enum( + execute("create type realtime.equality_op as enum( 'eq', 'neq', 'lt', 'lte', 'gt', 'gte' - );" + );") - execute "create type realtime.user_defined_filter as ( + execute("create type realtime.user_defined_filter as ( column_name text, op realtime.equality_op, value text - );" + );") - execute "create table realtime.subscription ( + execute("create table realtime.subscription ( -- Tracks which users are subscribed to each table id bigint not null generated always as identity, user_id uuid not null, @@ -24,8 +26,10 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeSubscriptionTable do constraint pk_subscription primary key (id), unique (entity, user_id, filters) - )" + )") - execute "create index ix_realtime_subscription_entity on realtime.subscription using hash (entity)" + execute( + "create index ix_realtime_subscription_entity on realtime.subscription using hash (entity)" + ) end end diff --git a/priv/repo/postgres/migrations/20211116045059_create_realtime_check_filters_trigger.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116045059_create_realtime_check_filters_trigger.ex similarity index 87% rename from priv/repo/postgres/migrations/20211116045059_create_realtime_check_filters_trigger.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211116045059_create_realtime_check_filters_trigger.ex index 4b3ce50..6184e1c 100644 --- a/priv/repo/postgres/migrations/20211116045059_create_realtime_check_filters_trigger.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116045059_create_realtime_check_filters_trigger.ex @@ -1,8 +1,10 @@ -defmodule Realtime.Repo.Migrations.CreateRealtimeCheckFiltersTrigger do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeCheckFiltersTrigger do + @moduledoc false + use Ecto.Migration def change do - execute "create function realtime.subscription_check_filters() + execute("create function realtime.subscription_check_filters() returns trigger language plpgsql as $$ @@ -53,11 +55,11 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeCheckFiltersTrigger do return new; end; - $$;" + $$;") - execute "create trigger tr_check_filters + execute("create trigger tr_check_filters before insert or update on realtime.subscription for each row - execute function realtime.subscription_check_filters();" + execute function realtime.subscription_check_filters();") end end diff --git a/priv/repo/postgres/migrations/20211116050929_create_realtime_quote_wal2json_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116050929_create_realtime_quote_wal2json_function.ex similarity index 84% rename from priv/repo/postgres/migrations/20211116050929_create_realtime_quote_wal2json_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211116050929_create_realtime_quote_wal2json_function.ex index f180af9..6b86dd8 100644 --- 
a/priv/repo/postgres/migrations/20211116050929_create_realtime_quote_wal2json_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116050929_create_realtime_quote_wal2json_function.ex @@ -1,8 +1,10 @@ -defmodule Realtime.Repo.Migrations.CreateRealtimeQuoteWal2jsonFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeQuoteWal2jsonFunction do + @moduledoc false + use Ecto.Migration def change do - execute "create function realtime.quote_wal2json(entity regclass) + execute("create function realtime.quote_wal2json(entity regclass) returns text language sql immutable @@ -36,6 +38,6 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeQuoteWal2jsonFunction do on pc.relnamespace = nsp.oid where pc.oid = entity - $$;" + $$;") end end diff --git a/priv/repo/postgres/migrations/20211116051442_create_realtime_check_equality_op_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116051442_create_realtime_check_equality_op_function.ex similarity index 81% rename from priv/repo/postgres/migrations/20211116051442_create_realtime_check_equality_op_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211116051442_create_realtime_check_equality_op_function.ex index 9d5fb96..68c12ad 100644 --- a/priv/repo/postgres/migrations/20211116051442_create_realtime_check_equality_op_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116051442_create_realtime_check_equality_op_function.ex @@ -1,8 +1,10 @@ -defmodule Realtime.Repo.Migrations.CreateRealtimeCheckEqualityOpFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeCheckEqualityOpFunction do + @moduledoc false + use Ecto.Migration def change do - execute "create function realtime.check_equality_op( + execute("create function realtime.check_equality_op( op realtime.equality_op, type_ regtype, val_1 text, @@ -32,6 +34,6 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeCheckEqualityOpFunction do execute format('select %L::'|| type_::text || ' ' || op_symbol || ' %L::'|| type_::text, val_1, val_2) into res; return res; end; - $$;" + $$;") end end diff --git a/priv/repo/postgres/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.ex similarity index 79% rename from priv/repo/postgres/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.ex index 5691e7c..748bfb5 100644 --- a/priv/repo/postgres/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.ex @@ -1,16 +1,18 @@ -defmodule Realtime.Repo.Migrations.CreateRealtimeBuildPreparedStatementSqlFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeBuildPreparedStatementSqlFunction do + @moduledoc false + use Ecto.Migration def change do - execute "create type realtime.wal_column as ( + execute("create type realtime.wal_column as ( name text, type text, value jsonb, is_pkey boolean, is_selectable boolean - );" + );") - execute "create function realtime.build_prepared_statement_sql( + execute("create function realtime.build_prepared_statement_sql( prepared_statement_name text, entity regclass, columns realtime.wal_column[] @@ 
-42,6 +44,6 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeBuildPreparedStatementSqlFuncti pkc.is_pkey group by entity - $$;" + $$;") end end diff --git a/priv/repo/postgres/migrations/20211116213355_create_realtime_cast_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116213355_create_realtime_cast_function.ex similarity index 60% rename from priv/repo/postgres/migrations/20211116213355_create_realtime_cast_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211116213355_create_realtime_cast_function.ex index 0b11243..6e137e1 100644 --- a/priv/repo/postgres/migrations/20211116213355_create_realtime_cast_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116213355_create_realtime_cast_function.ex @@ -1,8 +1,10 @@ -defmodule Realtime.Repo.Migrations.CreateRealtimeCastFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeCastFunction do + @moduledoc false + use Ecto.Migration def change do - execute "create function realtime.cast(val text, type_ regtype) + execute("create function realtime.cast(val text, type_ regtype) returns jsonb immutable language plpgsql @@ -13,6 +15,6 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeCastFunction do execute format('select to_jsonb(%L::'|| type_::text || ')', val) into res; return res; end - $$;" + $$;") end end diff --git a/priv/repo/postgres/migrations/20211116213934_create_realtime_is_visible_through_filters_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116213934_create_realtime_is_visible_through_filters_function.ex similarity index 71% rename from priv/repo/postgres/migrations/20211116213934_create_realtime_is_visible_through_filters_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211116213934_create_realtime_is_visible_through_filters_function.ex index 30d947f..1bbeab1 100644 --- a/priv/repo/postgres/migrations/20211116213934_create_realtime_is_visible_through_filters_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116213934_create_realtime_is_visible_through_filters_function.ex @@ -1,8 +1,11 @@ -defmodule Realtime.Repo.Migrations.CreateRealtimeIsVisibleThroughFiltersFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeIsVisibleThroughFiltersFunction do + @moduledoc false + use Ecto.Migration def change do - execute "create function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) + execute( + "create function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) returns bool language sql immutable @@ -29,5 +32,6 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeIsVisibleThroughFiltersFunction join unnest(columns) col on f.column_name = col.name; $$;" + ) end end diff --git a/priv/repo/postgres/migrations/20211116214523_create_realtime_apply_rls_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116214523_create_realtime_apply_rls_function.ex similarity index 95% rename from priv/repo/postgres/migrations/20211116214523_create_realtime_apply_rls_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211116214523_create_realtime_apply_rls_function.ex index fc7879f..74fb6f4 100644 --- a/priv/repo/postgres/migrations/20211116214523_create_realtime_apply_rls_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211116214523_create_realtime_apply_rls_function.ex @@ -1,15 +1,20 @@ -defmodule 
Realtime.Repo.Migrations.CreateRealtimeApplyRlsFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.CreateRealtimeApplyRlsFunction do + @moduledoc false + use Ecto.Migration def change do - execute "create type realtime.action as enum ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR');" - execute "create type realtime.wal_rls as ( + execute( + "create type realtime.action as enum ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR');" + ) + + execute("create type realtime.wal_rls as ( wal jsonb, is_rls_enabled boolean, users uuid[], errors text[] - );" - execute "create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + );") + execute("create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) returns realtime.wal_rls language plpgsql volatile @@ -207,6 +212,6 @@ defmodule Realtime.Repo.Migrations.CreateRealtimeApplyRlsFunction do errors )::realtime.wal_rls; end; - $$;" + $$;") end end diff --git a/lib/extensions/postgres_cdc_rls/repo/migrations/20211122062447_grant_realtime_usage_to_authenticated_role.ex b/lib/extensions/postgres_cdc_rls/repo/migrations/20211122062447_grant_realtime_usage_to_authenticated_role.ex new file mode 100644 index 0000000..a6fbb53 --- /dev/null +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211122062447_grant_realtime_usage_to_authenticated_role.ex @@ -0,0 +1,9 @@ +defmodule Realtime.Extensions.Rls.Repo.Migrations.GrantRealtimeUsageToAuthenticatedRole do + @moduledoc false + + use Ecto.Migration + + def change do + execute("grant usage on schema realtime to authenticated;") + end +end diff --git a/priv/repo/postgres/migrations/20211124070109_enable_realtime_apply_rls_function_postgrest_9_compatibility.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211124070109_enable_realtime_apply_rls_function_postgrest_9_compatibility.ex similarity index 96% rename from priv/repo/postgres/migrations/20211124070109_enable_realtime_apply_rls_function_postgrest_9_compatibility.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211124070109_enable_realtime_apply_rls_function_postgrest_9_compatibility.ex index c2c937a..ea13337 100644 --- a/priv/repo/postgres/migrations/20211124070109_enable_realtime_apply_rls_function_postgrest_9_compatibility.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211124070109_enable_realtime_apply_rls_function_postgrest_9_compatibility.ex @@ -1,8 +1,11 @@ -defmodule Realtime.Repo.Migrations.EnableRealtimeApplyRlsFunctionPostgrest9Compatibility do +defmodule Realtime.Extensions.Rls.Repo.Migrations.EnableRealtimeApplyRlsFunctionPostgrest9Compatibility do + @moduledoc false + use Ecto.Migration def change do - execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + execute( + "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) returns realtime.wal_rls language plpgsql volatile @@ -205,5 +208,6 @@ defmodule Realtime.Repo.Migrations.EnableRealtimeApplyRlsFunctionPostgrest9Compa )::realtime.wal_rls; end; $$;" + ) end end diff --git a/priv/repo/postgres/migrations/20211202204204_update_realtime_subscription_check_filters_function_security.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211202204204_update_realtime_subscription_check_filters_function_security.ex similarity index 90% rename from priv/repo/postgres/migrations/20211202204204_update_realtime_subscription_check_filters_function_security.exs rename to 
lib/extensions/postgres_cdc_rls/repo/migrations/20211202204204_update_realtime_subscription_check_filters_function_security.ex index cfdd2ad..a1bc768 100644 --- a/priv/repo/postgres/migrations/20211202204204_update_realtime_subscription_check_filters_function_security.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211202204204_update_realtime_subscription_check_filters_function_security.ex @@ -1,8 +1,10 @@ -defmodule Realtime.Repo.Migrations.UpdateRealtimeSubscriptionCheckFiltersFunctionSecurity do +defmodule Realtime.Extensions.Rls.Repo.Migrations.UpdateRealtimeSubscriptionCheckFiltersFunctionSecurity do + @moduledoc false + use Ecto.Migration def change do - execute "create or replace function realtime.subscription_check_filters() + execute("create or replace function realtime.subscription_check_filters() returns trigger language plpgsql as $$ @@ -53,6 +55,6 @@ defmodule Realtime.Repo.Migrations.UpdateRealtimeSubscriptionCheckFiltersFunctio return new; end; - $$;" + $$;") end end diff --git a/priv/repo/postgres/migrations/20211202204605_update_realtime_build_prepared_statement_sql_function_for_compatibility_with_all_types.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211202204605_update_realtime_build_prepared_statement_sql_function_for_compatibility_with_all_types.ex similarity index 80% rename from priv/repo/postgres/migrations/20211202204605_update_realtime_build_prepared_statement_sql_function_for_compatibility_with_all_types.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211202204605_update_realtime_build_prepared_statement_sql_function_for_compatibility_with_all_types.ex index 7eed7f6..3642a54 100644 --- a/priv/repo/postgres/migrations/20211202204605_update_realtime_build_prepared_statement_sql_function_for_compatibility_with_all_types.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211202204605_update_realtime_build_prepared_statement_sql_function_for_compatibility_with_all_types.ex @@ -1,8 +1,10 @@ -defmodule Realtime.Repo.Migrations.UpdateRealtimeBuildPreparedStatementSqlFunctionForCompatibilityWithAllTypes do +defmodule Realtime.Extensions.Rls.Repo.Migrations.UpdateRealtimeBuildPreparedStatementSqlFunctionForCompatibilityWithAllTypes do + @moduledoc false + use Ecto.Migration def change do - execute "create or replace function realtime.build_prepared_statement_sql( + execute("create or replace function realtime.build_prepared_statement_sql( prepared_statement_name text, entity regclass, columns realtime.wal_column[] @@ -34,6 +36,6 @@ defmodule Realtime.Repo.Migrations.UpdateRealtimeBuildPreparedStatementSqlFuncti pkc.is_pkey group by entity - $$;" + $$;") end end diff --git a/priv/repo/postgres/migrations/20211210212804_enable_generic_subscription_claims.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211210212804_enable_generic_subscription_claims.ex similarity index 90% rename from priv/repo/postgres/migrations/20211210212804_enable_generic_subscription_claims.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211210212804_enable_generic_subscription_claims.ex index 595ee7a..ab83a53 100644 --- a/priv/repo/postgres/migrations/20211210212804_enable_generic_subscription_claims.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211210212804_enable_generic_subscription_claims.ex @@ -1,34 +1,38 @@ -defmodule Realtime.Repo.Migrations.EnableGenericSubscriptionClaims do +defmodule Realtime.Extensions.Rls.Repo.Migrations.EnableGenericSubscriptionClaims do + @moduledoc false + use Ecto.Migration def change 
do - execute "truncate table realtime.subscription restart identity" + execute("truncate table realtime.subscription restart identity") - execute "alter table realtime.subscription + execute("alter table realtime.subscription drop constraint subscription_entity_user_id_filters_key cascade, drop column email cascade, - drop column created_at cascade" + drop column created_at cascade") - execute "alter table realtime.subscription rename user_id to subscription_id" + execute("alter table realtime.subscription rename user_id to subscription_id") - execute "create function realtime.to_regrole(role_name text) + execute("create function realtime.to_regrole(role_name text) returns regrole immutable language sql -- required to allow use in generated clause - as $$ select role_name::regrole $$;" + as $$ select role_name::regrole $$;") - execute "alter table realtime.subscription + execute("alter table realtime.subscription add column claims jsonb not null, add column claims_role regrole not null generated always as (realtime.to_regrole(claims ->> 'role')) stored, - add column created_at timestamp not null default timezone('utc', now())" + add column created_at timestamp not null default timezone('utc', now())") - execute "create unique index subscription_subscription_id_entity_filters_key on realtime.subscription (subscription_id, entity, filters)" + execute( + "create unique index subscription_subscription_id_entity_filters_key on realtime.subscription (subscription_id, entity, filters)" + ) - execute "revoke usage on schema realtime from authenticated;" - execute "revoke all on realtime.subscription from authenticated;" + execute("revoke usage on schema realtime from authenticated;") + execute("revoke all on realtime.subscription from authenticated;") - execute "create or replace function realtime.subscription_check_filters() + execute("create or replace function realtime.subscription_check_filters() returns trigger language plpgsql as $$ @@ -79,12 +83,12 @@ defmodule Realtime.Repo.Migrations.EnableGenericSubscriptionClaims do return new; end; - $$;" + $$;") - execute "alter type realtime.wal_rls rename attribute users to subscription_ids cascade;" + execute("alter type realtime.wal_rls rename attribute users to subscription_ids cascade;") - execute "drop function realtime.apply_rls(jsonb, integer);" - execute "create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + execute("drop function realtime.apply_rls(jsonb, integer);") + execute("create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) returns setof realtime.wal_rls language plpgsql volatile @@ -316,6 +320,6 @@ defmodule Realtime.Repo.Migrations.EnableGenericSubscriptionClaims do perform set_config('role', null, true); end; - $$;" + $$;") end end diff --git a/priv/repo/postgres/migrations/20211228014915_add_wal_payload_on_errors_in_apply_rls_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20211228014915_add_wal_payload_on_errors_in_apply_rls_function.ex similarity index 97% rename from priv/repo/postgres/migrations/20211228014915_add_wal_payload_on_errors_in_apply_rls_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20211228014915_add_wal_payload_on_errors_in_apply_rls_function.ex index 25f79a4..bea449e 100644 --- a/priv/repo/postgres/migrations/20211228014915_add_wal_payload_on_errors_in_apply_rls_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20211228014915_add_wal_payload_on_errors_in_apply_rls_function.ex @@ -1,8 +1,11 @@ 
-defmodule Realtime.Repo.Migrations.AddWalPayloadOnErrorsInApplyRlsFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.AddWalPayloadOnErrorsInApplyRlsFunction do + @moduledoc false + use Ecto.Migration def change do - execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + execute( + "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) returns setof realtime.wal_rls language plpgsql volatile @@ -243,5 +246,6 @@ defmodule Realtime.Repo.Migrations.AddWalPayloadOnErrorsInApplyRlsFunction do perform set_config('role', null, true); end; $$;" + ) end end diff --git a/priv/repo/postgres/migrations/20220312004840_update_apply_rls_function_to_apply_iso_8601.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220107221237_update_change_timestamp_to_iso_8601_zulu_format.ex similarity index 97% rename from priv/repo/postgres/migrations/20220312004840_update_apply_rls_function_to_apply_iso_8601.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220107221237_update_change_timestamp_to_iso_8601_zulu_format.ex index 0f096a5..a5f837a 100644 --- a/priv/repo/postgres/migrations/20220312004840_update_apply_rls_function_to_apply_iso_8601.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220107221237_update_change_timestamp_to_iso_8601_zulu_format.ex @@ -1,8 +1,11 @@ -defmodule Realtime.Repo.Migrations.UpdateApplyRlsFunctionToApplyIso8601 do +defmodule Realtime.Extensions.Rls.Repo.Migrations.UpdateChangeTimestampToIso8601ZuluFormat do + @moduledoc false + use Ecto.Migration def change do - execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + execute( + "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) returns setof realtime.wal_rls language plpgsql volatile @@ -238,5 +241,6 @@ defmodule Realtime.Repo.Migrations.UpdateApplyRlsFunctionToApplyIso8601 do perform set_config('role', null, true); end; $$;" + ) end end diff --git a/priv/repo/postgres/migrations/20220228202821_update_subscription_check_filters_function_dynamic_table_name.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220228202821_update_subscription_check_filters_function_dynamic_table_name.ex similarity index 90% rename from priv/repo/postgres/migrations/20220228202821_update_subscription_check_filters_function_dynamic_table_name.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220228202821_update_subscription_check_filters_function_dynamic_table_name.ex index dd55974..5e37caa 100644 --- a/priv/repo/postgres/migrations/20220228202821_update_subscription_check_filters_function_dynamic_table_name.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220228202821_update_subscription_check_filters_function_dynamic_table_name.ex @@ -1,8 +1,10 @@ -defmodule Realtime.Repo.Migrations.UpdateSubscriptionCheckFiltersFunctionDynamicTableName do +defmodule Realtime.Extensions.Rls.Repo.Migrations.UpdateSubscriptionCheckFiltersFunctionDynamicTableName do + @moduledoc false + use Ecto.Migration def change do - execute "create or replace function realtime.subscription_check_filters() + execute("create or replace function realtime.subscription_check_filters() returns trigger language plpgsql as $$ @@ -58,6 +60,6 @@ defmodule Realtime.Repo.Migrations.UpdateSubscriptionCheckFiltersFunctionDynamic return new; end; - $$;" + $$;") end end diff --git 
a/priv/repo/postgres/migrations/20220107221237_update_change_timestamp_to_iso_8601_zulu_format.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220312004840_update_apply_rls_function_to_apply_iso_8601.ex similarity index 97% rename from priv/repo/postgres/migrations/20220107221237_update_change_timestamp_to_iso_8601_zulu_format.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220312004840_update_apply_rls_function_to_apply_iso_8601.ex index 6c03285..95a1b81 100644 --- a/priv/repo/postgres/migrations/20220107221237_update_change_timestamp_to_iso_8601_zulu_format.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220312004840_update_apply_rls_function_to_apply_iso_8601.ex @@ -1,8 +1,11 @@ -defmodule Realtime.Repo.Migrations.UpdateChangeTimestampToIso8601ZuluFormat do +defmodule Realtime.Extensions.Rls.Repo.Migrations.UpdateApplyRlsFunctionToApplyIso8601 do + @moduledoc false + use Ecto.Migration def change do - execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + execute( + "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) returns setof realtime.wal_rls language plpgsql volatile @@ -238,5 +241,6 @@ defmodule Realtime.Repo.Migrations.UpdateChangeTimestampToIso8601ZuluFormat do perform set_config('role', null, true); end; $$;" + ) end end diff --git a/priv/repo/postgres/migrations/20220603231003_add_quoted_regtypes_support.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220603231003_add_quoted_regtypes_support.ex similarity index 99% rename from priv/repo/postgres/migrations/20220603231003_add_quoted_regtypes_support.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220603231003_add_quoted_regtypes_support.ex index f1d04f7..e42c524 100644 --- a/priv/repo/postgres/migrations/20220603231003_add_quoted_regtypes_support.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220603231003_add_quoted_regtypes_support.ex @@ -1,4 +1,6 @@ -defmodule Realtime.RLS.Repo.Migrations.AddQuotedRegtypesSupport do +defmodule Realtime.Extensions.Rls.Repo.Migrations.AddQuotedRegtypesSupport do + @moduledoc false + use Ecto.Migration def change do diff --git a/priv/repo/postgres/migrations/20220603232444_add_output_for_data_less_than_equal_64_bytes_when_payload_too_large.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220603232444_add_output_for_data_less_than_equal_64_bytes_when_payload_too_large.ex similarity index 98% rename from priv/repo/postgres/migrations/20220603232444_add_output_for_data_less_than_equal_64_bytes_when_payload_too_large.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220603232444_add_output_for_data_less_than_equal_64_bytes_when_payload_too_large.ex index bdfd9a7..6130ab7 100644 --- a/priv/repo/postgres/migrations/20220603232444_add_output_for_data_less_than_equal_64_bytes_when_payload_too_large.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220603232444_add_output_for_data_less_than_equal_64_bytes_when_payload_too_large.ex @@ -1,4 +1,6 @@ -defmodule Realtime.RLS.Repo.Migrations.AddOutputForDataLessThanEqual64BytesWhenPayloadTooLarge do +defmodule Realtime.Extensions.Rls.Repo.Migrations.AddOutputForDataLessThanEqual64BytesWhenPayloadTooLarge do + @moduledoc false + use Ecto.Migration def change do diff --git a/priv/repo/postgres/migrations/20220615214548_add_quoted_regtypes_backward_compatibility_support.exs 
b/lib/extensions/postgres_cdc_rls/repo/migrations/20220615214548_add_quoted_regtypes_backward_compatibility_support.ex similarity index 99% rename from priv/repo/postgres/migrations/20220615214548_add_quoted_regtypes_backward_compatibility_support.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220615214548_add_quoted_regtypes_backward_compatibility_support.ex index d81afbd..ad64cf7 100644 --- a/priv/repo/postgres/migrations/20220615214548_add_quoted_regtypes_backward_compatibility_support.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220615214548_add_quoted_regtypes_backward_compatibility_support.ex @@ -1,4 +1,6 @@ -defmodule Realtime.RLS.Repo.Migrations.AddQuotedRegtypesBackwardCompatibilitySupport do +defmodule Realtime.Extensions.Rls.Repo.Migrations.AddQuotedRegtypesBackwardCompatibilitySupport do + @moduledoc false + use Ecto.Migration def change do diff --git a/priv/repo/postgres/migrations/20220712093339_recreate_realtime_build_prepared_statement_sql_function.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220712093339_recreate_realtime_build_prepared_statement_sql_function.ex similarity index 88% rename from priv/repo/postgres/migrations/20220712093339_recreate_realtime_build_prepared_statement_sql_function.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220712093339_recreate_realtime_build_prepared_statement_sql_function.ex index 4a5c51e..f745784 100644 --- a/priv/repo/postgres/migrations/20220712093339_recreate_realtime_build_prepared_statement_sql_function.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220712093339_recreate_realtime_build_prepared_statement_sql_function.ex @@ -1,8 +1,10 @@ -defmodule Realtime.RLS.Repo.Migrations.RecreateRealtimeBuildPreparedStatementSqlFunction do +defmodule Realtime.Extensions.Rls.Repo.Migrations.RecreateRealtimeBuildPreparedStatementSqlFunction do + @moduledoc false + use Ecto.Migration def change do - execute " + execute(" create or replace function realtime.build_prepared_statement_sql( prepared_statement_name text, entity regclass, @@ -35,6 +37,6 @@ defmodule Realtime.RLS.Repo.Migrations.RecreateRealtimeBuildPreparedStatementSql group by entity $$; - " + ") end end diff --git a/priv/repo/postgres/migrations/20220908172859_null_passes_filters_recreate_is_visible_through_filters.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220908172859_null_passes_filters_recreate_is_visible_through_filters.ex similarity index 90% rename from priv/repo/postgres/migrations/20220908172859_null_passes_filters_recreate_is_visible_through_filters.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220908172859_null_passes_filters_recreate_is_visible_through_filters.ex index 3f3dcc7..09a24ba 100644 --- a/priv/repo/postgres/migrations/20220908172859_null_passes_filters_recreate_is_visible_through_filters.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220908172859_null_passes_filters_recreate_is_visible_through_filters.ex @@ -1,8 +1,10 @@ -defmodule Realtime.RLS.Repo.Migrations.NullPassesFiltersRecreateIsVisibleThroughFilters do +defmodule Realtime.Extensions.Rls.Repo.Migrations.NullPassesFiltersRecreateIsVisibleThroughFilters do + @moduledoc false + use Ecto.Migration def change do - execute " + execute(" create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) returns bool language sql @@ -35,6 +37,6 @@ defmodule Realtime.RLS.Repo.Migrations.NullPassesFiltersRecreateIsVisibleThrough join 
unnest(columns) col on f.column_name = col.name; $$; - " + ") end end diff --git a/priv/repo/postgres/migrations/20220916233421_update_apply_rls_function_to_pass_through_delete_events_on_filter.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20220916233421_update_apply_rls_function_to_pass_through_delete_events_on_filter.ex similarity index 98% rename from priv/repo/postgres/migrations/20220916233421_update_apply_rls_function_to_pass_through_delete_events_on_filter.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20220916233421_update_apply_rls_function_to_pass_through_delete_events_on_filter.ex index 9d3de9e..7499653 100644 --- a/priv/repo/postgres/migrations/20220916233421_update_apply_rls_function_to_pass_through_delete_events_on_filter.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20220916233421_update_apply_rls_function_to_pass_through_delete_events_on_filter.ex @@ -1,4 +1,6 @@ -defmodule Realtime.RLS.Repo.Migrations.UpdateApplyRlsFunctionToPassThroughDeleteEventsOnFilter do +defmodule Realtime.Extensions.Rls.Repo.Migrations.UpdateApplyRlsFunctionToPassThroughDeleteEventsOnFilter do + @moduledoc false + use Ecto.Migration def change do diff --git a/priv/repo/postgres/migrations/20230119133233_millisecond_precision_for_walrus.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20230119133233_millisecond_precision_for_walrus.ex similarity index 99% rename from priv/repo/postgres/migrations/20230119133233_millisecond_precision_for_walrus.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20230119133233_millisecond_precision_for_walrus.ex index 2aeb202..ce68233 100644 --- a/priv/repo/postgres/migrations/20230119133233_millisecond_precision_for_walrus.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20230119133233_millisecond_precision_for_walrus.ex @@ -1,4 +1,6 @@ -defmodule Realtime.RLS.Repo.Migrations.MillisecondPrecisionForWalrus do +defmodule Realtime.Extensions.Rls.Repo.Migrations.MillisecondPrecisionForWalrus do + @moduledoc false + use Ecto.Migration def change do diff --git a/priv/repo/postgres/migrations/20230128025114_add_in_op_to_filters.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20230128025114_add_in_op_to_filters.ex similarity index 97% rename from priv/repo/postgres/migrations/20230128025114_add_in_op_to_filters.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20230128025114_add_in_op_to_filters.ex index 405944d..b459136 100644 --- a/priv/repo/postgres/migrations/20230128025114_add_in_op_to_filters.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20230128025114_add_in_op_to_filters.ex @@ -1,4 +1,6 @@ -defmodule Realtime.RLS.Repo.Migrations.AddInOpToFilters do +defmodule Realtime.Extensions.Rls.Repo.Migrations.AddInOpToFilters do + @moduledoc false + use Ecto.Migration def change do diff --git a/priv/repo/postgres/migrations/20230128025212_enable_filtering_on_delete_record.exs b/lib/extensions/postgres_cdc_rls/repo/migrations/20230128025212_enable_filtering_on_delete_record.ex similarity index 99% rename from priv/repo/postgres/migrations/20230128025212_enable_filtering_on_delete_record.exs rename to lib/extensions/postgres_cdc_rls/repo/migrations/20230128025212_enable_filtering_on_delete_record.ex index 048c7ba..c0a54a1 100644 --- a/priv/repo/postgres/migrations/20230128025212_enable_filtering_on_delete_record.exs +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20230128025212_enable_filtering_on_delete_record.ex @@ -1,4 +1,6 @@ -defmodule 
Realtime.RLS.Repo.Migrations.EnableFilteringOnDeleteRecord do +defmodule Realtime.Extensions.Rls.Repo.Migrations.EnableFilteringOnDeleteRecord do + @moduledoc false + use Ecto.Migration def change do diff --git a/lib/extensions/postgres_cdc_rls/repo/migrations/20230227211149_update_subscription_check_filters_for_in_filter_non_text_types.ex b/lib/extensions/postgres_cdc_rls/repo/migrations/20230227211149_update_subscription_check_filters_for_in_filter_non_text_types.ex new file mode 100644 index 0000000..0fe46c3 --- /dev/null +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20230227211149_update_subscription_check_filters_for_in_filter_non_text_types.ex @@ -0,0 +1,79 @@ +defmodule Realtime.Extensions.Rls.Repo.Migrations.UpdateSubscriptionCheckFiltersForInFilterNonTextTypes do + @moduledoc false + + use Ecto.Migration + + def change do + execute(" + create or replace function realtime.subscription_check_filters() + returns trigger + language plpgsql + as $$ + /* + Validates that the user defined filters for a subscription: + - refer to valid columns that the claimed role may access + - values are coercable to the correct column type + */ + declare + col_names text[] = coalesce( + array_agg(c.column_name order by c.ordinal_position), + '{}'::text[] + ) + from + information_schema.columns c + where + format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity + and pg_catalog.has_column_privilege( + (new.claims ->> 'role'), + format('%I.%I', c.table_schema, c.table_name)::regclass, + c.column_name, + 'SELECT' + ); + filter realtime.user_defined_filter; + col_type regtype; + + in_val jsonb; + begin + for filter in select * from unnest(new.filters) loop + -- Filtered column is valid + if not filter.column_name = any(col_names) then + raise exception 'invalid column for filter %', filter.column_name; + end if; + + -- Type is sanitized and safe for string interpolation + col_type = ( + select atttypid::regtype + from pg_catalog.pg_attribute + where attrelid = new.entity + and attname = filter.column_name + ); + if col_type is null then + raise exception 'failed to lookup type for column %', filter.column_name; + end if; + + -- Set maximum number of entries for in filter + if filter.op = 'in'::realtime.equality_op then + in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype); + if coalesce(jsonb_array_length(in_val), 0) > 100 then + raise exception 'too many values for `in` filter. 
Maximum 100'; + end if; + else + -- raises an exception if value is not coercable to type + perform realtime.cast(filter.value, col_type); + end if; + + end loop; + + -- Apply consistent order to filters so the unique constraint on + -- (subscription_id, entity, filters) can't be tricked by a different filter order + new.filters = coalesce( + array_agg(f order by f.column_name, f.op, f.value), + '{}' + ) from unnest(new.filters) f; + + return new; + end; + $$; + ") + end +end diff --git a/lib/extensions/postgres_cdc_rls/repo/migrations/20230228184745_convert_commit_timestamp_to_utc.ex b/lib/extensions/postgres_cdc_rls/repo/migrations/20230228184745_convert_commit_timestamp_to_utc.ex new file mode 100644 index 0000000..e97e3d4 --- /dev/null +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20230228184745_convert_commit_timestamp_to_utc.ex @@ -0,0 +1,295 @@ +defmodule Realtime.Extensions.Rls.Repo.Migrations.ConvertCommitTimestampToUtc do + @moduledoc false + + use Ecto.Migration + + def change do + execute(" + create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + returns setof realtime.wal_rls + language plpgsql + volatile + as $$ + declare + -- Regclass of the table e.g. public.notes + entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass; + + -- I, U, D, T: insert, update ... + action realtime.action = ( + case wal ->> 'action' + when 'I' then 'INSERT' + when 'U' then 'UPDATE' + when 'D' then 'DELETE' + else 'ERROR' + end + ); + + -- Is row level security enabled for the table + is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; + + subscriptions realtime.subscription[] = array_agg(subs) + from + realtime.subscription subs + where + subs.entity = entity_; + + -- Subscription vars + roles regrole[] = array_agg(distinct us.claims_role) + from + unnest(subscriptions) us; + + working_role regrole; + claimed_role regrole; + claims jsonb; + + subscription_id uuid; + subscription_has_access bool; + visible_to_subscription_ids uuid[] = '{}'; + + -- structured info for wal's columns + columns realtime.wal_column[]; + -- previous identity values for update/delete + old_columns realtime.wal_column[]; + + error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; + + -- Primary jsonb output for record + output jsonb; + + begin + perform set_config('role', null, true); + + columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'columns') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + old_columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'identity') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + for working_role in select * from unnest(roles) loop + + -- Update `is_selectable` for columns and old_columns + columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + 
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(columns) c; + + old_columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(old_columns) c; + + if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + -- subscriptions is already filtered by entity + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 400: Bad Request, no primary key'] + )::realtime.wal_rls; + + -- The claims role does not have SELECT permission to the primary key of entity + elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 401: Unauthorized'] + )::realtime.wal_rls; + + else + output = jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action, + 'commit_timestamp', to_char( + ((wal ->> 'timestamp')::timestamptz at time zone 'utc'), + 'YYYY-MM-DD\"T\"HH24:MI:SS.MS\"Z\"' + ), + 'columns', ( + select + jsonb_agg( + jsonb_build_object( + 'name', pa.attname, + 'type', pt.typname + ) + order by pa.attnum asc + ) + from + pg_attribute pa + join pg_type pt + on pa.atttypid = pt.oid + where + attrelid = entity_ + and attnum > 0 + and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') + ) + ) + -- Add \"record\" key for insert and update + || case + when action in ('INSERT', 'UPDATE') then + jsonb_build_object( + 'record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + else '{}'::jsonb + end + -- Add \"old_record\" key for update and delete + || case + when action = 'UPDATE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + when action = 'DELETE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey + ) + ) + else '{}'::jsonb + end; + + -- Create the prepared statement + if is_rls_enabled and action <> 'DELETE' then + if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then + deallocate walrus_rls_stmt; + end if; + execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); + end if; + + visible_to_subscription_ids = '{}'; + + for subscription_id, claims in ( + select + subs.subscription_id, + subs.claims + from + unnest(subscriptions) subs + where + subs.entity = entity_ + and subs.claims_role = working_role + and ( + 
realtime.is_visible_through_filters(columns, subs.filters) + or action = 'DELETE' + ) + ) loop + + if not is_rls_enabled or action = 'DELETE' then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + else + -- Check if RLS allows the role to see the record + perform + set_config('role', working_role::text, true), + set_config('request.jwt.claims', claims::text, true); + + execute 'execute walrus_rls_stmt' into subscription_has_access; + + if subscription_has_access then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + end if; + end if; + end loop; + + perform set_config('role', null, true); + + return next ( + output, + is_rls_enabled, + visible_to_subscription_ids, + case + when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] + else '{}' + end + )::realtime.wal_rls; + + end if; + end loop; + + perform set_config('role', null, true); + end; + $$; + ") + end +end diff --git a/lib/extensions/postgres_cdc_rls/repo/migrations/20230308225145_output_full_record_when_unchanged_toast.ex b/lib/extensions/postgres_cdc_rls/repo/migrations/20230308225145_output_full_record_when_unchanged_toast.ex new file mode 100644 index 0000000..aa08a4f --- /dev/null +++ b/lib/extensions/postgres_cdc_rls/repo/migrations/20230308225145_output_full_record_when_unchanged_toast.ex @@ -0,0 +1,306 @@ +defmodule Realtime.Extensions.Rls.Repo.Migrations.OutputFullRecordWhenUnchangedToast do + @moduledoc false + + use Ecto.Migration + + def change do + execute(" + create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + returns setof realtime.wal_rls + language plpgsql + volatile + as $$ + declare + -- Regclass of the table e.g. public.notes + entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass; + + -- I, U, D, T: insert, update ... 
+ action realtime.action = ( + case wal ->> 'action' + when 'I' then 'INSERT' + when 'U' then 'UPDATE' + when 'D' then 'DELETE' + else 'ERROR' + end + ); + + -- Is row level security enabled for the table + is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; + + subscriptions realtime.subscription[] = array_agg(subs) + from + realtime.subscription subs + where + subs.entity = entity_; + + -- Subscription vars + roles regrole[] = array_agg(distinct us.claims_role) + from + unnest(subscriptions) us; + + working_role regrole; + claimed_role regrole; + claims jsonb; + + subscription_id uuid; + subscription_has_access bool; + visible_to_subscription_ids uuid[] = '{}'; + + -- structured info for wal's columns + columns realtime.wal_column[]; + -- previous identity values for update/delete + old_columns realtime.wal_column[]; + + error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; + + -- Primary jsonb output for record + output jsonb; + + begin + perform set_config('role', null, true); + + columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'columns') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + old_columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'identity') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + for working_role in select * from unnest(roles) loop + + -- Update `is_selectable` for columns and old_columns + columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(columns) c; + + old_columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(old_columns) c; + + if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + -- subscriptions is already filtered by entity + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 400: Bad Request, no primary key'] + )::realtime.wal_rls; + + -- The claims role does not have SELECT permission to the primary key of entity + elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 401: Unauthorized'] + )::realtime.wal_rls; + + else + output = jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', 
action, + 'commit_timestamp', to_char( + ((wal ->> 'timestamp')::timestamptz at time zone 'utc'), + 'YYYY-MM-DD\"T\"HH24:MI:SS.MS\"Z\"' + ), + 'columns', ( + select + jsonb_agg( + jsonb_build_object( + 'name', pa.attname, + 'type', pt.typname + ) + order by pa.attnum asc + ) + from + pg_attribute pa + join pg_type pt + on pa.atttypid = pt.oid + where + attrelid = entity_ + and attnum > 0 + and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') + ) + ) + -- Add \"record\" key for insert and update + || case + when action in ('INSERT', 'UPDATE') then + jsonb_build_object( + 'record', + ( + select + jsonb_object_agg( + -- if unchanged toast, get column name and value from old record + coalesce((c).name, (oc).name), + case + when (c).name is null then (oc).value + else (c).value + end + ) + from + unnest(columns) c + full outer join unnest(old_columns) oc + on (c).name = (oc).name + where + coalesce((c).is_selectable, (oc).is_selectable) + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + else '{}'::jsonb + end + -- Add \"old_record\" key for update and delete + || case + when action = 'UPDATE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + when action = 'DELETE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey + ) + ) + else '{}'::jsonb + end; + + -- Create the prepared statement + if is_rls_enabled and action <> 'DELETE' then + if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then + deallocate walrus_rls_stmt; + end if; + execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); + end if; + + visible_to_subscription_ids = '{}'; + + for subscription_id, claims in ( + select + subs.subscription_id, + subs.claims + from + unnest(subscriptions) subs + where + subs.entity = entity_ + and subs.claims_role = working_role + and ( + realtime.is_visible_through_filters(columns, subs.filters) + or action = 'DELETE' + ) + ) loop + + if not is_rls_enabled or action = 'DELETE' then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + else + -- Check if RLS allows the role to see the record + perform + set_config('role', working_role::text, true), + set_config('request.jwt.claims', claims::text, true); + + execute 'execute walrus_rls_stmt' into subscription_has_access; + + if subscription_has_access then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + end if; + end if; + end loop; + + perform set_config('role', null, true); + + return next ( + output, + is_rls_enabled, + visible_to_subscription_ids, + case + when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] + else '{}' + end + )::realtime.wal_rls; + + end if; + end loop; + + perform set_config('role', null, true); + end; + $$; + ") + end +end diff --git a/lib/extensions/postgres_cdc_rls/subscription_manager.ex b/lib/extensions/postgres_cdc_rls/subscription_manager.ex index e784c6a..37634c2 100644 --- a/lib/extensions/postgres_cdc_rls/subscription_manager.ex 
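# Illustrative sketch (not part of the patch). The two new apply_rls migrations
# above change what subscribers observe in change payloads: 20230228184745
# renders "commit_timestamp" in UTC, and 20230308225145 fills columns reported
# as unchanged TOAST from the old record (the full outer join on old_columns)
# instead of dropping them. The payload below is hypothetical and only shows
# the shape:
payload = %{
  "schema" => "public",
  "table" => "notes",
  "type" => "UPDATE",
  "commit_timestamp" => "2023-02-28T18:47:45.123Z",
  "record" => %{"id" => 1, "body" => "large TOASTed value carried over from the old row"}
}

# The timestamp is plain ISO 8601 in UTC:
{:ok, ts, 0} = DateTime.from_iso8601(payload["commit_timestamp"])
ts.time_zone
# => "Etc/UTC"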
+++ b/lib/extensions/postgres_cdc_rls/subscription_manager.ex @@ -94,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do def handle_info({:subscribed, {pid, id}}, state) do true = state.subscribers_tid - |> :ets.insert({pid, id, Process.monitor(pid)}) + |> :ets.insert({pid, id, Process.monitor(pid), node(pid)}) {:noreply, %{state | no_users_ts: nil}} end @@ -114,7 +114,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do Logger.warning("Found new oids #{inspect(new_oids, pretty: true)}") Subscriptions.delete_all(conn) - fn {pid, _id, ref}, _acc -> + fn {pid, _id, ref, _node}, _acc -> Process.demonitor(ref, [:flush]) send(pid, :postgres_subscribe) end @@ -136,7 +136,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do q values -> - for {_pid, id, _ref} <- values, reduce: q do + for {_pid, id, _ref, _node} <- values, reduce: q do acc -> UUID.string_to_binary!(id) |> :queue.in(acc) @@ -151,7 +151,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do q1 = if !:queue.is_empty(q) do - {ids, q1} = queue_take(q, @max_delete_records) + {ids, q1} = H.queue_take(q, @max_delete_records) Logger.debug("delete sub id #{inspect(ids)}") case Subscriptions.delete_multi(state.conn, ids) do @@ -203,18 +203,6 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do ## Internal functions - def queue_take(q, count) do - Enum.reduce_while(0..count, {[], q}, fn _, {items, queue} -> - case :queue.out(queue) do - {{:value, item}, new_q} -> - {:cont, {[item | items], new_q}} - - {:empty, new_q} -> - {:halt, {items, new_q}} - end - end) - end - defp check_delete_queue(timeout \\ @timeout) do Process.send_after( self(), diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index 9c162c1..44d281a 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -58,8 +58,17 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do {:ok, %{num_rows: num} = result} when num > 0 -> result - _ -> - rollback(conn, {:subscription_insert_failed, params}) + {:ok, _} -> + rollback( + conn, + "Subscription insert failed with 0 rows. Check that tables are part of publication #{publication} and subscription params are correct: #{inspect(params)}" + ) + + {:error, exception} -> + rollback( + conn, + "Subscription insert failed with error: #{Exception.message(exception)}. 
Check that tables are part of publication #{publication} and subscription params are correct: #{inspect(params)}" + ) end {:error, reason} -> @@ -126,6 +135,10 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do case query(conn, sql, [publication]) do {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} -> Enum.reduce(rows, %{}, fn [schema, table, oid], acc -> + if String.contains?(table, " ") do + Logger.error("Publication table name contains spaces: \"#{schema}\".\"#{table}\"") + end + Map.put(acc, {schema, table}, [oid]) |> Map.update({schema}, [oid], &[oid | &1]) |> Map.update({"*"}, [oid], &[oid | &1]) diff --git a/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex b/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex index 616db08..9404cb7 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex @@ -9,16 +9,21 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do alias Realtime.Helpers, as: H @timeout 120_000 + @max_delete_records 1000 defmodule State do @moduledoc false - defstruct [:id, :conn, :check_active_pids, :subscribers_tid] + defstruct [:id, :conn, :check_active_pids, :subscribers_tid, :delete_queue] @type t :: %__MODULE__{ id: String.t(), conn: Postgrex.conn(), check_active_pids: reference(), - subscribers_tid: :ets.tid() + subscribers_tid: :ets.tid(), + delete_queue: %{ + ref: reference(), + queue: :queue.queue() + } } end @@ -42,13 +47,19 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do "subscribers_tid" => subscribers_tid } = args + Logger.metadata(external_id: id, project: id) + {:ok, conn} = H.connect_db(host, port, name, user, pass, socket_opts, 1) state = %State{ id: id, conn: conn, check_active_pids: check_active_pids(), - subscribers_tid: subscribers_tid + subscribers_tid: subscribers_tid, + delete_queue: %{ + ref: nil, + queue: :queue.new() + } } {:ok, state} @@ -57,33 +68,133 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do @impl true def handle_info( :check_active_pids, - %State{check_active_pids: ref, subscribers_tid: tid} = state + %State{check_active_pids: ref, subscribers_tid: tid, delete_queue: delete_queue} = state ) do H.cancel_timer(ref) ids = - fn {pid, postgres_id, _ref}, acc -> - case :rpc.call(node(pid), Process, :alive?, [pid]) do - true -> - acc + subscribers_by_node(tid) + |> not_alive_pids_dist() + |> pop_not_alive_pids(tid) + + new_delete_queue = + if length(ids) > 0 do + q = + Enum.reduce(ids, delete_queue.queue, fn id, acc -> + if :queue.member(id, acc), do: acc, else: :queue.in(id, acc) + end) + + %{ + ref: check_delete_queue(), + queue: q + } + else + delete_queue + end - _ -> - Logger.error("Detected phantom subscriber") - :ets.delete(tid, pid) - [UUID.string_to_binary!(postgres_id) | acc] + {:noreply, %{state | check_active_pids: check_active_pids(), delete_queue: new_delete_queue}} + end + + def handle_info(:check_delete_queue, %State{delete_queue: %{ref: ref, queue: q}} = state) do + H.cancel_timer(ref) + + new_queue = + if !:queue.is_empty(q) do + {ids, q1} = H.queue_take(q, @max_delete_records) + Logger.error("Delete #{length(ids)} phantom subscribers from db") + + case Subscriptions.delete_multi(state.conn, ids) do + {:ok, _} -> + q1 + + {:error, reason} -> + Logger.error("delete phantom subscriptions from the queue failed: #{inspect(reason)}") + q end + else + q end - |> :ets.foldl([], tid) - if length(ids) > 0 do - Subscriptions.delete_multi(state.conn, ids) - end + new_ref = if 
!:queue.is_empty(new_queue), do: check_delete_queue(), else: ref - {:noreply, %{state | check_active_pids: check_active_pids()}} + {:noreply, %{state | delete_queue: %{ref: new_ref, queue: new_queue}}} end ## Internal functions + @spec pop_not_alive_pids([pid()], :ets.tid()) :: [Ecto.UUID.t()] + def pop_not_alive_pids(pids, tid) do + Enum.reduce(pids, [], fn pid, acc -> + case :ets.lookup(tid, pid) do + [] -> + Logger.error("Can't find pid in subscribers table: #{inspect(pid)}") + acc + + results -> + for {^pid, postgres_id, _ref, _node} <- results do + Logger.error( + "Detected phantom subscriber #{inspect(pid)} with postgres_id #{inspect(postgres_id)}" + ) + + :ets.delete(tid, pid) + UUID.string_to_binary!(postgres_id) + end ++ acc + end + end) + end + + @spec subscribers_by_node(:ets.tid()) :: %{node() => MapSet.t(pid())} + def subscribers_by_node(tid) do + fn {pid, _postgres_id, _ref, node}, acc -> + set = + if Map.has_key?(acc, node) do + MapSet.put(acc[node], pid) + else + MapSet.new([pid]) + end + + Map.put(acc, node, set) + end + |> :ets.foldl(%{}, tid) + end + + @spec not_alive_pids_dist(%{node() => MapSet.t(pid())}) :: [pid()] | [] + def not_alive_pids_dist(pids) do + Enum.reduce(pids, [], fn {node, pids}, acc -> + if node == node() do + acc ++ not_alive_pids(pids) + else + case :rpc.call(node, __MODULE__, :not_alive_pids, [pids], 15_000) do + {:badrpc, _} = error -> + Logger.error("Can't check pids on node #{inspect(node)}: #{inspect(error)}") + acc + + pids -> + acc ++ pids + end + end + end) + end + + @spec not_alive_pids(MapSet.t(pid())) :: [pid()] | [] + def not_alive_pids(pids) do + Enum.reduce(pids, [], fn pid, acc -> + if Process.alive?(pid) do + acc + else + [pid | acc] + end + end) + end + + defp check_delete_queue() do + Process.send_after( + self(), + :check_delete_queue, + 1000 + ) + end + defp check_active_pids() do Process.send_after( self(), diff --git a/lib/extensions/postgres_cdc_rls/supervisor.ex b/lib/extensions/postgres_cdc_rls/supervisor.ex index a26bb95..d6a726a 100644 --- a/lib/extensions/postgres_cdc_rls/supervisor.ex +++ b/lib/extensions/postgres_cdc_rls/supervisor.ex @@ -13,6 +13,8 @@ defmodule Extensions.PostgresCdcRls.Supervisor do @impl true def init(_args) do + load_migrations_modules() + :syn.set_event_handler(Rls.SynHandler) :syn.add_node_to_scopes([Rls]) @@ -28,4 +30,14 @@ defmodule Extensions.PostgresCdcRls.Supervisor do Supervisor.init(children, strategy: :one_for_one) end + + defp load_migrations_modules() do + {:ok, modules} = :application.get_key(:realtime, :modules) + + modules + |> Enum.filter( + &String.starts_with?(to_string(&1), "Elixir.Realtime.Extensions.Rls.Repo.Migrations") + ) + |> Enum.each(&Code.ensure_loaded!/1) + end end diff --git a/lib/realtime/api.ex b/lib/realtime/api.ex index 50c6d7c..b947878 100644 --- a/lib/realtime/api.ex +++ b/lib/realtime/api.ex @@ -6,7 +6,7 @@ defmodule Realtime.Api do import Ecto.Query - alias Realtime.{Repo, Api.Tenant, Api.Extensions, RateCounter, GenCounter} + alias Realtime.{Repo, Api.Tenant, Api.Extensions, RateCounter, GenCounter, Tenants} @doc """ Returns the list of tenants. 
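# Illustrative sketch (not part of the patch): how the reworked
# SubscriptionsChecker functions above fit together. The row layout matches
# what SubscriptionManager now inserts: {pid, postgres_id, monitor_ref, node}.
alias Extensions.PostgresCdcRls.SubscriptionsChecker, as: Checker

tid = :ets.new(:subscribers, [:public, :bag])
dead_pid = spawn(fn -> :ok end)
Process.sleep(50)

:ets.insert(tid, {dead_pid, UUID.uuid1(), make_ref(), node()})

tid
|> Checker.subscribers_by_node()    # %{node => MapSet of pids}
|> Checker.not_alive_pids_dist()    # one :rpc.call per remote node, not one per pid
|> Checker.pop_not_alive_pids(tid)  # removes dead pids from ETS, returns binary UUIDs

# The returned ids are queued and deleted in batches of up to @max_delete_records
# (1000) rows via Realtime.Helpers.queue_take/2 and Subscriptions.delete_multi/2.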
@@ -28,7 +28,7 @@ defmodule Realtime.Api do def list_tenants(opts) when is_list(opts) do repo_replica = Repo.replica() - field = Keyword.get(opts, :sort_by, "inserted_at") |> String.to_atom() + field = Keyword.get(opts, :order_by, "inserted_at") |> String.to_atom() external_id = Keyword.get(opts, :search) limit = Keyword.get(opts, :limit, 50) order = Keyword.get(opts, :order, "desc") |> String.to_atom() @@ -178,7 +178,7 @@ defmodule Realtime.Api do end def preload_counters(%Tenant{} = tenant) do - id = {:plug, :requests, tenant.external_id} + id = Tenants.requests_per_second_key(tenant) preload_counters(tenant, id) end diff --git a/lib/realtime/helpers.ex b/lib/realtime/helpers.ex index d9d0901..bfbd8e0 100644 --- a/lib/realtime/helpers.ex +++ b/lib/realtime/helpers.ex @@ -141,6 +141,32 @@ defmodule Realtime.Helpers do end end + @doc """ + Takes the first N items from the queue and returns the list of items and the new queue. + + ## Examples + + iex> q = :queue.new() + iex> q = :queue.in(1, q) + iex> q = :queue.in(2, q) + iex> q = :queue.in(3, q) + iex> Realtime.Helpers.queue_take(q, 2) + {[2, 1], {[], [3]}} + """ + + @spec queue_take(:queue.queue(), non_neg_integer()) :: {list(), :queue.queue()} + def queue_take(q, count) do + Enum.reduce_while(1..count, {[], q}, fn _, {items, queue} -> + case :queue.out(queue) do + {{:value, item}, new_q} -> + {:cont, {[item | items], new_q}} + + {:empty, new_q} -> + {:halt, {items, new_q}} + end + end) + end + defp pad(data) do to_add = 16 - rem(byte_size(data), 16) data <> :binary.copy(<>, to_add) diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex index 9322f2b..6669532 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex @@ -33,10 +33,17 @@ defmodule Realtime.PromEx.Plugins.Tenant do last_value( [:realtime, :connections, :connected], event_name: [:realtime, :connections], - description: "The total count of connected clients for a tenant.", + description: "The node total count of connected clients for a tenant.", measurement: :connected, tags: [:tenant] ), + last_value( + [:realtime, :connections, :connected_cluster], + event_name: [:realtime, :connections], + description: "The cluster total count of connected clients for a tenant.", + measurement: :connected_cluster, + tags: [:tenant] + ), last_value( [:realtime, :connections, :limit_concurrent], event_name: [:realtime, :connections], @@ -53,11 +60,12 @@ defmodule Realtime.PromEx.Plugins.Tenant do for t <- tenants do count = UsersCounter.tenant_users(Node.self(), t) + cluster_count = UsersCounter.tenant_users(t) tenant = Tenants.Cache.get_tenant_by_external_id(t) Telemetry.execute( [:realtime, :connections], - %{connected: count, limit: tenant.max_concurrent_users}, + %{connected: count, connected_cluster: cluster_count, limit: tenant.max_concurrent_users}, %{tenant: t} ) end diff --git a/lib/realtime/postgres_cdc.ex b/lib/realtime/postgres_cdc.ex index 63fab71..6923d85 100644 --- a/lib/realtime/postgres_cdc.ex +++ b/lib/realtime/postgres_cdc.ex @@ -67,11 +67,11 @@ defmodule Realtime.PostgresCdc do "us-west-1" -> "sea" "sa-east-1" -> "iad" "ca-central-1" -> "iad" - "ap-southeast-1" -> "sin" - "ap-northeast-1" -> "sin" - "ap-northeast-2" -> "sin" - "ap-southeast-2" -> "sin" - "ap-south-1" -> "sin" + "ap-southeast-1" -> "syd" + "ap-northeast-1" -> "syd" + "ap-northeast-2" -> "syd" + "ap-southeast-2" -> "syd" + "ap-south-1" -> "syd" "eu-west-1" -> "lhr" "eu-west-2" -> "lhr" 
"eu-west-3" -> "lhr" diff --git a/lib/realtime/repo.ex b/lib/realtime/repo.ex index 7500e6f..b7259b6 100644 --- a/lib/realtime/repo.ex +++ b/lib/realtime/repo.ex @@ -10,6 +10,7 @@ defmodule Realtime.Repo do "iad" => Realtime.Repo.Replica.IAD, "sin" => Realtime.Repo.Replica.SIN, "maa" => Realtime.Repo.Replica.SIN, + "syd" => Realtime.Repo.Replica.SIN, "lhr" => Realtime.Repo.Replica.FRA, "fra" => Realtime.Repo.Replica.FRA } diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index fc9fe08..0e2e8f8 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -29,13 +29,61 @@ defmodule Realtime.Tenants do @spec limiter_keys(Tenant.t()) :: [{atom(), atom(), String.t()}] def limiter_keys(%Tenant{} = tenant) do [ - {:plug, :requests, tenant.external_id}, - {:channel, :clients_per, tenant.external_id}, - {:channel, :joins, tenant.external_id}, - {:channel, :events, tenant.external_id} + requests_per_second_key(tenant), + channels_per_client_key(tenant), + joins_per_second_key(tenant), + events_per_second_key(tenant) ] end + @doc """ + The GenCounter key to use for counting requests through Plug. + """ + + @spec requests_per_second_key(Tenant.t() | String.t()) :: {:plug, :requests, String.t()} + def requests_per_second_key(%Tenant{} = tenant) do + {:plug, :requests, tenant.external_id} + end + + @doc """ + The GenCounter key to use for counting RealtimeChannel joins. + """ + + @spec joins_per_second_key(Tenant.t() | String.t()) :: {:channel, :joins, String.t()} + def joins_per_second_key(tenant) when is_binary(tenant) do + {:channel, :joins, tenant} + end + + def joins_per_second_key(%Tenant{} = tenant) do + {:channel, :joins, tenant.external_id} + end + + @doc """ + The GenCounter key to use to limit the amount of clients connected to the same same channel. + """ + + @spec channels_per_client_key(Tenant.t() | String.t()) :: {:channel, :clients_per, String.t()} + def channels_per_client_key(tenant) when is_binary(tenant) do + {:channel, :clients_per, tenant} + end + + def channels_per_client_key(%Tenant{} = tenant) do + {:channel, :clients_per, tenant.external_id} + end + + @doc """ + The GenCounter key to use when counting events for RealtimeChannel events. 
+ """ + + @spec events_per_second_key(Tenant.t() | String.t()) :: {:channel, :events, String.t()} + def events_per_second_key(tenant) when is_binary(tenant) do + {:channel, :events, tenant} + end + + def events_per_second_key(%Tenant{} = tenant) do + {:channel, :events, tenant.external_id} + end + @spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do nodes = [Node.self() | Node.list()] diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 944066f..957b1db 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -9,7 +9,7 @@ defmodule RealtimeWeb.RealtimeChannel do alias DBConnection.Backoff alias Phoenix.Tracker.Shard alias RealtimeWeb.{ChannelsAuthorization, Endpoint, Presence} - alias Realtime.{GenCounter, RateCounter, PostgresCdc, SignalHandler} + alias Realtime.{GenCounter, RateCounter, PostgresCdc, SignalHandler, Tenants} import Realtime.Helpers, only: [cancel_timer: 1, decrypt!: 2] @@ -254,7 +254,6 @@ defmodule RealtimeWeb.RealtimeChannel do @impl true def handle_info(%{event: "postgres_cdc_down"}, socket) do - socket = count(socket) pg_sub_ref = postgres_subscribe() {:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})} @@ -282,7 +281,6 @@ defmodule RealtimeWeb.RealtimeChannel do } } = socket ) do - socket = count(socket) cancel_timer(pg_sub_ref) args = Map.put(postgres_extension, "id", tenant) @@ -317,8 +315,6 @@ defmodule RealtimeWeb.RealtimeChannel do @impl true def handle_info(:confirm_token, %{assigns: %{pg_change_params: pg_change_params}} = socket) do - socket = count(socket) - case confirm_token(socket) do {:ok, claims, confirm_token_ref} -> pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims)) @@ -379,7 +375,7 @@ defmodule RealtimeWeb.RealtimeChannel do %{assigns: %{pg_sub_ref: pg_sub_ref, pg_change_params: pg_change_params}} = socket ) when is_binary(refresh_token) do - socket = count(socket) |> assign(:access_token, refresh_token) + socket = socket |> assign(:access_token, refresh_token) case confirm_token(socket) do {:ok, claims, confirm_token_ref} -> @@ -495,7 +491,7 @@ defmodule RealtimeWeb.RealtimeChannel do end def limit_joins(%{assigns: %{tenant: tenant, limits: limits}}) do - id = {:channel, :joins, tenant} + id = Tenants.joins_per_second_key(tenant) GenCounter.new(id) RateCounter.new(id, @@ -524,20 +520,16 @@ defmodule RealtimeWeb.RealtimeChannel do end def limit_channels(%{assigns: %{tenant: tenant, limits: limits}, transport_pid: pid}) do - key = limit_channels_key(tenant) + key = Tenants.channels_per_client_key(tenant) if Registry.count_match(Realtime.Registry, key, pid) > limits.max_channels_per_client do {:error, :too_many_channels} else - Registry.register(Realtime.Registry, limit_channels_key(tenant), pid) + Registry.register(Realtime.Registry, Tenants.channels_per_client_key(tenant), pid) :ok end end - defp limit_channels_key(tenant) do - {:channel, :clients_per, tenant} - end - defp limit_max_users(%{ assigns: %{limits: %{max_concurrent_users: max_conn_users}, tenant: tenant} }) do @@ -551,7 +543,7 @@ defmodule RealtimeWeb.RealtimeChannel do end defp assign_counter(%{assigns: %{tenant: tenant, limits: limits}} = socket) do - key = {:channel, :events, tenant} + key = Tenants.events_per_second_key(tenant) GenCounter.new(key) diff --git a/lib/realtime_web/live/components.ex b/lib/realtime_web/live/components.ex index 
0dba69f..064e9f9 100644 --- a/lib/realtime_web/live/components.ex +++ b/lib/realtime_web/live/components.ex @@ -255,7 +255,7 @@ defmodule RealtimeWeb.Components do @doc """ Renders a for field select dropdown. ## Examples - <.select form={f} field={:sort_by} list={@sort_fields} selected={:inserted_at}> + <.select form={f} field={:order_by} list={@sort_fields} selected={:inserted_at}> """ attr :selected, :atom, required: true diff --git a/lib/realtime_web/live/tenants_live/index.ex b/lib/realtime_web/live/tenants_live/index.ex index 7eebb90..0ccd8d9 100644 --- a/lib/realtime_web/live/tenants_live/index.ex +++ b/lib/realtime_web/live/tenants_live/index.ex @@ -13,15 +13,15 @@ defmodule RealtimeWeb.TenantsLive.Index do import Ecto.Changeset schema "f" do - field(:sort_by, :string, default: "inserted_at") - field(:search, :string) - field(:limit, :integer, default: 50) + field(:order_by, :string, default: "inserted_at") + field(:search, :string, default: nil) + field(:limit, :integer, default: 10) field(:order, :string, default: "desc") end def changeset(form, params \\ %{}) do form - |> cast(params, [:sort_by, :search, :limit, :order]) + |> cast(params, [:order_by, :search, :limit, :order]) end def apply_changes_form(changeset) do @@ -61,7 +61,7 @@ defmodule RealtimeWeb.TenantsLive.Index do tenants: Api.list_tenants( search: form.search, - order_by: form.sort_by, + order_by: form.order_by, limit: form.limit, order: form.order ) diff --git a/lib/realtime_web/live/tenants_live/index.html.heex b/lib/realtime_web/live/tenants_live/index.html.heex index 8adf9cf..5cd1fd9 100644 --- a/lib/realtime_web/live/tenants_live/index.html.heex +++ b/lib/realtime_web/live/tenants_live/index.html.heex @@ -6,7 +6,7 @@
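# Illustrative sketch (not part of the patch): the TenantsLive.Index changes
# above submit :order_by (renamed from :sort_by), which is the option
# Realtime.Api.list_tenants/1 now reads. The values shown are the form defaults.
Realtime.Api.list_tenants(
  search: nil,
  order_by: "inserted_at",
  order: "desc",
  limit: 10
)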
<.form :let={f} for={@filter_changeset} phx-change="validate" phx-submit="filter_submit">
- <.select form={f} field={:sort_by} list={@sort_fields} selected={:inserted_at} /> + <.select form={f} field={:order_by} list={@sort_fields} selected={:inserted_at} />
<.select form={f} field={:order} list={[:desc, :asc]} selected={:desc} /> diff --git a/lib/realtime_web/plugs/assign_tenant.ex b/lib/realtime_web/plugs/assign_tenant.ex index c0aad91..3009bb0 100644 --- a/lib/realtime_web/plugs/assign_tenant.ex +++ b/lib/realtime_web/plugs/assign_tenant.ex @@ -12,6 +12,7 @@ defmodule RealtimeWeb.Plugs.AssignTenant do alias Realtime.RateCounter alias Realtime.GenCounter alias Realtime.Api.Tenant + alias Realtime.Tenants def init(opts) do opts @@ -22,9 +23,9 @@ defmodule RealtimeWeb.Plugs.AssignTenant do %Tenant{} = tenant <- Api.get_tenant_by_external_id(external_id) do tenant = tenant - |> tap(&GenCounter.new({:plug, :requests, &1.external_id})) - |> tap(&RateCounter.new({:plug, :requests, &1.external_id}, idle_shutdown: :infinity)) - |> tap(&GenCounter.add({:plug, :requests, &1.external_id})) + |> tap(&GenCounter.new(Tenants.requests_per_second_key(&1))) + |> tap(&RateCounter.new(Tenants.requests_per_second_key(&1), idle_shutdown: :infinity)) + |> tap(&GenCounter.add(Tenants.requests_per_second_key(&1))) |> Api.preload_counters() assign(conn, :tenant, tenant) diff --git a/mix.exs b/mix.exs index 8c3875c..a39f87b 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,8 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.1.1", + # When bumping this version make sure to check + version: "2.7.1", elixir: "~> 1.14.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/priv/repo/postgres/migrations/20211122062447_grant_realtime_usage_to_authenticated_role.exs b/priv/repo/postgres/migrations/20211122062447_grant_realtime_usage_to_authenticated_role.exs deleted file mode 100644 index 91af872..0000000 --- a/priv/repo/postgres/migrations/20211122062447_grant_realtime_usage_to_authenticated_role.exs +++ /dev/null @@ -1,7 +0,0 @@ -defmodule Realtime.Repo.Migrations.GrantRealtimeUsageToAuthenticatedRole do - use Ecto.Migration - - def change do - execute "grant usage on schema realtime to authenticated;" - end -end diff --git a/test/realtime/extensions/cdc_rls/subscriptions_checker_test.exs b/test/realtime/extensions/cdc_rls/subscriptions_checker_test.exs new file mode 100644 index 0000000..0b105ba --- /dev/null +++ b/test/realtime/extensions/cdc_rls/subscriptions_checker_test.exs @@ -0,0 +1,83 @@ +defmodule SubscriptionsCheckerTest do + use ExUnit.Case + alias Extensions.PostgresCdcRls.SubscriptionsChecker, as: Checker + + test "subscribers_by_node/1" do + tid = :ets.new(:table, [:public, :bag]) + + test_data = [ + {:pid1, "id1", :ref, :node1}, + {:pid1, "id1.2", :ref, :node1}, + {:pid2, "id2", :ref, :node2} + ] + + :ets.insert(tid, test_data) + + result = Checker.subscribers_by_node(tid) + + assert Checker.subscribers_by_node(tid) == %{ + node1: MapSet.new([:pid1]), + node2: MapSet.new([:pid2]) + } + end + + describe "not_alive_pids/1" do + test "returns empty list for empty input" do + assert Checker.not_alive_pids(MapSet.new()) == [] + end + + test "returns empty list for all alive PIDs" do + pid1 = spawn(fn -> Process.sleep(5000) end) + pid2 = spawn(fn -> Process.sleep(5000) end) + pid3 = spawn(fn -> Process.sleep(5000) end) + assert Checker.not_alive_pids(MapSet.new([pid1, pid2, pid3])) == [] + end + + test "returns list of dead PIDs" do + pid1 = spawn(fn -> Process.sleep(5000) end) + pid2 = spawn(fn -> Process.sleep(5000) end) + pid3 = spawn(fn -> Process.sleep(5000) end) + Process.exit(pid2, :kill) + assert Checker.not_alive_pids(MapSet.new([pid1, pid2, pid3])) == [pid2] + end + end + + describe 
"pop_not_alive_pids/2" do + test "one subscription per channel" do + tid = :ets.new(:table, [:public, :bag]) + + uuid1 = UUID.uuid1() + uuid2 = UUID.uuid1() + + test_data = [ + {:pid1, uuid1, :ref, :node1}, + {:pid1, uuid2, :ref, :node1}, + {:pid2, "uuid", :ref, :node2} + ] + + :ets.insert(tid, test_data) + + assert Checker.pop_not_alive_pids([:pid1], tid) == [ + UUID.string_to_binary!(uuid1), + UUID.string_to_binary!(uuid2) + ] + + assert :ets.tab2list(tid) == [{:pid2, "uuid", :ref, :node2}] + end + + test "two subscriptions per channel" do + tid = :ets.new(:table, [:public, :bag]) + + uuid1 = UUID.uuid1() + + test_data = [ + {:pid1, uuid1, :ref, :node1}, + {:pid2, "uuid", :ref, :node2} + ] + + :ets.insert(tid, test_data) + assert Checker.pop_not_alive_pids([:pid1], tid) == [UUID.string_to_binary!(uuid1)] + assert :ets.tab2list(tid) == [{:pid2, "uuid", :ref, :node2}] + end + end +end diff --git a/test/realtime/tenants_test.exs b/test/realtime/tenants_test.exs index 28b6d8a..ba254b0 100644 --- a/test/realtime/tenants_test.exs +++ b/test/realtime/tenants_test.exs @@ -53,22 +53,23 @@ defmodule Realtime.TenantsTest do keys = Tenants.limiter_keys(tenant) limits = Tenants.get_tenant_limits(tenant, keys) - [all] = Enum.filter(limits, fn e -> e.limiter == {:plug, :requests, "external_id"} end) + [all] = + Enum.filter(limits, fn e -> e.limiter == Tenants.requests_per_second_key(tenant) end) assert all.counter == 9 [user_channels] = - Enum.filter(limits, fn e -> e.limiter == {:channel, :clients_per, "external_id"} end) + Enum.filter(limits, fn e -> e.limiter == Tenants.channels_per_client_key(tenant) end) assert user_channels.counter == 9 [channel_joins] = - Enum.filter(limits, fn e -> e.limiter == {:channel, :joins, "external_id"} end) + Enum.filter(limits, fn e -> e.limiter == Tenants.joins_per_second_key(tenant) end) assert channel_joins.counter == 9 [tenant_events] = - Enum.filter(limits, fn e -> e.limiter == {:channel, :events, "external_id"} end) + Enum.filter(limits, fn e -> e.limiter == Tenants.events_per_second_key(tenant) end) assert tenant_events.counter == 9 end