Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.epam.pipeline.entity.utils.DateUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import joptsimple.internal.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -76,6 +77,7 @@
import java.util.stream.Collectors;

@SuppressWarnings("PMD.ConsecutiveLiteralAppends")
@Slf4j
public class PipelineRunDao extends DryRunJdbcDaoSupport {

private Pattern wherePattern = Pattern.compile("@WHERE@");
Expand Down Expand Up @@ -1053,6 +1055,7 @@ public enum PipelineRunParameters {
POD_ID,
PIPELINE_NAME,
NODE_TYPE,
INSTANCE_FALLBACK_TYPES,
NODE_IP,
NODE_ID,
NODE_DISK,
Expand Down Expand Up @@ -1186,6 +1189,10 @@ private static void addInstanceFields(PipelineRun run, MapSqlParameterSource par
instance.map(RunInstance::getCloudProvider).map(CloudProvider::name).orElse(null));
params.addValue(NODE_PLATFORM.name(), instance.map(RunInstance::getNodePlatform).orElse(null));
params.addValue(NODE_POOL_ID.name(), instance.map(RunInstance::getPoolId).orElse(null));
instance.ifPresent(ins ->
params.addValue(INSTANCE_FALLBACK_TYPES.name(),
JsonMapper.convertDataToJsonStringForQuery(ins.getFallbackInstanceTypes()))
);
}

static ResultSetExtractor<Collection<PipelineRun>> getRunGroupExtractor() {
Expand Down Expand Up @@ -1271,6 +1278,18 @@ public static PipelineRun parsePipelineRun(ResultSet rs) throws SQLException {
instance.setNodeId(rs.getString(NODE_ID.name()));
instance.setNodeIP(rs.getString(NODE_IP.name()));
instance.setNodeType(rs.getString(NODE_TYPE.name()));

try {
String fallbackInstanceTypes = rs.getString(INSTANCE_FALLBACK_TYPES.name());
if (fallbackInstanceTypes != null && !fallbackInstanceTypes.equals("{}")) {
final List<String> fallbackInstanceList =
JsonMapper.parseData(fallbackInstanceTypes, new TypeReference<List<String>>() {});
instance.setFallbackInstanceTypes(fallbackInstanceList);
}
} catch (SQLException e) {
log.error("Query doesn't include INSTANCE_FALLBACK_TYPES column!", e);
}

instance.setNodeImage(rs.getString(NODE_IMAGE.name()));
instance.setNodeName(rs.getString(NODE_NAME.name()));
instance.setCloudRegionId(rs.getLong(NODE_CLOUD_REGION.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ private void readInstanceId(final RunInstance instance, final String output) {
instance.setNodeId(node[0]);
instance.setNodeIP(node[1]);
instance.setNodeName(node[2]);
} else if (node.length == 4) {
instance.setNodeId(node[0]);
instance.setNodeIP(node[1]);
instance.setNodeName(node[2]);
instance.setNodeType(node[3]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public NodeUpCommand.NodeUpCommandBuilder buildNodeUpCommand(final String nodeUp
.runId(nodeLabel)
.instanceImage(instance.getNodeImage())
.instanceType(instance.getNodeType())
.fallbackInstanceTypes(instance.getFallbackInstanceTypes())
.instanceDisk(String.valueOf(instance.getEffectiveNodeDisk()))
.instancePlatform(instance.getNodePlatform())
.kubeIP(kubeMasterIP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class NodeUpCommand extends AbstractClusterCommand {
private final Set<String> prePulledImages;
private final Map<String, String> runtimeParameters;
private final Map<String, String> tags;
private final List<String> fallbackInstanceTypes;

@Override
protected List<String> buildCommandArguments() {
Expand Down Expand Up @@ -115,6 +116,12 @@ protected List<String> buildCommandArguments() {
commands.add("--tags");
commands.add(key + "=" + value);
});
if (fallbackInstanceTypes != null && !fallbackInstanceTypes.isEmpty()) {
fallbackInstanceTypes.forEach(e -> {
commands.add("--fallback_ins_types");
commands.add(e);
});
}
return commands;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public RunInstance configurationToInstance(PipelineConfiguration configuration)
.map(cloudRegionManager::load)
.orElse(cloudRegionManager.loadDefaultRegion())
.getId());
instance.setFallbackInstanceTypes(configuration.getFallbackInstanceTypes());
return instance;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ public PipelineConfiguration mergeParameters(PipelineStart runVO, PipelineConfig
} else {
configuration.setInstanceType(defaultConfig.getInstanceType());
}
if (runVO.getFallbackInstanceTypes() != null) {
configuration.setFallbackInstanceTypes(runVO.getFallbackInstanceTypes());
}
if (runVO.getTimeout() != null) {
configuration.setTimeout(runVO.getTimeout());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,7 @@ private RunInstance configureRunInstance(PipelineConfiguration configuration, Ab
instance.setEffectiveNodeDisk(Optional.ofNullable(configuration.getEffectiveDiskSize())
.orElse(instance.getNodeDisk()));
instance.setNodeType(configuration.getInstanceType());
instance.setFallbackInstanceTypes(configuration.getFallbackInstanceTypes());
instance.setNodeImage(configuration.getInstanceImage());
Optional.ofNullable(region).map(AbstractCloudRegion::getId).ifPresent(instance::setCloudRegionId);
Optional.ofNullable(region).map(AbstractCloudRegion::getProvider).ifPresent(instance::setCloudProvider);
Expand Down
1 change: 1 addition & 0 deletions api/src/main/resources/dao/filter-dao.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down
25 changes: 24 additions & 1 deletion api/src/main/resources/dao/pipeline-run-dao.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -89,6 +90,7 @@
:TERMINATING,
:POD_ID,
:NODE_TYPE,
to_jsonb(:INSTANCE_FALLBACK_TYPES::jsonb),
:NODE_DISK,
:NODE_IP,
:NODE_ID,
Expand Down Expand Up @@ -150,6 +152,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -249,6 +252,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -319,6 +323,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -389,6 +394,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_name,
Expand Down Expand Up @@ -571,7 +577,8 @@
kube_service_enabled = :KUBE_SERVICE_ENABLED,
pipeline_name = :PIPELINE_NAME,
node_start_date = :NODE_START_DATE,
project_id = :PROJECT_ID
project_id = :PROJECT_ID,
instance_fallback_types = to_jsonb(:INSTANCE_FALLBACK_TYPES::jsonb)
WHERE
run_id = :RUN_ID
]]>
Expand Down Expand Up @@ -634,6 +641,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -706,6 +714,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -778,6 +787,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -848,6 +858,7 @@
active_run.terminating,
active_run.pod_id,
active_run.node_type,
active_run.instance_fallback_types,
active_run.node_disk,
active_run.node_ip,
active_run.node_id,
Expand Down Expand Up @@ -1075,6 +1086,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -1143,6 +1155,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -1211,6 +1224,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -1306,6 +1320,7 @@
runs.terminating,
runs.pod_id,
runs.node_type,
runs.instance_fallback_types,
runs.node_disk,
runs.node_ip,
runs.node_id,
Expand Down Expand Up @@ -1595,6 +1610,7 @@
r.terminating,
r.pod_id,
r.node_type,
r.instance_fallback_types,
r.node_disk,
r.node_ip,
r.node_id,
Expand Down Expand Up @@ -1992,6 +2008,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -2062,6 +2079,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -2132,6 +2150,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -2202,6 +2221,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -2282,6 +2302,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -2352,6 +2373,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down Expand Up @@ -2465,6 +2487,7 @@
terminating,
pod_id,
node_type,
instance_fallback_types,
node_disk,
node_ip,
node_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE pipeline.pipeline_run ADD instance_fallback_types jsonb;
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,43 @@ public void shouldLoadRunsByOwnerAndEndDateBeforeAndStatusIn() {
assertThat(runs.size(), is(1));
}

@Test
public void shouldCreatePipelineRunWithFallbacks() {
final PipelineRun run = buildPipelineRun(testPipeline.getId());
List<String> fallbackInstanceTypes = new ArrayList<>();
fallbackInstanceTypes.add(NODE_TYPE);
fallbackInstanceTypes.add(NODE_TYPE_2);
run.getInstance().setFallbackInstanceTypes(fallbackInstanceTypes);

pipelineRunDao.createPipelineRun(run);

final PipelineRun loadedRun = pipelineRunDao.loadPipelineRun(run.getId());

List<String> dbFallbackInstanceTypes = loadedRun.getInstance().getFallbackInstanceTypes();
assertThat(dbFallbackInstanceTypes.size(), is(2));
assertTrue(dbFallbackInstanceTypes.contains(NODE_TYPE));
assertTrue(dbFallbackInstanceTypes.contains(NODE_TYPE_2));
}

@Test
public void shouldUpdateFallbacks() {
PipelineRun run = createTestPipelineRun(testPipeline.getId());
assertTrue(Objects.isNull(run.getInstance().getFallbackInstanceTypes()));

List<String> fallbackInstanceTypes = new ArrayList<>();
fallbackInstanceTypes.add(NODE_TYPE);
fallbackInstanceTypes.add(NODE_TYPE_2);
run.getInstance().setFallbackInstanceTypes(fallbackInstanceTypes);

pipelineRunDao.updateRun(run);
PipelineRun loaded = pipelineRunDao.loadPipelineRun(run.getId());
assertEquals(run.getId(), loaded.getId());
List<String> dbFallbackInstanceTypes = loaded.getInstance().getFallbackInstanceTypes();
assertThat(dbFallbackInstanceTypes.size(), is(2));
assertTrue(dbFallbackInstanceTypes.contains(NODE_TYPE));
assertTrue(dbFallbackInstanceTypes.contains(NODE_TYPE_2));
}

private PipelineRun createTestPipelineRun() {
return createTestPipelineRun(testPipeline.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class PipelineConfiguration implements Cloneable {
@JsonProperty(value = INSTANCE_SIZE)
private String instanceType;

private List<String> fallbackInstanceTypes;

@JsonProperty(value = INSTANCE_IMAGE)
private String instanceImage;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.ToString;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.Objects;
import java.util.Set;

Expand All @@ -38,6 +39,7 @@
@ToString
public class RunInstance {
private String nodeType;
private List<String> fallbackInstanceTypes;
/**
* Node size the was requested by user
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class PipelineStart {
private String version;
private Long timeout;
private String instanceType;
private List<String> fallbackInstanceTypes;
private String instanceImage;
private Integer hddSize;
private String dockerImage;
Expand Down