Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,12 @@ data class JobCreateRequest(
* 镜像地址,容器任务需要
* */
val image: String? = null,
/**
* k8s任务容器command
*/
var command: List<String>? = null,
/**
* k8s任务容器cmd
*/
var cmdFileName: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,12 @@ data class JobInfo(
* 镜像地址
* */
var image: String? = null,
/**
* k8s任务容器command
*/
var command: List<String>? = null,
/**
* k8s任务容器cmd
*/
var cmdFileName: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,12 @@ data class JobUpdateRequest(
* 调度类型
*/
val scheduleType: Int? = null,
/**
* k8s任务容器command
*/
var command: List<String>? = null,
/**
* k8s任务容器cmd
*/
var cmdFileName: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ data class TriggerParam(
var broadcastTotal: Int = 0,
var workerAddress: String? = null,
var updateTime: LocalDateTime,
var command: List<String>? = null,
var cmdFileName: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ fun TJobInfo.convert(): JobInfo {
nextTriggerTime = nextTriggerTime,
source = source,
image = image,
command = command,
cmdFileName = cmdFileName,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ data class TJobInfo(
* 镜像地址
* */
var image: String? = null,
/**
* k8s任务容器command
*/
var command: List<String>? = null,
/**
* k8s任务容器cmd
*/
var cmdFileName: String? = null,
) {
companion object {
const val NEXT_TRIGGER_TIME_IDX = "nextTriggerTime_idx"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ open class DefaultJobManager(
updateTime = LocalDateTime.now(),
source = source,
image = finalImage,
command = command,
cmdFileName = cmdFileName,
)

// 一次性任务,不主动触发
Expand Down Expand Up @@ -192,6 +194,12 @@ open class DefaultJobManager(
maxRetryCount?.let {
jobInfo.maxRetryCount = it
}
command?.let {
jobInfo.command = it
}
cmdFileName?.let {
jobInfo.cmdFileName = it
}
jobInfo.updateTime = LocalDateTime.now()
jobProvider.updateJob(jobInfo).also {
logger.info("update job[${jobInfo.id}] success")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class DefaultJobScheduler(
source = job.source,
image = job.image,
jobMode = job.jobMode,
command = job.command,
cmdFileName = job.cmdFileName,
)
// 3. 选择worker地址
require(group.registryList.isNotEmpty()) { "没有可用的worker地址" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,12 @@ data class JobContext(
* k8s任务使用的镜像
* */
var image: String?,
/**
* k8s任务容器command
*/
var command: List<String>? = null,
/**
* k8s任务容器cmd
*/
var cmdFileName: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class K8sShellHandler(
) : JobHandler {
override fun execute(context: JobContext): JobExecutionResult {
with(context) {
val cmd = cmdFileName ?: CMD
val podName = "schedule-shell-$logId"
var createdPod = false
val api = CoreV1Api(client)
Expand All @@ -55,7 +56,7 @@ class K8sShellHandler(
metadata {
name = configMapName
}
data = mapOf(CMD to source)
data = mapOf(cmd to source)
}
api.createNamespacedConfigMap(namespace, configMapBody, null, null, null, null)
logger.info("Created configmap $configMapName")
Expand All @@ -68,13 +69,13 @@ class K8sShellHandler(
containers {
name = logId
image = context.image
command = listOf(BASH_CMD, CMD)
command = context.command ?: listOf(BASH_CMD, cmd)
workingDir = WORK_SPACE
setEnv(context)
volumeMounts {
name = "shell-$logId"
mountPath = "$WORK_SPACE/$CMD"
subPath = CMD
subPath = cmd
readOnly = true
}
resources {
Expand All @@ -96,8 +97,8 @@ class K8sShellHandler(
name = configMapName
items = listOf(
newKeyToPath {
key = CMD
path = CMD
key = cmd
path = cmd
},
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() {
source = if (source != null) String(base64Decoder.decode(param.source)) else null,
image = if (param.image != null) param.image else null,
updateTime = param.updateTime,
command = command,
cmdFileName = cmdFileName,
)
}
}
Expand Down