Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,7 @@ subprojects {
// Protocol changes should pin the current version via this override and remove the override in a follow-up PR
// when actually using the new protocol. Example to pin KME to v12 when introducing v13:
// project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
def versionOverrides = [
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v37', PathValidation.DIRECTORY),
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v92', PathValidation.DIRECTORY)
]
def versionOverrides = []

def schemaDirs = [sourceDir]
sourceDir.eachDir { typeDir ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,8 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
Utils.parseStoreLifecycleHooksListFromString(storeLifecycleHooksStr, Arg.STORE_LIFECYCLE_HOOKS_LIST.toString());
params.setStoreLifecycleHooks(lifecycleHooksList);

booleanParam(cmd, Arg.FLINK_VENICE_VIEWS_ENABLED, p -> params.setFlinkVeniceViewsEnabled(p), argSet);

/**
* {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ public enum Arg {
), VIEW_NAME("view-name", "vn", true, "Name of a store view"),
VIEW_CLASS("view-class", "vc", true, "Name of a store view class"),
VIEW_PARAMS("view-params", "vp", true, "Additional parameter map of a store view class"),
FLINK_VENICE_VIEWS_ENABLED("flink-venice-views-enabled", "fvve", true, "Enable flink-based views"),
REMOVE_VIEW("remove-view", "rv", false, "Optional config to specify to disable certain store view"),
PARTITION_DETAIL_ENABLED(
"partition-detail-enabled", "pde", true, "A flag to indicate whether to retrieve partition details"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static com.linkedin.venice.Arg.FABRIC;
import static com.linkedin.venice.Arg.FABRIC_A;
import static com.linkedin.venice.Arg.FABRIC_B;
import static com.linkedin.venice.Arg.FLINK_VENICE_VIEWS_ENABLED;
import static com.linkedin.venice.Arg.FORCE;
import static com.linkedin.venice.Arg.FUTURE_VERSION_ETL_ENABLED;
import static com.linkedin.venice.Arg.GLOBAL_RT_DIV_ENABLED;
Expand Down Expand Up @@ -326,7 +327,7 @@ public enum Command {
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION,
TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION, GLOBAL_RT_DIV_ENABLED,
ENUM_SCHEMA_EVOLUTION_ALLOWED, STORE_LIFECYCLE_HOOKS_LIST, BLOB_TRANSFER_IN_SERVER_ENABLED,
KEY_URN_COMPRESSION_EANBLED, KEY_URN_FIELDS }
KEY_URN_COMPRESSION_EANBLED, KEY_URN_FIELDS, FLINK_VENICE_VIEWS_ENABLED }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ public static void recover(
.setTargetRegionSwap(deletedStore.getTargetSwapRegion())
.setTargetRegionSwapWaitTime(deletedStore.getTargetSwapRegionWaitTime())
.setIsDavinciHeartbeatReported(deletedStore.getIsDavinciHeartbeatReported())
.setGlobalRtDivEnabled(deletedStore.isGlobalRtDivEnabled());
.setGlobalRtDivEnabled(deletedStore.isGlobalRtDivEnabled())
.setFlinkVeniceViewsEnabled(deletedStore.isFlinkVeniceViewsEnabled());
System.out.println(
"Updating store: " + storeName + " in cluster: " + recoverCluster + " with params: "
+ updateParams.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -103,7 +104,8 @@ public void testAdminUpdateStoreArg() throws ParseException, IOException {
"true", "--venice-etl-strategy", "EXTERNAL_WITH_VENICE_TRIGGER", "--partitioner-params",
"{\"" + K1 + "\":\"" + V1 + "\",\"" + K2 + "\":\"" + V2 + "\",\"" + K3 + "\":\"" + V3 + "\"}",
"--store-lifecycle-hooks-list",
"[{\"storeLifecycleHooksClassName\":\"com.example.MyHook1\",\"storeLifecycleHooksParams\":{\"paramA\":\"valueA\",\"paramB\":\"valueB\"}},{\"storeLifecycleHooksClassName\":\"com.example.MyHook2\",\"storeLifecycleHooksParams\":{\"foo\":\"bar\"}}]" };
"[{\"storeLifecycleHooksClassName\":\"com.example.MyHook1\",\"storeLifecycleHooksParams\":{\"paramA\":\"valueA\",\"paramB\":\"valueB\"}},{\"storeLifecycleHooksClassName\":\"com.example.MyHook2\",\"storeLifecycleHooksParams\":{\"foo\":\"bar\"}}]",
"--flink-venice-views-enabled", "true" };

CommandLine commandLine = AdminTool.getCommandLine(args);
UpdateStoreQueryParams params = AdminTool.getUpdateStoreQueryParams(commandLine);
Expand Down Expand Up @@ -132,6 +134,8 @@ public void testAdminUpdateStoreArg() throws ParseException, IOException {
Assert.assertTrue(params.getStoreLifecycleHooks().isPresent());
List<LifecycleHooksRecord> lifecycleHooksRecords = params.getStoreLifecycleHooks().get();
assertEquals(lifecycleHooksRecords.size(), 2);
assertTrue(params.getFlinkVeniceViewsEnabled().isPresent());
assertTrue(params.getFlinkVeniceViewsEnabled().get());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class ControllerApiConstants {
public static final String STORE_VIEW_CLASS = "store_view_class";
public static final String STORE_VIEW_PARAMS = "store_view_params";
public static final String DISABLE_STORE_VIEW = "disable_store_view";
public static final String FLINK_VENICE_VIEWS_ENABLED = "flink_venice_views_enabled";

public static final String NATIVE_REPLICATION_ENABLED = "native_replication_enabled";
public static final String PUSH_STREAM_SOURCE_ADDRESS = "push_stream_source_address";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENUM_SCHEMA_EVOLUTION_ALLOWED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETLED_PROXY_USER_ACCOUNT;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETL_STRATEGY;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.FLINK_VENICE_VIEWS_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.FUTURE_VERSION_ETL_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.GLOBAL_RT_DIV_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED;
Expand Down Expand Up @@ -181,7 +182,8 @@ public UpdateStoreQueryParams(StoreInfo srcStore, boolean storeMigrating) {
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())))
.setKeyUrnCompressionEnabled(srcStore.isKeyUrnCompressionEnabled())
.setKeyUrnFields(srcStore.getKeyUrnFields());
.setKeyUrnFields(srcStore.getKeyUrnFields())
.setFlinkVeniceViewsEnabled(srcStore.isFlinkVeniceViewsEnabled());

if (srcStore.getReplicationMetadataVersionId() != -1) {
updateStoreQueryParams.setReplicationMetadataVersionID(srcStore.getReplicationMetadataVersionId());
Expand Down Expand Up @@ -542,6 +544,14 @@ public Optional<Map<String, String>> getStoreViews() {
return getStringMap(STORE_VIEW);
}

public UpdateStoreQueryParams setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) {
return putBoolean(FLINK_VENICE_VIEWS_ENABLED, flinkVeniceViewsEnabled);
}

public Optional<Boolean> getFlinkVeniceViewsEnabled() {
return getBoolean(FLINK_VENICE_VIEWS_ENABLED);
}

public UpdateStoreQueryParams setPushStreamSourceAddress(String pushStreamSourceAddress) {
return putString(PUSH_STREAM_SOURCE_ADDRESS, pushStreamSourceAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,16 @@ public Map<String, ViewConfig> getViewConfigs() {
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ReadOnlyViewConfig(e.getValue()))));
}

@Override
public boolean isFlinkVeniceViewsEnabled() {
return this.delegate.isFlinkVeniceViewsEnabled();
}

@Override
public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) {
throw new UnsupportedOperationException();
}

@Override
public void setViewConfigs(Map<String, ViewConfig> viewConfigList) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ static boolean isSystemStore(String storeName) {

void setViewConfigs(Map<String, ViewConfig> viewConfigMap);

boolean isFlinkVeniceViewsEnabled();

void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled);

boolean isHybrid();

CompressionStrategy getCompressionStrategy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public static StoreInfo fromStore(Store store) {
storeInfo.setLatestVersionPromoteToCurrentTimestamp(store.getLatestVersionPromoteToCurrentTimestamp());
storeInfo.setKeyUrnCompressionEnabled(store.isKeyUrnCompressionEnabled());
storeInfo.setKeyUrnFields(store.getKeyUrnFields());
storeInfo.setFlinkVeniceViewsEnabled(store.isFlinkVeniceViewsEnabled());
return storeInfo;
}

Expand Down Expand Up @@ -376,6 +377,7 @@ public static StoreInfo fromStore(Store store) {
private long getLatestVersionPromoteToCurrentTimestamp;
private boolean keyUrnCompressionEnabled = false;
private List<String> keyUrnFields = new ArrayList<>();
private boolean flinkVeniceViewsEnabled = false;

public StoreInfo() {
}
Expand Down Expand Up @@ -978,6 +980,14 @@ public void setEnumSchemaEvolutionAllowed(boolean enumSchemaEvolutionAllowed) {
this.enumSchemaEvolutionAllowed = enumSchemaEvolutionAllowed;
}

public boolean isFlinkVeniceViewsEnabled() {
return flinkVeniceViewsEnabled;
}

public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) {
this.flinkVeniceViewsEnabled = flinkVeniceViewsEnabled;
}

public List<LifecycleHooksRecord> getStoreLifecycleHooks() {
return this.storeLifecycleHooks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ public void setViewConfigs(Map<String, ViewConfig> viewConfigList) {
throwUnsupportedOperationException("setViewConfig");
}

@Override
public boolean isFlinkVeniceViewsEnabled() {
return zkSharedStore.isFlinkVeniceViewsEnabled();
}

@Override
public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) {
throwUnsupportedOperationException("setFlinkVeniceViewsEnabled");
}

@Override
public boolean isHybrid() {
return zkSharedStore.isHybrid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ public ZKStore(Store store) {
setStoreLifecycleHooks(store.getStoreLifecycleHooks());
setKeyUrnCompressionEnabled(store.isKeyUrnCompressionEnabled());
setKeyUrnFields(store.getKeyUrnFields());
setFlinkVeniceViewsEnabled(store.isFlinkVeniceViewsEnabled());

for (Version storeVersion: store.getVersions()) {
forceAddVersion(storeVersion.cloneVersion(), true);
Expand Down Expand Up @@ -513,6 +514,16 @@ public void setViewConfigs(Map<String, ViewConfig> viewConfigList) {
}
}

@Override
public boolean isFlinkVeniceViewsEnabled() {
return this.storeProperties.flinkVeniceViewsEnabled;
}

@Override
public void setFlinkVeniceViewsEnabled(boolean flinkVeniceViewsEnabled) {
this.storeProperties.flinkVeniceViewsEnabled = flinkVeniceViewsEnabled;
}

@Override
public boolean isHybrid() {
return this.storeProperties.hybridConfig != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public enum AvroProtocolDefinition {
*
* TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
*/
ADMIN_OPERATION(92, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
ADMIN_OPERATION(93, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

/**
* Single chunk of a large multi-chunk value. Just a bunch of bytes.
Expand Down Expand Up @@ -148,7 +148,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(37, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(38, StoreMetaValue.class),

/*
Value Schema for Parent Controller Metadata system store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,15 @@ public void testBlobTransferStoreLevelConfigs() {
assertEquals(ActivationState.ENABLED.name(), storeInfo.getBlobTransferInServerEnabled());
assertTrue(storeInfo.isBlobTransferEnabled());
}

@Test
public void testFlinkVeniceViewsEnabled() {
StoreInfo storeInfo = new StoreInfo();
// check default value
assertFalse(storeInfo.isFlinkVeniceViewsEnabled());
// setting value
storeInfo.setFlinkVeniceViewsEnabled(true);
// check updated value
assertTrue(storeInfo.isFlinkVeniceViewsEnabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ private void testUpdateConfigs(ControllerClient parentControllerClient, Controll
testUpdateCompactionEnabled(parentControllerClient, childControllerClient);
testUpdateCompactionThreshold(parentControllerClient, childControllerClient);
testUpdateEnumSchemaEvolution(parentControllerClient, childControllerClient);
testUpdateStoreFlinkVeniceViewsEnable(parentControllerClient, childControllerClient);
}

/**
Expand Down Expand Up @@ -1182,6 +1183,14 @@ private void testUpdateEnumSchemaEvolution(ControllerClient parentClient, Contro
response -> Assert.assertTrue(response.getStore().isEnumSchemaEvolutionAllowed()));
}

private void testUpdateStoreFlinkVeniceViewsEnable(ControllerClient parentClient, ControllerClient childClient) {
testUpdateConfig(
parentClient,
childClient,
params -> params.setFlinkVeniceViewsEnabled(true),
response -> Assert.assertTrue(response.getStore().isFlinkVeniceViewsEnabled()));
}

private void testAddBadValueSchema(ControllerClient parentControllerClient) {
// Adding store
String storeName = Utils.getUniqueString("test_store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5855,6 +5855,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
Optional<List<LifecycleHooksRecord>> storeLifecycleHooks = params.getStoreLifecycleHooks();
Optional<Boolean> keyUrnCompressionEnabled = params.getKeyUrnCompressionEnabled();
Optional<List<String>> keyUrnFields = params.getKeyUrnFields();
Optional<Boolean> flinkVeniceViewsEnabled = params.getFlinkVeniceViewsEnabled();

final Optional<HybridStoreConfig> newHybridStoreConfig;
if (hybridRewindSeconds.isPresent() || hybridOffsetLagThreshold.isPresent() || hybridTimeLagThreshold.isPresent()
Expand Down Expand Up @@ -6221,6 +6222,11 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
return store;
}));

flinkVeniceViewsEnabled.ifPresent(aBool -> storeMetadataUpdate(clusterName, storeName, (store, resources) -> {
store.setFlinkVeniceViewsEnabled(aBool);
return store;
}));

LOGGER.info("Finished updating store: {} in cluster: {}", storeName, clusterName);
} catch (VeniceException e) {
LOGGER.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENUM_SCHEMA_EVOLUTION_ALLOWED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETLED_PROXY_USER_ACCOUNT;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETL_STRATEGY;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.FLINK_VENICE_VIEWS_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.FUTURE_VERSION_ETL_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.GLOBAL_RT_DIV_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED;
Expand Down Expand Up @@ -2677,6 +2678,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
Optional<Integer> latestSupersetSchemaId = params.getLatestSupersetSchemaId();
Optional<Boolean> unusedSchemaDeletionEnabled = params.getUnusedSchemaDeletionEnabled();
Optional<List<LifecycleHooksRecord>> storeLifecycleHooks = params.getStoreLifecycleHooks();
Optional<Boolean> flinkVeniceViewsEnabled = params.getFlinkVeniceViewsEnabled();

/**
* Check whether parent controllers will only propagate the update configs to child controller, or all unchanged
Expand Down Expand Up @@ -2777,6 +2779,11 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
updatedConfigsList.add(STORE_VIEW);
}

if (flinkVeniceViewsEnabled.isPresent()) {
setStore.flinkVeniceViewsEnabled = flinkVeniceViewsEnabled.get();
updatedConfigsList.add(FLINK_VENICE_VIEWS_ENABLED);
}

// Only update fields that are set, other fields will be read from the original store's partitioner config.
PartitionerConfig updatedPartitionerConfig = VeniceHelixAdmin.mergeNewSettingsIntoOldPartitionerConfig(
currStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ private void handleSetStore(UpdateStore message) {
.setTargetRegionSwapWaitTime(message.targetSwapRegionWaitTime)
.setIsDavinciHeartbeatReported(message.isDaVinciHeartBeatReported)
.setGlobalRtDivEnabled(message.globalRtDivEnabled)
.setFlinkVeniceViewsEnabled(message.flinkVeniceViewsEnabled)
.setEnumSchemaEvolutionAllowed(message.enumSchemaEvolutionAllowed)
.setKeyUrnCompressionEnabled(message.keyUrnCompressionEnabled)
.setKeyUrnFields(message.keyUrnFields.stream().map(Object::toString).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3337,6 +3337,50 @@ public void testUpdateStoreETLConfig() {
Assert.assertEquals(etlStoreConfigRecord.etlStrategy, VeniceETLStrategy.EXTERNAL_SERVICE.getValue());
}

@Test
public void testUpdateStoreFlinkVeniceViewsEnable() {
String storeName = Utils.getUniqueString("testUpdateStoreFlinkVeniceViewsEnable");
Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis());
doReturn(store).when(internalAdmin).getStore(clusterName, storeName);

when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null)
.thenReturn(
AdminTopicMetadataAccessor.generateMetadataMap(
Optional.of(1L),
Optional.of(-1L),
Optional.of(1L),
Optional.of(LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)));

parentAdmin.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setFlinkVeniceViewsEnabled(true));

ArgumentCaptor<byte[]> keyCaptor = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Integer> schemaCaptor = ArgumentCaptor.forClass(Integer.class);
verify(veniceWriter).put(
keyCaptor.capture(),
valueCaptor.capture(),
schemaCaptor.capture(),
any(),
any(),
anyLong(),
any(),
any(),
any(),
any());

byte[] keyBytes = keyCaptor.getValue();
byte[] valueBytes = valueCaptor.getValue();
int schemaId = schemaCaptor.getValue();
assertEquals(schemaId, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
assertEquals(keyBytes.length, 0);

AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId);
assertEquals(adminMessage.operationType, AdminMessageType.UPDATE_STORE.getValue());

UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion;
assertTrue(updateStore.flinkVeniceViewsEnabled);
}

private Store setupForStoreViewConfigUpdateTest(String storeName) {
Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis());
store.setActiveActiveReplicationEnabled(true);
Expand Down
Loading