diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties index 21a9ddd38da..806f1955c9f 100644 --- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties +++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties @@ -75,6 +75,37 @@ create.queryhistoryrealization.store.tableindex2=CREATE INDEX %s_ix2 ON %s ( mod create.queryhistoryrealization.store.tableindex3=CREATE INDEX %s_ix3 ON %s ( query_first_day_of_month ); create.queryhistoryrealization.store.tableindex4=CREATE INDEX %s_ix4 ON %s ( query_first_day_of_week ); create.queryhistoryrealization.store.tableindex5=CREATE INDEX %s_ix5 ON %s ( query_day ); + +#Job Metrics Store +create.jobmetrics.store.table=CREATE TABLE IF NOT EXISTS `%s` ( \ + id bigint not null auto_increment, \ + `job_id` VARCHAR(255) , \ + `submitter` VARCHAR(100), \ + `model` VARCHAR(128), \ + `job_type` VARCHAR(100), \ + `job_engine` VARCHAR(255), \ + `project_name` VARCHAR(100), \ + `job_state` VARCHAR(50), \ + `error_type` VARCHAR(128), \ + `error_info` TEXT, \ + `duration` BIGINT, \ + `per_bytes_time_cost` DOUBLE, \ + `model_size` BIGINT, \ + `wait_time` BIGINT, \ + `build_time` BIGINT, \ + `build_first_day_of_month` BIGINT, \ + `build_first_day_of_week` BIGINT, \ + `build_day` BIGINT, \ + `build_date` DATE, \ + primary key (`id`,`project_name`) \ +); +create.jobmetrics.store.tableindex1=CREATE INDEX %s_ix1 ON %s ( build_time ); +create.jobmetrics.store.tableindex2=CREATE INDEX %s_ix2 ON %s ( model ); +create.jobmetrics.store.tableindex3=CREATE INDEX %s_ix3 ON %s ( build_first_day_of_month ); +create.jobmetrics.store.tableindex4=CREATE INDEX %s_ix4 ON %s ( build_first_day_of_week ); +create.jobmetrics.store.tableindex5=CREATE INDEX %s_ix5 ON %s ( build_day ); + + # RAW RECOMMENDATION STORE create.rawrecommendation.store.table=CREATE TABLE IF NOT EXISTS `%s` ( \ `id` int not null auto_increment, \ diff --git a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties index 48299736ce6..4284fd8dd64 100644 --- a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties +++ b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties @@ -148,6 +148,35 @@ create.streamingjobrecord.store.table=CREATE TABLE IF NOT EXISTS %s ( \ create.streamingjobrecord.store.tableindex1=CREATE INDEX %s_idx1 ON %s USING btree ( job_id ); create.streamingjobrecord.store.tableindex2=CREATE INDEX %s_idx2 ON %s USING btree ( create_time ); +#Job Metrics Store +create.jobmetrics.store.table=CREATE TABLE IF NOT EXISTS `%s` ( \ + id bigint not null auto_increment, \ + `job_id` VARCHAR(255) , \ + `submitter` VARCHAR(100), \ + `model` VARCHAR(128), \ + `job_type` VARCHAR(100), \ + `job_engine` VARCHAR(255), \ + `project_name` VARCHAR(100), \ + `job_state` VARCHAR(50), \ + `error_type` VARCHAR(128), \ + `error_info` TEXT, \ + `duration` BIGINT, \ + `per_bytes_time_cost` DOUBLE, \ + `model_size` BIGINT, \ + `wait_time` BIGINT, \ + `build_time` BIGINT, \ + `build_first_day_of_month` BIGINT, \ + `build_first_day_of_week` BIGINT, \ + `build_day` BIGINT, \ + `build_date` DATE, \ + primary key (`id`,`project_name`) \ +); +create.jobmetrics.store.tableindex1=CREATE INDEX %s_ix1 ON %s ( build_time ); +create.jobmetrics.store.tableindex2=CREATE INDEX %s_ix2 ON %s ( model ); +create.jobmetrics.store.tableindex3=CREATE INDEX %s_ix3 ON %s ( build_first_day_of_month ); +create.jobmetrics.store.tableindex4=CREATE INDEX %s_ix4 ON %s ( build_first_day_of_week ); +create.jobmetrics.store.tableindex5=CREATE INDEX %s_ix5 ON %s ( build_day ); + # RAW RECOMMENDATION STORE create.rawrecommendation.store.table=CREATE TABLE IF NOT EXISTS `%s` ( \ `id` int not null auto_increment, \ diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java index e30644b5b3f..78b3c463494 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java @@ -18,8 +18,11 @@ package org.apache.kylin.job.execution; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Map; @@ -35,10 +38,15 @@ import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.scheduler.JobFinishedNotifier; +import org.apache.kylin.common.util.StringHelper; +import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.exception.ExecuteRuntimeException; import org.apache.kylin.job.exception.JobStoppedException; import org.apache.kylin.job.exception.PersistentException; +import org.apache.kylin.job.metrics.JobMetrics; +import org.apache.kylin.job.metrics.RDBMJobMetricsDAO; +import org.apache.kylin.job.util.MailNotificationUtil; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.guava30.shaded.common.collect.Lists; @@ -229,15 +237,16 @@ protected void onExecuteFinished(ExecuteResult result) throws JobStoppedExceptio switch (state) { case SUCCEED: updateToFinalState(ExecutableState.SUCCEED, this::afterUpdateOutput, result.getShortErrMsg()); - onStatusChange(ExecutableState.SUCCEED); + onStatusChange(ExecutableState.SUCCEED, context, result); + updateJobMetrics(state, context, result); break; case DISCARDED: updateToFinalState(ExecutableState.DISCARDED, this::onExecuteDiscardHook, result.getShortErrMsg()); - onStatusChange(ExecutableState.DISCARDED); + onStatusChange(ExecutableState.DISCARDED, context, result); break; case SUICIDAL: updateToFinalState(ExecutableState.SUICIDAL, this::onExecuteSuicidalHook, result.getShortErrMsg()); - onStatusChange(ExecutableState.SUICIDAL); + onStatusChange(ExecutableState.SUICIDAL, context, result); break; case ERROR: case PAUSED: @@ -260,7 +269,8 @@ protected void onExecuteFinished(ExecuteResult result) throws JobStoppedExceptio } updateJobOutput(getProject(), getId(), state, info, output, shortErrMsg, hook); if (state == ExecutableState.ERROR) { - onStatusChange(ExecutableState.ERROR); + onStatusChange(ExecutableState.ERROR, context, result); + updateJobMetrics(state, context, result); } break; default: @@ -411,7 +421,68 @@ protected void afterUpdateOutput(String jobId) { // just implement it } - protected void onStatusChange(ExecutableState state) { + protected void onStatusChange(ExecutableState state, ExecutableContext context, ExecuteResult result) { super.notifyUserStatusChange(state); } + + protected void updateJobMetrics(ExecutableState state, ExecutableContext context, ExecuteResult result) { + JobMetrics jobMetrics = new JobMetrics(); + jobMetrics.setJobId(getId()); + jobMetrics.setJobType(getJobType().toString()); + jobMetrics.setJobState(state.toStringState()); + jobMetrics.setProjectName(getProject()); + jobMetrics.setModel(getTargetSubjectAlias()); + jobMetrics.setSubmitter(StringHelper.noBlank(getSubmitter(), "missing submitter")); + jobMetrics.setJobEngine(MailNotificationUtil.getLocalHostName()); + + jobMetrics.setBuildTime(getJobEndTime()); + long dayStart = TimeUtil.getDayStart(getJobEndTime()); + Date date = new Date(dayStart); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault(Locale.Category.FORMAT)); + String buildDate = sdf.format(date); + try { + jobMetrics.setBuildDate(sdf.parse(buildDate)); + } catch (ParseException e) { + logger.error("time format is error"); + } + jobMetrics.setBuildDay(dayStart); + jobMetrics.setBuildFirstDayOfWeek(TimeUtil.getWeekStart(getJobEndTime())); + jobMetrics.setBuildFirstDayOfMonth(TimeUtil.getMonthStart(getJobEndTime())); + + if (state == ExecutableState.SUCCEED) { + jobMetrics.setDuration(getDuration()); + jobMetrics.setWaitTime(getWaitTime()); + jobMetrics.setModelSize(getByteSize()); + jobMetrics.setPerBytesTimeCost(getPerBytesTimeCost(getByteSize(), getDuration())); + } else if (state == ExecutableState.ERROR) { + AbstractExecutable errorTask = null; + Output errorOutput; + List tasks = getTasks(); + for (AbstractExecutable task : tasks) { + errorOutput = getManager().getOutput(task.getId()); + if (errorOutput.getState() == ExecutableState.ERROR) { + errorTask = task; + break; + } + } + if (errorTask != null) { + jobMetrics.setErrorType(errorTask.getName()); + jobMetrics.setErrorInfo(result.getShortErrMsg()); + } + } + updateJobMetrics(jobMetrics); + } + + private void updateJobMetrics(JobMetrics jobMetrics) { + RDBMJobMetricsDAO jobMetricsDAO = RDBMJobMetricsDAO.getInstance(); + jobMetricsDAO.insert(jobMetrics); + + } + + private static double getPerBytesTimeCost(long byteSize, long time) { + if (byteSize <= 0) { + return 0; + } + return time * 1.0 / byteSize; + } } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/JdbcJobMetricsStore.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JdbcJobMetricsStore.java new file mode 100644 index 00000000000..b5ae46aaa61 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JdbcJobMetricsStore.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + + + + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; +import org.apache.kylin.common.persistence.metadata.JdbcDataSource; +import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; +import org.apache.kylin.common.util.TimeUtil; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.job.metrics.util.JobMetricsUtil; +import org.mybatis.dynamic.sql.SqlBuilder; +import org.mybatis.dynamic.sql.insert.render.InsertStatementProvider; +import org.mybatis.dynamic.sql.render.RenderingStrategies; +import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; + +import javax.sql.DataSource; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import static org.mybatis.dynamic.sql.SqlBuilder.avg; +import static org.mybatis.dynamic.sql.SqlBuilder.count; +import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; +import static org.mybatis.dynamic.sql.SqlBuilder.isGreaterThanOrEqualTo; +import static org.mybatis.dynamic.sql.SqlBuilder.isLessThanOrEqualTo; +import static org.mybatis.dynamic.sql.SqlBuilder.select; +import static org.mybatis.dynamic.sql.SqlBuilder.sum; + +@Slf4j +public class JdbcJobMetricsStore { + + public static final String MONTH = "month"; + public static final String WEEK = "week"; + public static final String DAY = "day"; + public static final String COUNT = "count"; + public static final String JOB_METRICS_SUFFIX = "job_metrics"; + + + private JobMetricsTable jobMetricsTable; + + @VisibleForTesting + @Getter + private final SqlSessionFactory sqlSessionFactory; + + private final DataSource dataSource; + + String jobMetricsTableName; + + public JdbcJobMetricsStore(KylinConfig kylinConfig) throws Exception { + StorageURL url = kylinConfig.getMetadataUrl(); + Properties props = JdbcUtil.datasourceParameters(url); + dataSource = JdbcDataSource.getDataSource(props); + jobMetricsTableName = StorageURL.replaceUrl(url) + "_" + JOB_METRICS_SUFFIX; + jobMetricsTable = new JobMetricsTable(jobMetricsTableName); + sqlSessionFactory = JobMetricsUtil.getSqlSessionFactory(dataSource, jobMetricsTableName); + } + + public int insert(JobMetrics jobMetrics) { + try(SqlSession session = sqlSessionFactory.openSession()) { + JobMetricsMapper mapper = session.getMapper(JobMetricsMapper.class); + InsertStatementProvider statementProvider = getInsertJobMetricsProvider(jobMetrics); + int rows = mapper.insert(statementProvider); + + if (rows > 0) { + log.info("Insert one Job Metric(job id:{}) into database.", jobMetrics.getJobId()); + } + session.commit(); + return rows; + } + } + + + public List JobCountAndTotalBuildCost(long startTime, long endTime, String project) { + try (SqlSession session = sqlSessionFactory.openSession()) { + JobMetricsStatisticsMapper mapper = session.getMapper(JobMetricsStatisticsMapper.class); + SelectStatementProvider statementProvider = select(count(jobMetricsTable.id).as(COUNT), + sum(jobMetricsTable.modelSize).as("model_size"), + sum(jobMetricsTable.duration).as("duration")).from(jobMetricsTable) + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) + .and(jobMetricsTable.projectName, isEqualTo(project)) + .build().render(RenderingStrategies.MYBATIS3); + return mapper.selectMany(statementProvider); + } + } + + public List jobCountByModel(long startTime, long endTime, String project) { + try (SqlSession session = sqlSessionFactory.openSession()) { + JobMetricsStatisticsMapper mapper = session.getMapper(JobMetricsStatisticsMapper.class); + SelectStatementProvider statementProvider = select(count(jobMetricsTable.id).as(COUNT), + (jobMetricsTable.model).as("model")) + .from(jobMetricsTable) + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) + .and(jobMetricsTable.projectName, isEqualTo(project)) + .groupBy(jobMetricsTable.model) + .build().render(RenderingStrategies.MYBATIS3); + return mapper.selectMany(statementProvider); + } + } + + public List jobCountByTime(long startTime, long endTime, String timeDimension, String project) { + try (SqlSession session = sqlSessionFactory.openSession()) { + JobMetricsStatisticsMapper mapper = session.getMapper(JobMetricsStatisticsMapper.class); + SelectStatementProvider statementProvider = jobCountByTimeProvider(startTime, endTime, timeDimension, + project); + return mapper.selectMany(statementProvider); + } + } + + public List jobBuildCostByModel(long startTime, long endTime, String project) { + try (SqlSession session = sqlSessionFactory.openSession()) { + JobMetricsStatisticsMapper mapper = session.getMapper(JobMetricsStatisticsMapper.class); + SelectStatementProvider statementProvider = select(count(jobMetricsTable.id).as(COUNT), + (jobMetricsTable.model).as("model"), + avg(jobMetricsTable.modelSize).as("model_size"), + avg(jobMetricsTable.duration).as("duration")) + .from(jobMetricsTable) + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) + .and(jobMetricsTable.projectName, isEqualTo(project)) + .groupBy(jobMetricsTable.model) + .build().render(RenderingStrategies.MYBATIS3); + return mapper.selectMany(statementProvider); + } + } + + public List jobBuildCostByTime(long startTime, long endTime, String timeDimension, String project) { + try (SqlSession session = sqlSessionFactory.openSession()) { + JobMetricsStatisticsMapper mapper = session.getMapper(JobMetricsStatisticsMapper.class); + SelectStatementProvider statementProvider = jobBuildCostByTimeProvider(startTime, endTime, timeDimension, project); + return mapper.selectMany(statementProvider); + } + } + + private SelectStatementProvider jobCountByTimeProvider(long startTime, long endTime, String timeDimension, + String project) { + if (timeDimension.equalsIgnoreCase(MONTH)) { + return select(jobMetricsTable.buildFirstDayOfMonth.as("time"), count(jobMetricsTable.id).as(COUNT)) // + .from(jobMetricsTable) // + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) // + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) // + .and(jobMetricsTable.projectName, isEqualTo(project)) // + .groupBy(jobMetricsTable.buildFirstDayOfMonth) // + .build().render(RenderingStrategies.MYBATIS3); + } else if (timeDimension.equalsIgnoreCase(WEEK)) { + return select(jobMetricsTable.buildFirstDayOfWeek.as("time"), count(jobMetricsTable.id).as(COUNT)) // + .from(jobMetricsTable) // + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) // + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) // + .and(jobMetricsTable.projectName, isEqualTo(project)) // + .groupBy(jobMetricsTable.buildFirstDayOfWeek) // + .build().render(RenderingStrategies.MYBATIS3); + } else if (timeDimension.equalsIgnoreCase(DAY)) { + return select(jobMetricsTable.buildDay.as("time"), count(jobMetricsTable.id).as(COUNT)) // + .from(jobMetricsTable) // + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) // + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) // + .and(jobMetricsTable.projectName, isEqualTo(project)) // + .groupBy(jobMetricsTable.buildDay) // + .build().render(RenderingStrategies.MYBATIS3); + } else { + throw new IllegalStateException("Unsupported time window!"); + } + } + + private SelectStatementProvider jobBuildCostByTimeProvider(long startTime, long endTime, String timeDimension, + String project) { + if (timeDimension.equalsIgnoreCase(MONTH)) { + return select(jobMetricsTable.buildFirstDayOfMonth.as("time"), + avg(jobMetricsTable.duration).as("duration"), // + avg(jobMetricsTable.modelSize).as("model_size"), + count(jobMetricsTable.id).as(COUNT)) + .from(jobMetricsTable) // + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) // + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) // + .and(jobMetricsTable.projectName, isEqualTo(project)) // + .groupBy(jobMetricsTable.buildFirstDayOfMonth) // + .build().render(RenderingStrategies.MYBATIS3); + } else if (timeDimension.equalsIgnoreCase(WEEK)) { + return select(jobMetricsTable.buildFirstDayOfWeek.as("time"), + avg(jobMetricsTable.duration).as("duration"), // + avg(jobMetricsTable.modelSize).as("model_size"), + count(jobMetricsTable.id).as(COUNT)) + .from(jobMetricsTable) // + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) // + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) // + .and(jobMetricsTable.projectName, isEqualTo(project)) // + .groupBy(jobMetricsTable.buildFirstDayOfWeek) // + .build().render(RenderingStrategies.MYBATIS3); + } else if (timeDimension.equalsIgnoreCase(DAY)) { + return select(jobMetricsTable.buildDay.as("time"), + avg(jobMetricsTable.duration).as("duration"), // + avg(jobMetricsTable.modelSize).as("model_size"), + count(jobMetricsTable.id).as(COUNT)) + .from(jobMetricsTable) // + .where(jobMetricsTable.buildDay, isGreaterThanOrEqualTo(startTime)) // + .and(jobMetricsTable.buildDay, isLessThanOrEqualTo(endTime)) // + .and(jobMetricsTable.projectName, isEqualTo(project)) // + .groupBy(jobMetricsTable.buildDay) // + .build().render(RenderingStrategies.MYBATIS3); + } else { + throw new IllegalStateException("Unsupported time window!"); + } + } + + public static void fillZeroForJobStatistics(List jobStatistics, long startTime, long endTime, + String dimension) { + if (!dimension.equalsIgnoreCase(DAY) && !dimension.equalsIgnoreCase(WEEK)) { + return; + } + if (dimension.equalsIgnoreCase(WEEK)) { + startTime = TimeUtil.getWeekStart(startTime); + endTime = TimeUtil.getWeekStart(endTime); + } + Set instantSet = jobStatistics.stream().map(JobMetricsStatistics::getTime).collect(Collectors.toSet()); + int rawOffsetTime = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()).getRawOffset(); + + long startOffSetTime = Instant.ofEpochMilli(startTime).plusMillis(rawOffsetTime).toEpochMilli(); + Instant startInstant = Instant.ofEpochMilli(startOffSetTime - startOffSetTime % (1000 * 60 * 60 * 24)); + long endOffSetTime = Instant.ofEpochMilli(endTime).plusMillis(rawOffsetTime).toEpochMilli(); + Instant endInstant = Instant.ofEpochMilli(endOffSetTime - endOffSetTime % (1000 * 60 * 60 * 24)); + while (!startInstant.isAfter(endInstant)) { + if (!instantSet.contains(startInstant)) { + JobMetricsStatistics zeroStatistics = new JobMetricsStatistics(); + zeroStatistics.setCount(0); + zeroStatistics.setTime(startInstant); + jobStatistics.add(zeroStatistics); + } + if (dimension.equalsIgnoreCase(DAY)) { + startInstant = startInstant.plus(Duration.ofDays(1)); + } else if (dimension.equalsIgnoreCase(WEEK)) { + startInstant = startInstant.plus(Duration.ofDays(7)); + } + } + } + + + + InsertStatementProvider getInsertJobMetricsProvider(JobMetrics jobMetrics) { + return SqlBuilder.insert(jobMetrics).into(jobMetricsTable) + .map(jobMetricsTable.jobId).toPropertyWhenPresent("jobId", jobMetrics::getJobId) + .map(jobMetricsTable.jobType).toPropertyWhenPresent("jobType", jobMetrics::getJobType) + .map(jobMetricsTable.duration).toPropertyWhenPresent("duration", jobMetrics::getDuration) + .map(jobMetricsTable.submitter).toPropertyWhenPresent("submitter", jobMetrics::getSubmitter) + .map(jobMetricsTable.model).toPropertyWhenPresent("model", jobMetrics::getModel) + .map(jobMetricsTable.projectName).toPropertyWhenPresent("projectName", jobMetrics::getProjectName) + .map(jobMetricsTable.buildTime).toPropertyWhenPresent("buildTime", jobMetrics::getBuildTime) + .map(jobMetricsTable.modelSize).toPropertyWhenPresent("modelSize", jobMetrics::getModelSize) + .map(jobMetricsTable.waitTime).toPropertyWhenPresent("waitTime", jobMetrics::getWaitTime) + .map(jobMetricsTable.buildFirstDayOfMonth) + .toPropertyWhenPresent("buildFirstDayOfMonth", jobMetrics::getBuildFirstDayOfMonth) + .map(jobMetricsTable.buildFirstDayOfWeek) + .toPropertyWhenPresent("buildFirstDayOfWeek", jobMetrics::getBuildFirstDayOfWeek) + .map(jobMetricsTable.buildDay).toPropertyWhenPresent("buildDay", jobMetrics::getBuildDay) + .map(jobMetricsTable.buildDate).toPropertyWhenPresent("buildDate", jobMetrics::getBuildDate) + .map(jobMetricsTable.jobState).toPropertyWhenPresent("jobState", jobMetrics::getJobState) + .map(jobMetricsTable.errorType).toPropertyWhenPresent("errorType", jobMetrics::getErrorType) + .map(jobMetricsTable.errorInfo).toPropertyWhenPresent("errorInfo", jobMetrics::getErrorInfo) + .map(jobMetricsTable.perBytesTimeCost).toPropertyWhenPresent("perBytesTimeCost", + jobMetrics::getPerBytesTimeCost) + .map(jobMetricsTable.jobEngine).toPropertyWhenPresent("jobEngine", jobMetrics::getJobEngine) + .build().render(RenderingStrategies.MYBATIS3); + } + + +} \ No newline at end of file diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetrics.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetrics.java new file mode 100644 index 00000000000..5edd1e015bf --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetrics.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Date; + +@Getter +@Setter +public class JobMetrics { + + private String jobId; + + private String submitter; + + private String model; + + private String jobType; + + private String projectName; + + private long duration; + + private String jobState; + + private String errorType; + + private String errorInfo; + + private long modelSize; + + private double perBytesTimeCost; + + private long waitTime; + + private long buildTime; + + private long buildFirstDayOfMonth; + + private long buildFirstDayOfWeek; + + private long buildDay; + + private Date buildDate; + + private String jobEngine; + + public JobMetrics() { + } + +} \ No newline at end of file diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsDao.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsDao.java new file mode 100644 index 00000000000..fdbfb9272c8 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsDao.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + + +import java.util.List; + +public interface JobMetricsDao { + + JobMetricsStatistics getJobCountAndTotalBuildCost(long startTime, long endTime, String project); + + List getJobCountByModel(long startTime, long endTime, String project); + + List getJobCountByTime(long startTime, long endTime, String timeDimension, String project); + + List getJobBuildCostByModel(long startTime, long endTime, String project); + + List getJobBuildCostByTime(long startTime, long endTime, String timeDimension, String project); +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsMapper.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsMapper.java new file mode 100644 index 00000000000..fb274d12dd7 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsMapper.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; +import org.mybatis.dynamic.sql.delete.render.DeleteStatementProvider; +import org.mybatis.dynamic.sql.insert.render.InsertStatementProvider; +import org.mybatis.dynamic.sql.util.SqlProviderAdapter; + +public interface JobMetricsMapper { + + @DeleteProvider(type = SqlProviderAdapter.class, method = "delete") + int delete(DeleteStatementProvider deleteStatementProvider); + + @InsertProvider(type = SqlProviderAdapter.class, method = "insert") + int insert(InsertStatementProvider insertStatement); +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsStatistics.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsStatistics.java new file mode 100644 index 00000000000..6e9c31c974c --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsStatistics.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.kylin.shaded.influxdb.org.influxdb.annotation.Column; + +import java.time.Instant; + +@Getter +@Setter +@NoArgsConstructor +public class JobMetricsStatistics { + + @JsonProperty("count") + @Column(name = "count") + private int count; + + @JsonProperty("duration") + @Column(name = "duration") + private long duration; + + @JsonProperty("model_size") + @Column(name = "model_size") + private long modelSize; + + @JsonProperty("time") + @Column(name = "time") + private Instant time; + + @JsonProperty("model") + @Column(name = "model", tag = true) + private String model; +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsStatisticsMapper.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsStatisticsMapper.java new file mode 100644 index 00000000000..40de339c4db --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsStatisticsMapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Result; +import org.apache.ibatis.annotations.ResultMap; +import org.apache.ibatis.annotations.Results; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.type.JdbcType; +import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; +import org.mybatis.dynamic.sql.util.SqlProviderAdapter; + +import java.util.List; + +@Mapper +public interface JobMetricsStatisticsMapper { + + @SelectProvider(type = SqlProviderAdapter.class, method = "select") + @Results(id = "JobMetricsStatisticsResult", value = { + @Result(column = "count", property = "count", jdbcType = JdbcType.INTEGER), + @Result(column = "duration", property = "duration", jdbcType = JdbcType.BIGINT), + @Result(column = "model_size", property = "modelSize", jdbcType = JdbcType.BIGINT), + @Result(column = "time", property = "time", jdbcType = JdbcType.BIGINT, typeHandler = JobMetricsTable.InstantHandler.class), + @Result(column = "model", property = "model", jdbcType = JdbcType.VARCHAR) }) + List selectMany(SelectStatementProvider selectStatement); + + @SelectProvider(type = SqlProviderAdapter.class, method = "select") + @ResultMap("JobMetricsStatisticsResult") + JobMetricsStatistics selectOne(SelectStatementProvider selectStatement); + +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsTable.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsTable.java new file mode 100644 index 00000000000..3f3a46938f6 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsTable.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.TypeHandler; +import org.apache.kylin.common.KylinConfig; +import org.mybatis.dynamic.sql.SqlColumn; +import org.mybatis.dynamic.sql.SqlTable; + +import java.sql.CallableStatement; +import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.Date; +import java.util.TimeZone; + +public class JobMetricsTable extends SqlTable { + + public final SqlColumn id = column("id", JDBCType.BIGINT); + public final SqlColumn jobId = column("job_id", JDBCType.VARCHAR); + public final SqlColumn submitter = column("submitter", JDBCType.VARCHAR); + public final SqlColumn model = column("model", JDBCType.VARCHAR); + public final SqlColumn jobType = column("job_type", JDBCType.VARCHAR); + public final SqlColumn projectName = column("project_name", JDBCType.VARCHAR); + public final SqlColumn duration = column("duration", JDBCType.BIGINT); + public final SqlColumn modelSize = column("model_size", JDBCType.BIGINT); + public final SqlColumn waitTime = column("wait_time", JDBCType.BIGINT); + public final SqlColumn buildTime = column("build_time", JDBCType.BIGINT); + public final SqlColumn buildFirstDayOfMonth = column("build_first_day_of_month", JDBCType.BIGINT); + public final SqlColumn buildFirstDayOfWeek = column("build_first_day_of_week", JDBCType.BIGINT); + public final SqlColumn buildDay = column("build_day", JDBCType.BIGINT); + public final SqlColumn buildDate = column("build_date", JDBCType.DATE); + + public final SqlColumn jobState = column("job_state", JDBCType.VARCHAR); + + public final SqlColumn errorType = column("error_type", JDBCType.VARCHAR); + + public final SqlColumn errorInfo = column("error_info", JDBCType.VARCHAR); + + public final SqlColumn perBytesTimeCost = column("per_bytes_time_cost", JDBCType.DOUBLE); + + public final SqlColumn jobEngine = column("job_engine", JDBCType.VARCHAR); + + + public JobMetricsTable(String tableName) { + super(tableName); + } + + public static class InstantHandler implements TypeHandler { + + @Override + public void setParameter(PreparedStatement ps, int i, Instant parameter, JdbcType jdbcType) + throws SQLException { + ps.setLong(i, parameter.toEpochMilli()); + } + + @Override + public Instant getResult(ResultSet rs, String columnName) throws SQLException { + int offset = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()).getRawOffset(); + long offetTime = Instant.ofEpochMilli(rs.getLong(columnName)).plusMillis(offset).toEpochMilli(); + return Instant.ofEpochMilli(offetTime); + } + + @Override + public Instant getResult(ResultSet rs, int columnIndex) throws SQLException { + int offset = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()).getRawOffset(); + long offetTime = Instant.ofEpochMilli(rs.getLong(columnIndex)).plusMillis(offset).toEpochMilli(); + return Instant.ofEpochMilli(offetTime); + } + + @Override + public Instant getResult(CallableStatement cs, int columnIndex) throws SQLException { + int offset = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()).getRawOffset(); + long offetTime = Instant.ofEpochMilli(cs.getLong(columnIndex)).plusMillis(offset).toEpochMilli(); + return Instant.ofEpochMilli(offetTime); + } + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/RDBMJobMetricsDAO.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/RDBMJobMetricsDAO.java new file mode 100644 index 00000000000..5242864a34d --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/RDBMJobMetricsDAO.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics; + +import lombok.val; +import org.apache.commons.collections.CollectionUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.Singletons; +import org.apache.kylin.common.persistence.transaction.UnitOfWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class RDBMJobMetricsDAO implements JobMetricsDao{ + + private static final Logger logger = LoggerFactory.getLogger(RDBMJobMetricsDAO.class); + + private final JdbcJobMetricsStore jdbcJobMetricsStore; + + public RDBMJobMetricsDAO() throws Exception { + val config = KylinConfig.getInstanceFromEnv(); + if (!UnitOfWork.isAlreadyInTransaction()) { + logger.info("Initializing RDBMJobMetricsDAO with KylinConfig Id: {} ", System.identityHashCode(config)); + } + jdbcJobMetricsStore = new JdbcJobMetricsStore(config); + } + + public static RDBMJobMetricsDAO getInstance() { + return Singletons.getInstance(RDBMJobMetricsDAO.class); + } + + public int insert(JobMetrics jobMetrics) { + return jdbcJobMetricsStore.insert(jobMetrics); + } + + @Override + public JobMetricsStatistics getJobCountAndTotalBuildCost(long startTime, long endTime, String project) { + List result = jdbcJobMetricsStore.JobCountAndTotalBuildCost(startTime, endTime, project); + if (CollectionUtils.isEmpty(result)) { + return new JobMetricsStatistics(); + } + return result.get(0); + } + + @Override + public List getJobCountByModel(long startTime, long endTime, String project) { + return jdbcJobMetricsStore.jobCountByModel(startTime, endTime, project); + } + + @Override + public List getJobCountByTime(long startTime, long endTime, String timeDimension, String project) { + return jdbcJobMetricsStore.jobCountByTime(startTime, endTime, timeDimension, project); + } + + @Override + public List getJobBuildCostByModel(long startTime, long endTime, String project) { + return jdbcJobMetricsStore.jobBuildCostByModel(startTime, endTime, project); + } + + @Override + public List getJobBuildCostByTime(long startTime, long endTime, String timeDimension, String project) { + return jdbcJobMetricsStore.jobBuildCostByTime(startTime, endTime, timeDimension, project); + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/metrics/util/JobMetricsUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/metrics/util/JobMetricsUtil.java new file mode 100644 index 00000000000..7f1c1929a5a --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/metrics/util/JobMetricsUtil.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.metrics.util; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.ibatis.jdbc.ScriptRunner; +import org.apache.ibatis.mapping.Environment; +import org.apache.ibatis.session.Configuration; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.SqlSessionFactoryBuilder; +import org.apache.ibatis.transaction.TransactionFactory; +import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; +import org.apache.ibatis.type.JdbcType; +import org.apache.kylin.common.Singletons; +import org.apache.kylin.common.logging.LogOutputStream; +import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; +import org.apache.kylin.job.metrics.JobMetricsMapper; +import org.apache.kylin.job.metrics.JobMetricsStatisticsMapper; +import org.apache.kylin.metadata.query.util.JdbcTableUtil; + +import javax.sql.DataSource; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Locale; +import java.util.Properties; + +@Slf4j +public class JobMetricsUtil { + + private static final Charset DEFAULT_CHARSET = Charset.defaultCharset(); + + private static final String CREATE_JOB_METRICS_TABLE = "create.jobmetrics.store.table"; + + private static final String CREATE_JOB_METRICS_INDEX1 = "create.jobmetrics.store.tableindex1"; + + private static final String CREATE_JOB_METRICS_INDEX2 = "create.jobmetrics.store.tableindex2"; + + private static final String CREATE_JOB_METRICS_INDEX3 = "create.jobmetrics.store.tableindex3"; + + private static final String CREATE_JOB_METRICS_INDEX4 = "create.jobmetrics.store.tableindex4"; + + private static final String CREATE_JOB_METRICS_INDEX5 = "create.jobmetrics.store.tableindex5"; + + private JobMetricsUtil() { + } + + public static SqlSessionFactory getSqlSessionFactory(DataSource dataSource, String jobMetricsTableName) { + return Singletons.getInstance("job-metrics-sql-session-factory", SqlSessionFactory.class, clz -> { + TransactionFactory transactionFactory = new JdbcTransactionFactory(); + Environment environment = new Environment("job metrics", transactionFactory, dataSource); + Configuration configuration = new Configuration(environment); + configuration.setUseGeneratedKeys(true); + configuration.setJdbcTypeForNull(JdbcType.NULL); + configuration.addMapper(JobMetricsMapper.class); + configuration.addMapper(JobMetricsStatisticsMapper.class); + createJobMetricsIfNotExist((BasicDataSource) dataSource, jobMetricsTableName); + return new SqlSessionFactoryBuilder().build(configuration); + }); + } + + private static void createJobMetricsIfNotExist(BasicDataSource dataSource, String jobMetricsTableName) + throws SQLException, IOException { + if (JdbcTableUtil.isTableExist(dataSource, jobMetricsTableName)) { + return; + } + try (Connection connection = dataSource.getConnection()) { + Properties properties = JdbcUtil.getProperties(dataSource); + + ScriptRunner sr = new ScriptRunner(connection); + sr.setLogWriter(new PrintWriter(new OutputStreamWriter(new LogOutputStream(log), DEFAULT_CHARSET))); + sr.runScript(new InputStreamReader(new ByteArrayInputStream(// + String.format(Locale.ROOT, properties.getProperty(CREATE_JOB_METRICS_TABLE), jobMetricsTableName) + .getBytes(DEFAULT_CHARSET)), + DEFAULT_CHARSET)); + sr.runScript(new InputStreamReader(new ByteArrayInputStream(// + String.format(Locale.ROOT, properties.getProperty(CREATE_JOB_METRICS_INDEX1), jobMetricsTableName, + jobMetricsTableName).getBytes(DEFAULT_CHARSET)), + DEFAULT_CHARSET)); + sr.runScript(new InputStreamReader(new ByteArrayInputStream(// + String.format(Locale.ROOT, properties.getProperty(CREATE_JOB_METRICS_INDEX2), jobMetricsTableName, + jobMetricsTableName).getBytes(DEFAULT_CHARSET)), + DEFAULT_CHARSET)); + sr.runScript(new InputStreamReader(new ByteArrayInputStream(// + String.format(Locale.ROOT, properties.getProperty(CREATE_JOB_METRICS_INDEX3), jobMetricsTableName, + jobMetricsTableName).getBytes(DEFAULT_CHARSET)), + DEFAULT_CHARSET)); + sr.runScript(new InputStreamReader(new ByteArrayInputStream(// + String.format(Locale.ROOT, properties.getProperty(CREATE_JOB_METRICS_INDEX4), jobMetricsTableName, + jobMetricsTableName).getBytes(DEFAULT_CHARSET)), + DEFAULT_CHARSET)); + sr.runScript(new InputStreamReader(new ByteArrayInputStream(// + String.format(Locale.ROOT, properties.getProperty(CREATE_JOB_METRICS_INDEX5), jobMetricsTableName, + jobMetricsTableName).getBytes(DEFAULT_CHARSET)), + DEFAULT_CHARSET)); + } + } +} diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index ca2664ec31a..0fd14ac175d 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -23,10 +23,15 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_RESTART_CHECK_SEGMENT_STATUS; import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_STATUS_ILLEGAL; import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_UPDATE_STATUS_FAILED; +import static org.apache.kylin.job.metrics.JdbcJobMetricsStore.fillZeroForJobStatistics; import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Field; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; @@ -77,6 +82,7 @@ import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StringHelper; +import org.apache.kylin.common.util.Unsafe; import org.apache.kylin.job.common.JobUtil; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; @@ -96,6 +102,9 @@ import org.apache.kylin.job.execution.NExecutableManager; import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.execution.StageBase; +import org.apache.kylin.job.metrics.JobMetricsDao; +import org.apache.kylin.job.metrics.JobMetricsStatistics; +import org.apache.kylin.job.metrics.RDBMJobMetricsDAO; import org.apache.kylin.metadata.cube.model.NBatchConstants; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflowManager; @@ -173,6 +182,10 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl public static final String EXCEPTION_CODE_PATH = "exception_to_code.json"; public static final String EXCEPTION_CODE_DEFAULT = "KE-030001000"; + private static final String COUNT = "count"; + private static final String MODEL = "model"; + private static final String MONTH = "month"; + public static final String JOB_STEP_PREFIX = "job_step_"; public static final String YARN_APP_SEPARATOR = "_"; public static final String BUILD_JOB_PROFILING_PARAMETER = "kylin.engine.async-profiler-enabled"; @@ -189,6 +202,10 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl jobTypeMap.put("TABLE_SAMPLING", "Sample Table"); } + private JobMetricsDao getJobMetricsDao() { + return RDBMJobMetricsDAO.getInstance(); + } + @Autowired public JobService setAclEvaluate(AclEvaluate aclEvaluate) { this.aclEvaluate = aclEvaluate; @@ -1108,6 +1125,124 @@ public Map getJobDurationPerByte(String project, long startTime, return manager.getDurationPerByteByTime(startTime, endTime, dimension); } + public JobStatisticsResponse getJobMetricsStats(String project, long startTime, long endTime) { + aclEvaluate.checkProjectOperationPermission(project); + JobMetricsDao jobMetricsDao = getJobMetricsDao(); + JobMetricsStatistics metrics = jobMetricsDao.getJobCountAndTotalBuildCost(startTime, endTime, project); + return new JobStatisticsResponse(metrics.getCount(), metrics.getDuration(), metrics.getModelSize()); + } + + public Map getJobMetricsCount(String project, long startTime, long endTime, String dimension) { + aclEvaluate.checkProjectOperationPermission(project); + JobMetricsDao jobMetricsDao = getJobMetricsDao(); + List jobMetricsStatistics; + + if (dimension.equals(MODEL)) { + jobMetricsStatistics = jobMetricsDao.getJobCountByModel(startTime, endTime, project); + return transformJobStatisticsByModel(jobMetricsStatistics, COUNT); + } + jobMetricsStatistics = jobMetricsDao.getJobCountByTime(startTime, endTime, dimension, project); + fillZeroForJobStatistics(jobMetricsStatistics, startTime, endTime, dimension); + return transformJobStatisticsByTime(jobMetricsStatistics, COUNT, dimension); + } + + public Map getJobMetricsBuildCostPerBytes(String project, long startTime, long endTime, String dimension) { + aclEvaluate.checkProjectOperationPermission(project); + JobMetricsDao jobMetricsDao = getJobMetricsDao(); + List jobMetricsStatistics; + + if (dimension.equals(MODEL)) { + jobMetricsStatistics = jobMetricsDao.getJobBuildCostByModel(startTime, endTime, project); + return transformJobBuildCostByModel(jobMetricsStatistics); + } + jobMetricsStatistics = jobMetricsDao.getJobBuildCostByTime(startTime, endTime, dimension, project); + fillZeroForJobStatistics(jobMetricsStatistics, startTime, endTime, dimension); + return transformJobBuildCostByTime(jobMetricsStatistics, dimension); + } + + private Map transformJobStatisticsByModel(List statistics, String fieldName) { + Map result = Maps.newHashMap(); + statistics.forEach(singleStatistics -> result.put(singleStatistics.getModel(), + (Integer) getValueByField(singleStatistics, fieldName))); + + return result; + } + + private Map transformJobBuildCostByModel(List statistics) { + Map result = Maps.newHashMap(); + for (JobMetricsStatistics jobMetricsStatistics : statistics) { + String modelName = jobMetricsStatistics.getModel(); + double modelSize = jobMetricsStatistics.getModelSize(); + double duration = jobMetricsStatistics.getDuration(); + if (modelSize == 0) { + result.put(modelName, .0); + } else { + result.put(modelName, duration / modelSize); + } + } + return result; + } + + private Map transformJobBuildCostByTime(List statistics, String dimension) { + Map result = Maps.newHashMap(); + + statistics.forEach(singleStatistics -> { + double modelSize = singleStatistics.getModelSize(); + double duration = singleStatistics.getDuration(); + double cost = modelSize == 0 ? .0 : duration / modelSize; + + if (dimension.equals(MONTH)) { + TimeZone timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()); + LocalDate date = singleStatistics.getTime().atZone(timeZone.toZoneId()).toLocalDate(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM", + Locale.getDefault(Locale.Category.FORMAT)); + result.put(date.withDayOfMonth(1).format(formatter), cost); + return; + } + long time = singleStatistics.getTime().toEpochMilli(); + Date date = new Date(time); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault(Locale.Category.FORMAT)); + result.put(sdf.format(date), cost); + }); + + return result; + } + + private Map transformJobStatisticsByTime(List statistics, String fieldName, + String dimension) { + Map result = Maps.newHashMap(); + + statistics.forEach(singleStatistics -> { + if (dimension.equals(MONTH)) { + TimeZone timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()); + LocalDate date = singleStatistics.getTime().atZone(timeZone.toZoneId()).toLocalDate(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM", + Locale.getDefault(Locale.Category.FORMAT)); + result.put(date.withDayOfMonth(1).format(formatter), (Integer) getValueByField(singleStatistics, fieldName)); + return; + } + long time = singleStatistics.getTime().toEpochMilli(); + Date date = new Date(time); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault(Locale.Category.FORMAT)); + result.put(sdf.format(date), (Integer) getValueByField(singleStatistics, fieldName)); + }); + + return result; + } + + private Object getValueByField(JobMetricsStatistics statistics, String fieldName) { + Object object = null; + try { + Field field = statistics.getClass().getDeclaredField(fieldName); + Unsafe.changeAccessibleObject(field, true); + object = field.get(statistics); + } catch (Exception e) { + logger.error("Error caught when get value from Job statistics {}", e.getMessage()); + } + + return object; + } + public Map getEventsInfoGroupByModel(String project) { aclEvaluate.checkProjectOperationPermission(project); Map result = Maps.newHashMap(); diff --git a/src/job-service/src/main/java/org/apache/kylin/rest/config/JobAppInitializer.java b/src/job-service/src/main/java/org/apache/kylin/rest/config/JobAppInitializer.java index 3c8c0953461..e333d051984 100644 --- a/src/job-service/src/main/java/org/apache/kylin/rest/config/JobAppInitializer.java +++ b/src/job-service/src/main/java/org/apache/kylin/rest/config/JobAppInitializer.java @@ -18,6 +18,7 @@ package org.apache.kylin.rest.config; import org.apache.kylin.common.scheduler.EventBusFactory; +import org.apache.kylin.job.metrics.RDBMJobMetricsDAO; import org.apache.kylin.rest.config.initialize.JobSyncListener; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -31,5 +32,10 @@ public class JobAppInitializer implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { EventBusFactory.getInstance().register(jobSyncListener, true); + try { + RDBMJobMetricsDAO jobMetricsDAO = RDBMJobMetricsDAO.getInstance(); + } catch (Exception e) { + //todo + } } } diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/DashboardService.java index 07c1511f93f..ca4b8e9e5a3 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/DashboardService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/DashboardService.java @@ -118,7 +118,7 @@ public MetricsResponse getQueryMetrics(String projectName, String startTime, Str public MetricsResponse getJobMetrics(String projectName, String startTime, String endTime) { MetricsResponse jobMetrics = new MetricsResponse(); - JobStatisticsResponse jobStats = jobService.getJobStats(projectName, convertToTimestamp(startTime), + JobStatisticsResponse jobStats = jobService.getJobMetricsStats(projectName, convertToTimestamp(startTime), convertToTimestamp(endTime)); Float jobCount = (float) jobStats.getCount(); Float jobTotalByteSize = (float) jobStats.getTotalByteSize(); @@ -151,18 +151,18 @@ public MetricsResponse getChartData(String category, String projectName, String } case JOB: { switch (metric) { - case JOB_COUNT: - Map jobCounts = jobService.getJobCount(projectName, _startTime, _endTime, - dimension.toLowerCase()); - MetricsResponse counts = new MetricsResponse(); - jobCounts.forEach((k, v) -> counts.increase(k, Float.valueOf(v))); - return counts; - case AVG_JOB_BUILD_TIME: - Map jobDurationPerByte = jobService.getJobDurationPerByte(projectName, _startTime, - _endTime, dimension.toLowerCase()); - MetricsResponse avgBuild = new MetricsResponse(); - jobDurationPerByte.forEach((k, v) -> avgBuild.increase(k, Float.valueOf(String.valueOf(v)))); - return avgBuild; + case JOB_COUNT: + Map jobCounts = jobService.getJobMetricsCount(projectName, _startTime, _endTime, + dimension.toLowerCase()); + MetricsResponse counts = new MetricsResponse(); + jobCounts.forEach((k, v) -> counts.increase(k, Float.valueOf(v))); + return counts; + case AVG_JOB_BUILD_TIME: + Map jobDurationPerByte = jobService.getJobMetricsBuildCostPerBytes(projectName, + _startTime, _endTime, dimension.toLowerCase()); + MetricsResponse avgBuild = new MetricsResponse(); + jobDurationPerByte.forEach((k, v) -> avgBuild.increase(k, Float.valueOf(String.valueOf(v)))); + return avgBuild; default: throw new UnsupportedQueryException("Metric should be JOB_COUNT or AVG_JOB_BUILD_TIME"); } diff --git a/src/server/pom.xml b/src/server/pom.xml index 1b24a133b6c..558d111fc28 100644 --- a/src/server/pom.xml +++ b/src/server/pom.xml @@ -252,7 +252,7 @@ com.h2database h2 - test + compile