diff --git a/build.gradle b/build.gradle index 5970dd77f46..c978f18088f 100644 --- a/build.gradle +++ b/build.gradle @@ -326,10 +326,7 @@ subprojects { // Protocol changes should pin the current version via this override and remove the override in a follow-up PR // when actually using the new protocol. Example to pin KME to v12 when introducing v13: // project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY) - def versionOverrides = [ - project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v37', PathValidation.DIRECTORY), - project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v92', PathValidation.DIRECTORY) - ] + def versionOverrides = [] def schemaDirs = [sourceDir] sourceDir.eachDir { typeDir -> diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index de911b3f7af..277db3183df 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -1407,6 +1407,8 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) { Utils.parseStoreLifecycleHooksListFromString(storeLifecycleHooksStr, Arg.STORE_LIFECYCLE_HOOKS_LIST.toString()); params.setStoreLifecycleHooks(lifecycleHooksList); + booleanParam(cmd, Arg.FLINK_VENICE_VIEWS_ENABLED, p -> params.setFlinkVeniceViewsEnabled(p), argSet); + /** * {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true. */ diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index 21784d27c1a..dec2ee2a33d 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -255,6 +255,7 @@ public enum Arg { ), VIEW_NAME("view-name", "vn", true, "Name of a store view"), VIEW_CLASS("view-class", "vc", true, "Name of a store view class"), VIEW_PARAMS("view-params", "vp", true, "Additional parameter map of a store view class"), + FLINK_VENICE_VIEWS_ENABLED("flink-venice-views-enabled", "fvve", true, "Enable flink-based views"), REMOVE_VIEW("remove-view", "rv", false, "Optional config to specify to disable certain store view"), PARTITION_DETAIL_ENABLED( "partition-detail-enabled", "pde", true, "A flag to indicate whether to retrieve partition details" diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index 680349525f9..b534be4143a 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -48,6 +48,7 @@ import static com.linkedin.venice.Arg.FABRIC; import static com.linkedin.venice.Arg.FABRIC_A; import static com.linkedin.venice.Arg.FABRIC_B; +import static com.linkedin.venice.Arg.FLINK_VENICE_VIEWS_ENABLED; import static com.linkedin.venice.Arg.FORCE; import static com.linkedin.venice.Arg.FUTURE_VERSION_ETL_ENABLED; import static com.linkedin.venice.Arg.GLOBAL_RT_DIV_ENABLED; @@ -326,7 +327,7 @@ public enum Command { NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION, TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION, GLOBAL_RT_DIV_ENABLED, ENUM_SCHEMA_EVOLUTION_ALLOWED, STORE_LIFECYCLE_HOOKS_LIST, BLOB_TRANSFER_IN_SERVER_ENABLED, - KEY_URN_COMPRESSION_EANBLED, KEY_URN_FIELDS } + KEY_URN_COMPRESSION_EANBLED, KEY_URN_FIELDS, FLINK_VENICE_VIEWS_ENABLED } ), UPDATE_CLUSTER_CONFIG( "update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER }, diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java index 2a4ee0e932f..2380e1299da 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/RecoverStoreMetadata.java @@ -263,7 +263,8 @@ public static void recover( .setTargetRegionSwap(deletedStore.getTargetSwapRegion()) .setTargetRegionSwapWaitTime(deletedStore.getTargetSwapRegionWaitTime()) .setIsDavinciHeartbeatReported(deletedStore.getIsDavinciHeartbeatReported()) - .setGlobalRtDivEnabled(deletedStore.isGlobalRtDivEnabled()); + .setGlobalRtDivEnabled(deletedStore.isGlobalRtDivEnabled()) + .setFlinkVeniceViewsEnabled(deletedStore.isFlinkVeniceViewsEnabled()); System.out.println( "Updating store: " + storeName + " in cluster: " + recoverCluster + " with params: " + updateParams.toString()); diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index a3aab2888ac..cb53c3d9551 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; import com.fasterxml.jackson.core.JsonProcessingException; @@ -103,7 +104,8 @@ public void testAdminUpdateStoreArg() throws ParseException, IOException { "true", "--venice-etl-strategy", "EXTERNAL_WITH_VENICE_TRIGGER", "--partitioner-params", "{\"" + K1 + "\":\"" + V1 + "\",\"" + K2 + "\":\"" + V2 + "\",\"" + K3 + "\":\"" + V3 + "\"}", "--store-lifecycle-hooks-list", - "[{\"storeLifecycleHooksClassName\":\"com.example.MyHook1\",\"storeLifecycleHooksParams\":{\"paramA\":\"valueA\",\"paramB\":\"valueB\"}},{\"storeLifecycleHooksClassName\":\"com.example.MyHook2\",\"storeLifecycleHooksParams\":{\"foo\":\"bar\"}}]" }; + "[{\"storeLifecycleHooksClassName\":\"com.example.MyHook1\",\"storeLifecycleHooksParams\":{\"paramA\":\"valueA\",\"paramB\":\"valueB\"}},{\"storeLifecycleHooksClassName\":\"com.example.MyHook2\",\"storeLifecycleHooksParams\":{\"foo\":\"bar\"}}]", + "--flink-venice-views-enabled", "true" }; CommandLine commandLine = AdminTool.getCommandLine(args); UpdateStoreQueryParams params = AdminTool.getUpdateStoreQueryParams(commandLine); @@ -132,6 +134,8 @@ public void testAdminUpdateStoreArg() throws ParseException, IOException { Assert.assertTrue(params.getStoreLifecycleHooks().isPresent()); List lifecycleHooksRecords = params.getStoreLifecycleHooks().get(); assertEquals(lifecycleHooksRecords.size(), 2); + assertTrue(params.getFlinkVeniceViewsEnabled().isPresent()); + assertTrue(params.getFlinkVeniceViewsEnabled().get()); } @Test diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index 2b8e66f02fb..07c1e69cd48 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -123,6 +123,7 @@ public class ControllerApiConstants { public static final String STORE_VIEW_CLASS = "store_view_class"; public static final String STORE_VIEW_PARAMS = "store_view_params"; public static final String DISABLE_STORE_VIEW = "disable_store_view"; + public static final String FLINK_VENICE_VIEWS_ENABLED = "flink_venice_views_enabled"; public static final String NATIVE_REPLICATION_ENABLED = "native_replication_enabled"; public static final String PUSH_STREAM_SOURCE_ADDRESS = "push_stream_source_address"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java index e030ffea099..52d6b7b4dae 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java @@ -26,6 +26,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENUM_SCHEMA_EVOLUTION_ALLOWED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETLED_PROXY_USER_ACCOUNT; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETL_STRATEGY; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.FLINK_VENICE_VIEWS_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.FUTURE_VERSION_ETL_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.GLOBAL_RT_DIV_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED; @@ -181,7 +182,8 @@ public UpdateStoreQueryParams(StoreInfo srcStore, boolean storeMigrating) { .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))) .setKeyUrnCompressionEnabled(srcStore.isKeyUrnCompressionEnabled()) - .setKeyUrnFields(srcStore.getKeyUrnFields()); + .setKeyUrnFields(srcStore.getKeyUrnFields()) + .setFlinkVeniceViewsEnabled(srcStore.isFlinkVeniceViewsEnabled()); if (srcStore.getReplicationMetadataVersionId() != -1) { updateStoreQueryParams.setReplicationMetadataVersionID(srcStore.getReplicationMetadataVersionId()); @@ -542,6 +544,14 @@ public Optional> getStoreViews() { return getStringMap(STORE_VIEW); } + public UpdateStoreQueryParams setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) { + return putBoolean(FLINK_VENICE_VIEWS_ENABLED, flinkVeniceViewsEnabled); + } + + public Optional getFlinkVeniceViewsEnabled() { + return getBoolean(FLINK_VENICE_VIEWS_ENABLED); + } + public UpdateStoreQueryParams setPushStreamSourceAddress(String pushStreamSourceAddress) { return putString(PUSH_STREAM_SOURCE_ADDRESS, pushStreamSourceAddress); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index a8ebdd33e3b..7fdb6a521ee 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -1093,6 +1093,16 @@ public Map getViewConfigs() { .collect(Collectors.toMap(Map.Entry::getKey, e -> new ReadOnlyViewConfig(e.getValue())))); } + @Override + public boolean isFlinkVeniceViewsEnabled() { + return this.delegate.isFlinkVeniceViewsEnabled(); + } + + @Override + public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) { + throw new UnsupportedOperationException(); + } + @Override public void setViewConfigs(Map viewConfigList) { throw new UnsupportedOperationException(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java index 1fc26a28ab5..15cbec99e34 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java @@ -116,6 +116,10 @@ static boolean isSystemStore(String storeName) { void setViewConfigs(Map viewConfigMap); + boolean isFlinkVeniceViewsEnabled(); + + void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled); + boolean isHybrid(); CompressionStrategy getCompressionStrategy(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java index 1aa70d5b3f8..d767f02f810 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java @@ -93,6 +93,7 @@ public static StoreInfo fromStore(Store store) { storeInfo.setLatestVersionPromoteToCurrentTimestamp(store.getLatestVersionPromoteToCurrentTimestamp()); storeInfo.setKeyUrnCompressionEnabled(store.isKeyUrnCompressionEnabled()); storeInfo.setKeyUrnFields(store.getKeyUrnFields()); + storeInfo.setFlinkVeniceViewsEnabled(store.isFlinkVeniceViewsEnabled()); return storeInfo; } @@ -376,6 +377,7 @@ public static StoreInfo fromStore(Store store) { private long getLatestVersionPromoteToCurrentTimestamp; private boolean keyUrnCompressionEnabled = false; private List keyUrnFields = new ArrayList<>(); + private boolean flinkVeniceViewsEnabled = false; public StoreInfo() { } @@ -978,6 +980,14 @@ public void setEnumSchemaEvolutionAllowed(boolean enumSchemaEvolutionAllowed) { this.enumSchemaEvolutionAllowed = enumSchemaEvolutionAllowed; } + public boolean isFlinkVeniceViewsEnabled() { + return flinkVeniceViewsEnabled; + } + + public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) { + this.flinkVeniceViewsEnabled = flinkVeniceViewsEnabled; + } + public List getStoreLifecycleHooks() { return this.storeLifecycleHooks; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java index 2490cddad1c..04e6b7c7a7c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java @@ -287,6 +287,16 @@ public void setViewConfigs(Map viewConfigList) { throwUnsupportedOperationException("setViewConfig"); } + @Override + public boolean isFlinkVeniceViewsEnabled() { + return zkSharedStore.isFlinkVeniceViewsEnabled(); + } + + @Override + public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) { + throwUnsupportedOperationException("setFlinkVeniceViewsEnabled"); + } + @Override public boolean isHybrid() { return zkSharedStore.isHybrid(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java index 9dfb9c1bae4..0da95cd22f8 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java @@ -246,6 +246,7 @@ public ZKStore(Store store) { setStoreLifecycleHooks(store.getStoreLifecycleHooks()); setKeyUrnCompressionEnabled(store.isKeyUrnCompressionEnabled()); setKeyUrnFields(store.getKeyUrnFields()); + setFlinkVeniceViewsEnabled(store.isFlinkVeniceViewsEnabled()); for (Version storeVersion: store.getVersions()) { forceAddVersion(storeVersion.cloneVersion(), true); @@ -513,6 +514,16 @@ public void setViewConfigs(Map viewConfigList) { } } + @Override + public boolean isFlinkVeniceViewsEnabled() { + return this.storeProperties.flinkVeniceViewsEnabled; + } + + @Override + public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) { + this.storeProperties.flinkVeniceViewsEnabled = flinkVeniceViewsEnabled; + } + @Override public boolean isHybrid() { return this.storeProperties.hybridConfig != null; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java index d328bb20867..3c31e74c04a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java @@ -77,7 +77,7 @@ public enum AvroProtocolDefinition { * * TODO: Move AdminOperation to venice-common module so that we can properly reference it here. */ - ADMIN_OPERATION(92, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"), + ADMIN_OPERATION(93, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"), /** * Single chunk of a large multi-chunk value. Just a bunch of bytes. @@ -148,7 +148,7 @@ public enum AvroProtocolDefinition { /** * Value schema for metadata system store. */ - METADATA_SYSTEM_SCHEMA_STORE(37, StoreMetaValue.class), + METADATA_SYSTEM_SCHEMA_STORE(38, StoreMetaValue.class), /* Value Schema for Parent Controller Metadata system store diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestStoreInfo.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestStoreInfo.java index c5a1c05a493..1f8bf6a8a59 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestStoreInfo.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestStoreInfo.java @@ -100,4 +100,15 @@ public void testBlobTransferStoreLevelConfigs() { assertEquals(ActivationState.ENABLED.name(), storeInfo.getBlobTransferInServerEnabled()); assertTrue(storeInfo.isBlobTransferEnabled()); } + + @Test + public void testFlinkVeniceViewsEnabled() { + StoreInfo storeInfo = new StoreInfo(); + // check default value + assertFalse(storeInfo.isFlinkVeniceViewsEnabled()); + // setting value + storeInfo.setFlinkVeniceViewsEnabled(true); + // check updated value + assertTrue(storeInfo.isFlinkVeniceViewsEnabled()); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java index e3f5b654ed3..6588d384f99 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java @@ -1038,6 +1038,7 @@ private void testUpdateConfigs(ControllerClient parentControllerClient, Controll testUpdateCompactionEnabled(parentControllerClient, childControllerClient); testUpdateCompactionThreshold(parentControllerClient, childControllerClient); testUpdateEnumSchemaEvolution(parentControllerClient, childControllerClient); + testUpdateStoreFlinkVeniceViewsEnable(parentControllerClient, childControllerClient); } /** @@ -1182,6 +1183,14 @@ private void testUpdateEnumSchemaEvolution(ControllerClient parentClient, Contro response -> Assert.assertTrue(response.getStore().isEnumSchemaEvolutionAllowed())); } + private void testUpdateStoreFlinkVeniceViewsEnable(ControllerClient parentClient, ControllerClient childClient) { + testUpdateConfig( + parentClient, + childClient, + params -> params.setFlinkVeniceViewsEnabled(true), + response -> Assert.assertTrue(response.getStore().isFlinkVeniceViewsEnabled())); + } + private void testAddBadValueSchema(ControllerClient parentControllerClient) { // Adding store String storeName = Utils.getUniqueString("test_store"); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index fd66b01853b..0df4b36726f 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -5855,6 +5855,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto Optional> storeLifecycleHooks = params.getStoreLifecycleHooks(); Optional keyUrnCompressionEnabled = params.getKeyUrnCompressionEnabled(); Optional> keyUrnFields = params.getKeyUrnFields(); + Optional flinkVeniceViewsEnabled = params.getFlinkVeniceViewsEnabled(); final Optional newHybridStoreConfig; if (hybridRewindSeconds.isPresent() || hybridOffsetLagThreshold.isPresent() || hybridTimeLagThreshold.isPresent() @@ -6221,6 +6222,11 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto return store; })); + flinkVeniceViewsEnabled.ifPresent(aBool -> storeMetadataUpdate(clusterName, storeName, (store, resources) -> { + store.setFlinkVeniceViewsEnabled(aBool); + return store; + })); + LOGGER.info("Finished updating store: {} in cluster: {}", storeName, clusterName); } catch (VeniceException e) { LOGGER.error( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 459dd7b43ef..45670d451d6 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -28,6 +28,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENUM_SCHEMA_EVOLUTION_ALLOWED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETLED_PROXY_USER_ACCOUNT; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETL_STRATEGY; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.FLINK_VENICE_VIEWS_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.FUTURE_VERSION_ETL_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.GLOBAL_RT_DIV_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED; @@ -2677,6 +2678,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa Optional latestSupersetSchemaId = params.getLatestSupersetSchemaId(); Optional unusedSchemaDeletionEnabled = params.getUnusedSchemaDeletionEnabled(); Optional> storeLifecycleHooks = params.getStoreLifecycleHooks(); + Optional flinkVeniceViewsEnabled = params.getFlinkVeniceViewsEnabled(); /** * Check whether parent controllers will only propagate the update configs to child controller, or all unchanged @@ -2777,6 +2779,11 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa updatedConfigsList.add(STORE_VIEW); } + if (flinkVeniceViewsEnabled.isPresent()) { + setStore.flinkVeniceViewsEnabled = flinkVeniceViewsEnabled.get(); + updatedConfigsList.add(FLINK_VENICE_VIEWS_ENABLED); + } + // Only update fields that are set, other fields will be read from the original store's partitioner config. PartitionerConfig updatedPartitionerConfig = VeniceHelixAdmin.mergeNewSettingsIntoOldPartitionerConfig( currStore, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java index 3d59a957d19..88985320936 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java @@ -512,6 +512,7 @@ private void handleSetStore(UpdateStore message) { .setTargetRegionSwapWaitTime(message.targetSwapRegionWaitTime) .setIsDavinciHeartbeatReported(message.isDaVinciHeartBeatReported) .setGlobalRtDivEnabled(message.globalRtDivEnabled) + .setFlinkVeniceViewsEnabled(message.flinkVeniceViewsEnabled) .setEnumSchemaEvolutionAllowed(message.enumSchemaEvolutionAllowed) .setKeyUrnCompressionEnabled(message.keyUrnCompressionEnabled) .setKeyUrnFields(message.keyUrnFields.stream().map(Object::toString).collect(Collectors.toList())); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index 9759b70fc4e..8bc7b7c42da 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -3337,6 +3337,50 @@ public void testUpdateStoreETLConfig() { Assert.assertEquals(etlStoreConfigRecord.etlStrategy, VeniceETLStrategy.EXTERNAL_SERVICE.getValue()); } + @Test + public void testUpdateStoreFlinkVeniceViewsEnable() { + String storeName = Utils.getUniqueString("testUpdateStoreFlinkVeniceViewsEnable"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn( + AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(1L), + Optional.of(-1L), + Optional.of(1L), + Optional.of(LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION))); + + parentAdmin.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setFlinkVeniceViewsEnabled(true)); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + verify(veniceWriter).put( + keyCaptor.capture(), + valueCaptor.capture(), + schemaCaptor.capture(), + any(), + any(), + anyLong(), + any(), + any(), + any(), + any()); + + byte[] keyBytes = keyCaptor.getValue(); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + assertEquals(schemaId, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); + assertEquals(keyBytes.length, 0); + + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + assertEquals(adminMessage.operationType, AdminMessageType.UPDATE_STORE.getValue()); + + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + assertTrue(updateStore.flinkVeniceViewsEnabled); + } + private Store setupForStoreViewConfigUpdateTest(String storeName) { Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); store.setActiveActiveReplicationEnabled(true);