Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions collector/percona_custom_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,76 @@ func TestScrapeCustomQueriesNoFile(t *testing.T) {
})
}

// customQueryReplicationGroupWorker is a custom-query YAML fixture for the
// mysql_perf_schema_replication_group_worker query. It declares four label
// columns (channel_name, worker_id, IO_thread, SQL_thread) and one gauge
// column (transport_time_seconds). worker_id is selected as CHAR so that it
// is read as a label value rather than a number.
const customQueryReplicationGroupWorker = `
mysql_perf_schema_replication_group_worker:
query: "SELECT channel_name, CAST(worker_id AS CHAR) as worker_id, IO_thread, SQL_thread, transport_time_seconds FROM replication_worker_view"
metrics:
- channel_name:
usage: "LABEL"
description: "The replication channel."
- worker_id:
usage: "LABEL"
description: "The worker thread ID. 0 for single-threaded replication; 1..N for parallel replication workers."
- IO_thread:
usage: "LABEL"
description: "IO thread state."
- SQL_thread:
usage: "LABEL"
description: "SQL thread state."
- transport_time_seconds:
usage: "GAUGE"
description: "Transport time in seconds."
`

// TestScrapeCustomQueriesReplicationGroupWorkerParallelReplication checks that
// a custom query returning several rows which share a channel_name but differ
// in worker_id produces one gauge per row, each carrying its own worker_id
// label value.
func TestScrapeCustomQueriesReplicationGroupWorkerParallelReplication(t *testing.T) {
	convey.Convey("Replication group worker with parallel replication", t, func() {
		tmpFileName := createTmpFile(t, string(HR), customQueryReplicationGroupWorker)
		tmpDir := filepath.Dir(tmpFileName)
		*collectCustomQueryHrDirectory = tmpDir

		// Deferred calls run last-in-first-out. Registering the file and the
		// directory removal as two separate defers would attempt the directory
		// first, while it still contains the file, so that removal would fail.
		// Clean up in one deferred func, file first. Errors are ignored on
		// purpose: this is best-effort cleanup of temporary test artifacts.
		defer func() {
			os.Remove(tmpFileName)
			os.Remove(tmpDir)
		}()

		db, mock, err := sqlmock.New()
		if err != nil {
			t.Fatalf("error opening a stub database connection: %s", err)
		}
		defer db.Close()

		// Simulate parallel replication: same channel_name, multiple worker_ids.
		columns := []string{"channel_name", "worker_id", "IO_thread", "SQL_thread", "transport_time_seconds"}
		rows := sqlmock.NewRows(columns).
			AddRow("default", "1", "ON", "ON", "0.5").
			AddRow("default", "2", "ON", "ON", "0.3").
			AddRow("default", "3", "ON", "ON", "0.7")
		mock.ExpectQuery(sanitizeQuery("SELECT channel_name, CAST(worker_id AS CHAR) as worker_id, IO_thread, SQL_thread, transport_time_seconds FROM replication_worker_view")).WillReturnRows(rows)

		ch := make(chan prometheus.Metric)
		go func() {
			// The sender closes the channel so the reader can detect a scrape
			// that ended before emitting every expected metric.
			defer close(ch)

			instance := &instance{db: db}
			// Keep the error local to the goroutine instead of writing to the
			// err variable owned by the test body.
			if err := (ScrapeCustomQuery{Resolution: HR}).Scrape(context.Background(), instance, ch, promslog.NewNopLogger()); err != nil {
				t.Errorf("error calling function on test: %s", err)
			}
		}()

		metricsExpected := []MetricResult{
			{labels: labelMap{"channel_name": "default", "worker_id": "1", "IO_thread": "ON", "SQL_thread": "ON"}, value: 0.5, metricType: dto.MetricType_GAUGE},
			{labels: labelMap{"channel_name": "default", "worker_id": "2", "IO_thread": "ON", "SQL_thread": "ON"}, value: 0.3, metricType: dto.MetricType_GAUGE},
			{labels: labelMap{"channel_name": "default", "worker_id": "3", "IO_thread": "ON", "SQL_thread": "ON"}, value: 0.7, metricType: dto.MetricType_GAUGE},
		}
		convey.Convey("Each parallel worker produces a unique metric", func() {
			for _, expect := range metricsExpected {
				metric, ok := <-ch
				// A closed channel means the scrape produced fewer metrics
				// than expected; report that instead of reading a nil metric.
				convey.So(ok, convey.ShouldBeTrue)
				if !ok {
					break
				}
				got := readMetric(metric)
				convey.So(got, convey.ShouldResemble, expect)
			}
		})

		if err := mock.ExpectationsWereMet(); err != nil {
			t.Errorf("there were unfulfilled expectations: %s", err)
		}
	})
}

func createTmpFile(t *testing.T, resolution, content string) string {
// Create our Temp File
tempDir := os.TempDir() + "/" + resolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,14 @@ mysql_perf_schema_5:
description: "Number of transactions which originated on this member and were rolled back by the group."

mysql_perf_schema_replication_group_worker:
query: "/*!80000 SELECT conn_status.channel_name as channel_name,
conn_status.service_state as IO_thread,
applier_status.service_state as SQL_thread,
LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'rep_delay_seconds',
LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'transport_time_seconds',
LAST_QUEUED_TRANSACTION_END_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP 'time_RL_seconds',
LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_START_APPLY_TIMESTAMP 'apply_time_seconds',
if(GTID_SUBTRACT(LAST_QUEUED_TRANSACTION, LAST_APPLIED_TRANSACTION) = '','0' , abs(time_to_sec(if(time_to_sec(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)=0,0,timediff(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,now()))))) `lag_in_seconds`
FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status
ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' ORDER BY lag_in_seconds, lag_in_seconds desc */"
query: "/*!80000 SELECT conn_status.channel_name as channel_name, CAST(applier_status.worker_id AS CHAR) as worker_id, conn_status.service_state as IO_thread, applier_status.service_state as SQL_thread, LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'rep_delay_seconds', LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'transport_time_seconds', LAST_QUEUED_TRANSACTION_END_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP 'time_RL_seconds', LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_START_APPLY_TIMESTAMP 'apply_time_seconds', if(GTID_SUBTRACT(LAST_QUEUED_TRANSACTION, LAST_APPLIED_TRANSACTION) = '','0' , abs(time_to_sec(if(time_to_sec(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)=0,0,timediff(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,now()))))) `lag_in_seconds` FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' */"
metrics:
- channel_name:
usage: "LABEL"
description: "The replication channel which this row is displaying. There is always a default replication channel, and more replication channels can be added."
- worker_id:
usage: "LABEL"
description: "The worker thread ID. 0 for single-threaded replication; 1..N for parallel replication workers."
- IO_thread:
usage: "LABEL"
description: "ON (thread exists and is active or idle), OFF (thread no longer exists), or CONNECTING (thread exists and is connecting to the source)."
Expand All @@ -133,16 +127,14 @@ ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.serv
description: "Lag in seconds from when the LAST transaction is COMMITTED in the Primary and the time on local Replica "

mysql_perf_schema_replication_group_worker_5:
query: "/*!50700 SELECT conn_status.channel_name as channel_name,
conn_status.service_state as IO_thread,
applier_status.service_state as SQL_thread,
1 as info
FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status
ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' and (SELECT SUBSTRING(@@VERSION,1,1) = 5) */"
query: "/*!50700 SELECT conn_status.channel_name as channel_name, CAST(applier_status.worker_id AS CHAR) as worker_id, conn_status.service_state as IO_thread, applier_status.service_state as SQL_thread, 1 as info FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' and (SELECT SUBSTRING(@@VERSION,1,1) = 5) */"
metrics:
- channel_name:
usage: "LABEL"
description: "The replication channel which this row is displaying. There is always a default replication channel, and more replication channels can be added."
- worker_id:
usage: "LABEL"
description: "The worker thread ID. 0 for single-threaded replication; 1..N for parallel replication workers."
- IO_thread:
usage: "LABEL"
description: "ON (thread exists and is active or idle), OFF (thread no longer exists), or CONNECTING (thread exists and is connecting to the source)."
Expand Down
24 changes: 8 additions & 16 deletions queries-mysqld-group-replication.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,14 @@ mysql_perf_schema_5:
description: "Number of transactions which originated on this member and were rolled back by the group."

mysql_perf_schema_replication_group_worker:
query: "/*!80000 SELECT conn_status.channel_name as channel_name,
conn_status.service_state as IO_thread,
applier_status.service_state as SQL_thread,
LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'rep_delay_seconds',
LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'transport_time_seconds',
LAST_QUEUED_TRANSACTION_END_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP 'time_RL_seconds',
LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_START_APPLY_TIMESTAMP 'apply_time_seconds',
if(GTID_SUBTRACT(LAST_QUEUED_TRANSACTION, LAST_APPLIED_TRANSACTION) = '','0' , abs(time_to_sec(if(time_to_sec(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)=0,0,timediff(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,now()))))) `lag_in_seconds`
FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status
ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' ORDER BY lag_in_seconds, lag_in_seconds desc */"
query: "/*!80000 SELECT conn_status.channel_name as channel_name, CAST(applier_status.worker_id AS CHAR) as worker_id, conn_status.service_state as IO_thread, applier_status.service_state as SQL_thread, LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'rep_delay_seconds', LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP 'transport_time_seconds', LAST_QUEUED_TRANSACTION_END_QUEUE_TIMESTAMP - LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP 'time_RL_seconds', LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP - LAST_APPLIED_TRANSACTION_START_APPLY_TIMESTAMP 'apply_time_seconds', if(GTID_SUBTRACT(LAST_QUEUED_TRANSACTION, LAST_APPLIED_TRANSACTION) = '','0' , abs(time_to_sec(if(time_to_sec(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)=0,0,timediff(APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,now()))))) `lag_in_seconds` FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' */"
metrics:
- channel_name:
usage: "LABEL"
description: "The replication channel which this row is displaying. There is always a default replication channel, and more replication channels can be added."
- worker_id:
usage: "LABEL"
description: "The worker thread ID. 0 for single-threaded replication; 1..N for parallel replication workers."
- IO_thread:
usage: "LABEL"
description: "ON (thread exists and is active or idle), OFF (thread no longer exists), or CONNECTING (thread exists and is connecting to the source)."
Expand All @@ -133,16 +127,14 @@ ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.serv
description: "Lag in seconds from when the LAST transaction is COMMITTED in the Primary and the time on local Replica "

mysql_perf_schema_replication_group_worker_5:
query: "/*!50700 SELECT conn_status.channel_name as channel_name,
conn_status.service_state as IO_thread,
applier_status.service_state as SQL_thread,
1 as info
FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status
ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' and (SELECT SUBSTRING(@@VERSION,1,1) = 5) */"
query: "/*!50700 SELECT conn_status.channel_name as channel_name, CAST(applier_status.worker_id AS CHAR) as worker_id, conn_status.service_state as IO_thread, applier_status.service_state as SQL_thread, 1 as info FROM performance_schema.replication_connection_status AS conn_status JOIN performance_schema.replication_applier_status_by_worker AS applier_status ON applier_status.channel_name = conn_status.channel_name WHERE conn_status.service_state = 'ON' and (SELECT SUBSTRING(@@VERSION,1,1) = 5) */"
metrics:
- channel_name:
usage: "LABEL"
description: "The replication channel which this row is displaying. There is always a default replication channel, and more replication channels can be added."
- worker_id:
usage: "LABEL"
description: "The worker thread ID. 0 for single-threaded replication; 1..N for parallel replication workers."
- IO_thread:
usage: "LABEL"
description: "ON (thread exists and is active or idle), OFF (thread no longer exists), or CONNECTING (thread exists and is connecting to the source)."
Expand Down