diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index 6384747153..d90938b88d 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -402,13 +402,16 @@ public boolean getAutosizingEnabled() { } /** - * Check if a given config parameter is an internal autosizing related config, based on - * its name having the prefix "job.autosizing" + * Check if a given config parameter is an internal autosizing related sizing config, based on + * if it maps to one of the autosizing related sizing configs. + * @param configParam the config param to determine - * @return true if the config is related to autosizing, false otherwise + * @return true if the config is related to sizing via autosizing, false otherwise */ - public static boolean isAutosizingConfig(String configParam) { - return configParam.startsWith(JOB_AUTOSIZING_CONFIG_PREFIX); + public static boolean isAutosizingSizingConfig(String configParam) { + return configParam.equals(JOB_AUTOSIZING_CONTAINER_COUNT) || configParam.equals(JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE) + || configParam.equals(JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB) || configParam.equals(JOB_AUTOSIZING_CONTAINER_MEMORY_MB) + || configParam.equals(JOB_AUTOSIZING_CONTAINER_MAX_CORES); } public boolean getJMXEnabled() { @@ -490,4 +493,4 @@ public int getElasticityFactor() { } return elasticityFactor; } -} \ No newline at end of file +} diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala index 04626632c1..51fd7762e2 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala @@ -133,14 +133,14 @@ object CoordinatorStreamUtil extends Logging { * Reads and returns launch config persisted in coordinator stream. Only job.auto sizing configs are currently supported. * @param config full job config * @param metadataStore an instance of the instantiated MetadataStore - * @return empty config if auto sizing is disabled, otherwise auto sizing related configs. + * @return empty config if auto sizing is disabled, otherwise auto sizing related sizing configs. */ def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore: MetadataStore): Config = { if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) { new MapConfig() } else { val config = readConfigFromCoordinatorStream(metadataStore) - val launchConfig = config.asScala.filterKeys(key => JobConfig.isAutosizingConfig(key)).asJava + val launchConfig = config.asScala.filterKeys(key => JobConfig.isAutosizingSizingConfig(key)).asJava new MapConfig(launchConfig) } @@ -196,7 +196,7 @@ object CoordinatorStreamUtil extends Logging { val jobConfig = new JobConfig(config) if (jobConfig.getAutosizingEnabled) { // If autosizing is enabled, we retain auto-sizing related configs - keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey)) + keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingSizingConfig(configKey)) } if (jobConfig.getApplicationMasterHighAvailabilityEnabled) {