From 7d950a5c533903c6e6d97dc1d001f93de1c2e20b Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 13 Feb 2026 11:55:31 +1000 Subject: [PATCH] Add node resource hourly/daily aggregation tables Add host spec external model and 9 transformation models for aggregating observoor node resource data into hourly and daily granularities: - fct_node_host_spec: Hardware specs from observoor host_specs - fct_node_cpu_utilization_hourly/daily: CPU usage aggregated per node - fct_node_memory_usage_hourly/daily: Memory usage aggregated per node - fct_node_disk_io_hourly/daily: Disk I/O aggregated per node and r/w - fct_node_network_io_hourly/daily: Network I/O per node, port, direction Hourly models aggregate slot-level per-process data from existing fct_node_*_by_process tables. Daily models re-aggregate from hourly using weighted averages (by slot_count). --- migrations/074_fct_node_host_spec.down.sql | 2 + migrations/074_fct_node_host_spec.up.sql | 53 ++++++++++++++++ ...5_fct_node_cpu_utilization_hourly.down.sql | 2 + ...075_fct_node_cpu_utilization_hourly.up.sql | 32 ++++++++++ .../076_fct_node_memory_usage_hourly.down.sql | 2 + .../076_fct_node_memory_usage_hourly.up.sql | 32 ++++++++++ .../077_fct_node_disk_io_hourly.down.sql | 2 + migrations/077_fct_node_disk_io_hourly.up.sql | 31 ++++++++++ .../078_fct_node_network_io_hourly.down.sql | 2 + .../078_fct_node_network_io_hourly.up.sql | 32 ++++++++++ ...79_fct_node_cpu_utilization_daily.down.sql | 2 + .../079_fct_node_cpu_utilization_daily.up.sql | 32 ++++++++++ .../080_fct_node_memory_usage_daily.down.sql | 2 + .../080_fct_node_memory_usage_daily.up.sql | 32 ++++++++++ .../081_fct_node_disk_io_daily.down.sql | 2 + migrations/081_fct_node_disk_io_daily.up.sql | 31 ++++++++++ .../082_fct_node_network_io_daily.down.sql | 2 + .../082_fct_node_network_io_daily.up.sql | 32 ++++++++++ models/external/observoor_host_specs.sql | 32 ++++++++++ .../fct_node_cpu_utilization_daily.sql | 62 +++++++++++++++++++ 
.../fct_node_cpu_utilization_hourly.sql | 59 ++++++++++++++++++ .../fct_node_disk_io_daily.sql | 59 ++++++++++++++++++ .../fct_node_disk_io_hourly.sql | 55 ++++++++++++++++ models/transformations/fct_node_host_spec.sql | 58 +++++++++++++++++ .../fct_node_memory_usage_daily.sql | 61 ++++++++++++++++++ .../fct_node_memory_usage_hourly.sql | 57 +++++++++++++++++ .../fct_node_network_io_daily.sql | 61 ++++++++++++++++++ .../fct_node_network_io_hourly.sql | 58 +++++++++++++++++ 28 files changed, 887 insertions(+) create mode 100644 migrations/074_fct_node_host_spec.down.sql create mode 100644 migrations/074_fct_node_host_spec.up.sql create mode 100644 migrations/075_fct_node_cpu_utilization_hourly.down.sql create mode 100644 migrations/075_fct_node_cpu_utilization_hourly.up.sql create mode 100644 migrations/076_fct_node_memory_usage_hourly.down.sql create mode 100644 migrations/076_fct_node_memory_usage_hourly.up.sql create mode 100644 migrations/077_fct_node_disk_io_hourly.down.sql create mode 100644 migrations/077_fct_node_disk_io_hourly.up.sql create mode 100644 migrations/078_fct_node_network_io_hourly.down.sql create mode 100644 migrations/078_fct_node_network_io_hourly.up.sql create mode 100644 migrations/079_fct_node_cpu_utilization_daily.down.sql create mode 100644 migrations/079_fct_node_cpu_utilization_daily.up.sql create mode 100644 migrations/080_fct_node_memory_usage_daily.down.sql create mode 100644 migrations/080_fct_node_memory_usage_daily.up.sql create mode 100644 migrations/081_fct_node_disk_io_daily.down.sql create mode 100644 migrations/081_fct_node_disk_io_daily.up.sql create mode 100644 migrations/082_fct_node_network_io_daily.down.sql create mode 100644 migrations/082_fct_node_network_io_daily.up.sql create mode 100644 models/external/observoor_host_specs.sql create mode 100644 models/transformations/fct_node_cpu_utilization_daily.sql create mode 100644 models/transformations/fct_node_cpu_utilization_hourly.sql create mode 100644 
models/transformations/fct_node_disk_io_daily.sql create mode 100644 models/transformations/fct_node_disk_io_hourly.sql create mode 100644 models/transformations/fct_node_host_spec.sql create mode 100644 models/transformations/fct_node_memory_usage_daily.sql create mode 100644 models/transformations/fct_node_memory_usage_hourly.sql create mode 100644 models/transformations/fct_node_network_io_daily.sql create mode 100644 models/transformations/fct_node_network_io_hourly.sql diff --git a/migrations/074_fct_node_host_spec.down.sql b/migrations/074_fct_node_host_spec.down.sql new file mode 100644 index 00000000..957f2c8b --- /dev/null +++ b/migrations/074_fct_node_host_spec.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_host_spec ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_host_spec_local ON CLUSTER '{cluster}'; diff --git a/migrations/074_fct_node_host_spec.up.sql b/migrations/074_fct_node_host_spec.up.sql new file mode 100644 index 00000000..fca60710 --- /dev/null +++ b/migrations/074_fct_node_host_spec.up.sql @@ -0,0 +1,53 @@ +-- Time-history fact table for node host hardware specifications +CREATE TABLE `${NETWORK_NAME}`.fct_node_host_spec_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `wallclock_slot_start_date_time` DateTime64(3) COMMENT 'The wall clock time when the slot started' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. 
eip7870)', + `host_id` String COMMENT 'Unique host identifier' CODEC(ZSTD(1)), + `kernel_release` LowCardinality(String) COMMENT 'OS kernel release version', + `os_name` LowCardinality(String) COMMENT 'Operating system name', + `architecture` LowCardinality(String) COMMENT 'CPU architecture (e.g. x86_64, aarch64)', + `cpu_model` String COMMENT 'CPU model name' CODEC(ZSTD(1)), + `cpu_vendor` LowCardinality(String) COMMENT 'CPU vendor (e.g. GenuineIntel, AuthenticAMD)', + `cpu_online_cores` UInt16 COMMENT 'Number of online CPU cores' CODEC(ZSTD(1)), + `cpu_logical_cores` UInt16 COMMENT 'Number of logical CPU cores' CODEC(ZSTD(1)), + `cpu_physical_cores` UInt16 COMMENT 'Number of physical CPU cores' CODEC(ZSTD(1)), + `cpu_performance_cores` UInt16 COMMENT 'Number of performance cores (hybrid CPUs)' CODEC(ZSTD(1)), + `cpu_efficiency_cores` UInt16 COMMENT 'Number of efficiency cores (hybrid CPUs)' CODEC(ZSTD(1)), + `cpu_unknown_type_cores` UInt16 COMMENT 'Number of cores with unknown type' CODEC(ZSTD(1)), + `cpu_core_types` Array(UInt8) COMMENT 'Core type identifiers per core', + `cpu_core_type_labels` Array(LowCardinality(String)) COMMENT 'Core type labels per core', + `cpu_max_freq_khz` Array(UInt64) COMMENT 'Maximum frequency per core in kHz', + `cpu_base_freq_khz` Array(UInt64) COMMENT 'Base frequency per core in kHz', + `memory_total_bytes` UInt64 COMMENT 'Total system memory in bytes' CODEC(ZSTD(1)), + `memory_type` LowCardinality(String) COMMENT 'Memory type (e.g. 
DDR4, DDR5)', + `memory_speed_mts` UInt32 COMMENT 'Memory speed in MT/s' CODEC(ZSTD(1)), + `memory_dimm_count` UInt16 COMMENT 'Number of memory DIMMs' CODEC(ZSTD(1)), + `memory_dimm_sizes_bytes` Array(UInt64) COMMENT 'Size of each DIMM in bytes', + `memory_dimm_types` Array(LowCardinality(String)) COMMENT 'Type of each DIMM', + `memory_dimm_speeds_mts` Array(UInt32) COMMENT 'Speed of each DIMM in MT/s', + `disk_count` UInt16 COMMENT 'Number of disk devices' CODEC(ZSTD(1)), + `disk_total_bytes` UInt64 COMMENT 'Total disk capacity in bytes' CODEC(ZSTD(1)), + `disk_names` Array(String) COMMENT 'Device names of each disk', + `disk_models` Array(String) COMMENT 'Model names of each disk', + `disk_sizes_bytes` Array(UInt64) COMMENT 'Size of each disk in bytes', + `disk_rotational` Array(UInt8) COMMENT 'Whether each disk is rotational (1) or SSD (0)' +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toStartOfMonth(wallclock_slot_start_date_time) +ORDER BY + (wallclock_slot_start_date_time, meta_client_name) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Node host hardware specifications over time with node classification'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_host_spec ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_node_host_spec_local ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_host_spec_local, + cityHash64(wallclock_slot_start_date_time, meta_client_name) +); diff --git a/migrations/075_fct_node_cpu_utilization_hourly.down.sql b/migrations/075_fct_node_cpu_utilization_hourly.down.sql new file mode 100644 index 00000000..e6bc6a52 --- /dev/null +++ b/migrations/075_fct_node_cpu_utilization_hourly.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_cpu_utilization_hourly ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_cpu_utilization_hourly_local ON 
CLUSTER '{cluster}'; diff --git a/migrations/075_fct_node_cpu_utilization_hourly.up.sql b/migrations/075_fct_node_cpu_utilization_hourly.up.sql new file mode 100644 index 00000000..18a0db89 --- /dev/null +++ b/migrations/075_fct_node_cpu_utilization_hourly.up.sql @@ -0,0 +1,32 @@ +-- Hourly aggregation of node CPU utilization across all processes +CREATE TABLE `${NETWORK_NAME}`.fct_node_cpu_utilization_hourly_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `hour_start_date_time` DateTime COMMENT 'Start of the hour period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. eip7870)', + `system_cores` UInt16 COMMENT 'Total system CPU cores' CODEC(ZSTD(1)), + `slot_count` UInt32 COMMENT 'Number of slots in this hour' CODEC(ZSTD(1)), + `avg_core_pct` Float32 COMMENT 'Average total CPU core utilization percentage' CODEC(ZSTD(1)), + `min_core_pct` Float32 COMMENT 'Minimum total CPU core utilization percentage' CODEC(ZSTD(1)), + `max_core_pct` Float32 COMMENT 'Maximum total CPU core utilization percentage' CODEC(ZSTD(1)), + `p50_core_pct` Float32 COMMENT '50th percentile total CPU core utilization' CODEC(ZSTD(1)), + `p95_core_pct` Float32 COMMENT '95th percentile total CPU core utilization' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toStartOfMonth(hour_start_date_time) +ORDER BY (hour_start_date_time, meta_client_name) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Hourly aggregated node CPU utilization statistics per node'; + +CREATE TABLE 
`${NETWORK_NAME}`.fct_node_cpu_utilization_hourly ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_cpu_utilization_hourly_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_cpu_utilization_hourly_local, + cityHash64(hour_start_date_time, meta_client_name) +); diff --git a/migrations/076_fct_node_memory_usage_hourly.down.sql b/migrations/076_fct_node_memory_usage_hourly.down.sql new file mode 100644 index 00000000..3860386e --- /dev/null +++ b/migrations/076_fct_node_memory_usage_hourly.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_memory_usage_hourly ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_memory_usage_hourly_local ON CLUSTER '{cluster}'; diff --git a/migrations/076_fct_node_memory_usage_hourly.up.sql b/migrations/076_fct_node_memory_usage_hourly.up.sql new file mode 100644 index 00000000..7edf7473 --- /dev/null +++ b/migrations/076_fct_node_memory_usage_hourly.up.sql @@ -0,0 +1,32 @@ +-- Hourly aggregation of node memory usage across all processes +CREATE TABLE `${NETWORK_NAME}`.fct_node_memory_usage_hourly_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `hour_start_date_time` DateTime COMMENT 'Start of the hour period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. 
eip7870)', + `slot_count` UInt32 COMMENT 'Number of slots in this hour' CODEC(ZSTD(1)), + `avg_vm_rss_bytes` UInt64 COMMENT 'Average total RSS memory in bytes' CODEC(ZSTD(1)), + `min_vm_rss_bytes` UInt64 COMMENT 'Minimum total RSS memory in bytes' CODEC(ZSTD(1)), + `max_vm_rss_bytes` UInt64 COMMENT 'Maximum total RSS memory in bytes' CODEC(ZSTD(1)), + `avg_rss_anon_bytes` UInt64 COMMENT 'Average total anonymous RSS memory in bytes' CODEC(ZSTD(1)), + `avg_rss_file_bytes` UInt64 COMMENT 'Average total file-backed RSS memory in bytes' CODEC(ZSTD(1)), + `avg_vm_swap_bytes` UInt64 COMMENT 'Average total swap memory in bytes' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toStartOfMonth(hour_start_date_time) +ORDER BY (hour_start_date_time, meta_client_name) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Hourly aggregated node memory usage statistics per node'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_memory_usage_hourly ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_memory_usage_hourly_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_memory_usage_hourly_local, + cityHash64(hour_start_date_time, meta_client_name) +); diff --git a/migrations/077_fct_node_disk_io_hourly.down.sql b/migrations/077_fct_node_disk_io_hourly.down.sql new file mode 100644 index 00000000..90cb7b08 --- /dev/null +++ b/migrations/077_fct_node_disk_io_hourly.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_disk_io_hourly ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_disk_io_hourly_local ON CLUSTER '{cluster}'; diff --git a/migrations/077_fct_node_disk_io_hourly.up.sql b/migrations/077_fct_node_disk_io_hourly.up.sql new file mode 100644 index 00000000..a9066c83 --- /dev/null +++ b/migrations/077_fct_node_disk_io_hourly.up.sql @@ -0,0 +1,31 @@ +-- Hourly 
aggregation of node disk I/O across all processes +CREATE TABLE `${NETWORK_NAME}`.fct_node_disk_io_hourly_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `hour_start_date_time` DateTime COMMENT 'Start of the hour period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. eip7870)', + `rw` LowCardinality(String) COMMENT 'Read or write operation', + `slot_count` UInt32 COMMENT 'Number of slots in this hour' CODEC(ZSTD(1)), + `sum_io_bytes` Float64 COMMENT 'Total bytes transferred in this hour' CODEC(ZSTD(1)), + `avg_io_bytes` Float32 COMMENT 'Average bytes transferred per slot' CODEC(ZSTD(1)), + `sum_io_ops` UInt64 COMMENT 'Total I/O operations in this hour' CODEC(ZSTD(1)), + `avg_io_ops` UInt32 COMMENT 'Average I/O operations per slot' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toStartOfMonth(hour_start_date_time) +ORDER BY (hour_start_date_time, meta_client_name, rw) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Hourly aggregated node disk I/O statistics per node and read/write direction'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_disk_io_hourly ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_disk_io_hourly_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_disk_io_hourly_local, + cityHash64(hour_start_date_time, meta_client_name) +); diff --git a/migrations/078_fct_node_network_io_hourly.down.sql b/migrations/078_fct_node_network_io_hourly.down.sql new file mode 100644 index 00000000..045145c5 --- /dev/null +++ 
b/migrations/078_fct_node_network_io_hourly.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_network_io_hourly ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_network_io_hourly_local ON CLUSTER '{cluster}'; diff --git a/migrations/078_fct_node_network_io_hourly.up.sql b/migrations/078_fct_node_network_io_hourly.up.sql new file mode 100644 index 00000000..79ea348a --- /dev/null +++ b/migrations/078_fct_node_network_io_hourly.up.sql @@ -0,0 +1,32 @@ +-- Hourly aggregation of node network I/O across all processes +CREATE TABLE `${NETWORK_NAME}`.fct_node_network_io_hourly_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `hour_start_date_time` DateTime COMMENT 'Start of the hour period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. eip7870)', + `port_label` LowCardinality(String) COMMENT 'Port classification (e.g. 
cl_p2p_tcp, el_json_rpc)', + `direction` LowCardinality(String) COMMENT 'Traffic direction: tx or rx', + `slot_count` UInt32 COMMENT 'Number of slots in this hour' CODEC(ZSTD(1)), + `sum_io_bytes` Float64 COMMENT 'Total bytes transferred in this hour' CODEC(ZSTD(1)), + `avg_io_bytes` Float32 COMMENT 'Average bytes transferred per slot' CODEC(ZSTD(1)), + `sum_io_count` UInt64 COMMENT 'Total packet count in this hour' CODEC(ZSTD(1)), + `avg_io_count` UInt32 COMMENT 'Average packet count per slot' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toStartOfMonth(hour_start_date_time) +ORDER BY (hour_start_date_time, meta_client_name, port_label, direction) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Hourly aggregated node network I/O statistics per node, port, and direction'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_network_io_hourly ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_network_io_hourly_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_network_io_hourly_local, + cityHash64(hour_start_date_time, meta_client_name) +); diff --git a/migrations/079_fct_node_cpu_utilization_daily.down.sql b/migrations/079_fct_node_cpu_utilization_daily.down.sql new file mode 100644 index 00000000..47132017 --- /dev/null +++ b/migrations/079_fct_node_cpu_utilization_daily.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_cpu_utilization_daily ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_cpu_utilization_daily_local ON CLUSTER '{cluster}'; diff --git a/migrations/079_fct_node_cpu_utilization_daily.up.sql b/migrations/079_fct_node_cpu_utilization_daily.up.sql new file mode 100644 index 00000000..dd129a95 --- /dev/null +++ b/migrations/079_fct_node_cpu_utilization_daily.up.sql @@ -0,0 +1,32 @@ +-- Daily aggregation of node CPU utilization 
+CREATE TABLE `${NETWORK_NAME}`.fct_node_cpu_utilization_daily_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `day_start_date` Date COMMENT 'Start of the day period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. eip7870)', + `system_cores` UInt16 COMMENT 'Total system CPU cores' CODEC(ZSTD(1)), + `hour_count` UInt32 COMMENT 'Number of source hourly slots in this day' CODEC(ZSTD(1)), + `avg_core_pct` Float32 COMMENT 'Weighted average total CPU core utilization percentage' CODEC(ZSTD(1)), + `min_core_pct` Float32 COMMENT 'Minimum total CPU core utilization percentage' CODEC(ZSTD(1)), + `max_core_pct` Float32 COMMENT 'Maximum total CPU core utilization percentage' CODEC(ZSTD(1)), + `p50_core_pct` Float32 COMMENT 'Weighted 50th percentile total CPU core utilization' CODEC(ZSTD(1)), + `p95_core_pct` Float32 COMMENT 'Maximum of hourly 95th percentile CPU utilization' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toYYYYMM(day_start_date) +ORDER BY (day_start_date, meta_client_name) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Daily aggregated node CPU utilization statistics per node'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_cpu_utilization_daily ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_cpu_utilization_daily_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_cpu_utilization_daily_local, + cityHash64(day_start_date, meta_client_name) +); diff --git a/migrations/080_fct_node_memory_usage_daily.down.sql 
b/migrations/080_fct_node_memory_usage_daily.down.sql new file mode 100644 index 00000000..8389bdb6 --- /dev/null +++ b/migrations/080_fct_node_memory_usage_daily.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_memory_usage_daily ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_memory_usage_daily_local ON CLUSTER '{cluster}'; diff --git a/migrations/080_fct_node_memory_usage_daily.up.sql b/migrations/080_fct_node_memory_usage_daily.up.sql new file mode 100644 index 00000000..5db0fdf2 --- /dev/null +++ b/migrations/080_fct_node_memory_usage_daily.up.sql @@ -0,0 +1,32 @@ +-- Daily aggregation of node memory usage +CREATE TABLE `${NETWORK_NAME}`.fct_node_memory_usage_daily_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `day_start_date` Date COMMENT 'Start of the day period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. 
eip7870)', + `hour_count` UInt32 COMMENT 'Number of source hourly slots in this day' CODEC(ZSTD(1)), + `avg_vm_rss_bytes` UInt64 COMMENT 'Weighted average total RSS memory in bytes' CODEC(ZSTD(1)), + `min_vm_rss_bytes` UInt64 COMMENT 'Minimum total RSS memory in bytes' CODEC(ZSTD(1)), + `max_vm_rss_bytes` UInt64 COMMENT 'Maximum total RSS memory in bytes' CODEC(ZSTD(1)), + `avg_rss_anon_bytes` UInt64 COMMENT 'Weighted average total anonymous RSS memory in bytes' CODEC(ZSTD(1)), + `avg_rss_file_bytes` UInt64 COMMENT 'Weighted average total file-backed RSS memory in bytes' CODEC(ZSTD(1)), + `avg_vm_swap_bytes` UInt64 COMMENT 'Weighted average total swap memory in bytes' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toYYYYMM(day_start_date) +ORDER BY (day_start_date, meta_client_name) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Daily aggregated node memory usage statistics per node'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_memory_usage_daily ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_memory_usage_daily_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_memory_usage_daily_local, + cityHash64(day_start_date, meta_client_name) +); diff --git a/migrations/081_fct_node_disk_io_daily.down.sql b/migrations/081_fct_node_disk_io_daily.down.sql new file mode 100644 index 00000000..f617ea32 --- /dev/null +++ b/migrations/081_fct_node_disk_io_daily.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_disk_io_daily ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_disk_io_daily_local ON CLUSTER '{cluster}'; diff --git a/migrations/081_fct_node_disk_io_daily.up.sql b/migrations/081_fct_node_disk_io_daily.up.sql new file mode 100644 index 00000000..2a4c9b04 --- /dev/null +++ b/migrations/081_fct_node_disk_io_daily.up.sql @@ -0,0 +1,31 @@ 
+-- Daily aggregation of node disk I/O +CREATE TABLE `${NETWORK_NAME}`.fct_node_disk_io_daily_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `day_start_date` Date COMMENT 'Start of the day period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. eip7870)', + `rw` LowCardinality(String) COMMENT 'Read or write operation', + `hour_count` UInt32 COMMENT 'Number of source hourly slots in this day' CODEC(ZSTD(1)), + `sum_io_bytes` Float64 COMMENT 'Total bytes transferred in this day' CODEC(ZSTD(1)), + `avg_io_bytes` Float32 COMMENT 'Weighted average bytes transferred per slot' CODEC(ZSTD(1)), + `sum_io_ops` UInt64 COMMENT 'Total I/O operations in this day' CODEC(ZSTD(1)), + `avg_io_ops` UInt32 COMMENT 'Weighted average I/O operations per slot' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toYYYYMM(day_start_date) +ORDER BY (day_start_date, meta_client_name, rw) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Daily aggregated node disk I/O statistics per node and read/write direction'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_disk_io_daily ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_disk_io_daily_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_disk_io_daily_local, + cityHash64(day_start_date, meta_client_name) +); diff --git a/migrations/082_fct_node_network_io_daily.down.sql b/migrations/082_fct_node_network_io_daily.down.sql new file mode 100644 index 00000000..78a693a9 --- /dev/null +++ 
b/migrations/082_fct_node_network_io_daily.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_network_io_daily ON CLUSTER '{cluster}'; +DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_node_network_io_daily_local ON CLUSTER '{cluster}'; diff --git a/migrations/082_fct_node_network_io_daily.up.sql b/migrations/082_fct_node_network_io_daily.up.sql new file mode 100644 index 00000000..1d4880f1 --- /dev/null +++ b/migrations/082_fct_node_network_io_daily.up.sql @@ -0,0 +1,32 @@ +-- Daily aggregation of node network I/O +CREATE TABLE `${NETWORK_NAME}`.fct_node_network_io_daily_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `day_start_date` Date COMMENT 'Start of the day period' CODEC(DoubleDelta, ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the observoor client that collected the data', + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `node_class` LowCardinality(String) COMMENT 'Node classification for filtering (e.g. eip7870)', + `port_label` LowCardinality(String) COMMENT 'Port classification (e.g. 
cl_p2p_tcp, el_json_rpc)', + `direction` LowCardinality(String) COMMENT 'Traffic direction: tx or rx', + `hour_count` UInt32 COMMENT 'Number of source hourly slots in this day' CODEC(ZSTD(1)), + `sum_io_bytes` Float64 COMMENT 'Total bytes transferred in this day' CODEC(ZSTD(1)), + `avg_io_bytes` Float32 COMMENT 'Weighted average bytes transferred per slot' CODEC(ZSTD(1)), + `sum_io_count` UInt64 COMMENT 'Total packet count in this day' CODEC(ZSTD(1)), + `avg_io_count` UInt32 COMMENT 'Weighted average packet count per slot' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', + '{replica}', + `updated_date_time` +) PARTITION BY toYYYYMM(day_start_date) +ORDER BY (day_start_date, meta_client_name, port_label, direction) +SETTINGS + deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'Daily aggregated node network I/O statistics per node, port, and direction'; + +CREATE TABLE `${NETWORK_NAME}`.fct_node_network_io_daily ON CLUSTER '{cluster}' +AS `${NETWORK_NAME}`.fct_node_network_io_daily_local +ENGINE = Distributed( + '{cluster}', + '${NETWORK_NAME}', + fct_node_network_io_daily_local, + cityHash64(day_start_date, meta_client_name) +); diff --git a/models/external/observoor_host_specs.sql b/models/external/observoor_host_specs.sql new file mode 100644 index 00000000..cdd806c7 --- /dev/null +++ b/models/external/observoor_host_specs.sql @@ -0,0 +1,32 @@ +--- +database: observoor +table: host_specs +cache: + incremental_scan_interval: 5s + full_scan_interval: 24h +interval: + type: slot +lag: 12 +--- +SELECT + {{ if .cache.is_incremental_scan }} + '{{ .cache.previous_min }}' as min, + {{ else }} + toUnixTimestamp(min(wallclock_slot_start_date_time)) as min, + {{ end }} + toUnixTimestamp(max(wallclock_slot_start_date_time)) as max +FROM {{ .self.helpers.from }} +WHERE + meta_network_name = '{{ .env.NETWORK }}' + + -- previous_max if incremental scan and is set, otherwise default/env + {{- 
$ts := default "0" .env.EXTERNAL_MODEL_MIN_TIMESTAMP -}} + {{- if .cache.is_incremental_scan -}} + {{- if .cache.previous_max -}} + {{- $ts = .cache.previous_max -}} + {{- end -}} + {{- end }} + AND wallclock_slot_start_date_time >= fromUnixTimestamp({{ $ts }}) + {{- if .cache.is_incremental_scan }} + AND wallclock_slot_start_date_time <= fromUnixTimestamp({{ $ts }}) + {{ default "100000" .env.EXTERNAL_MODEL_SCAN_SIZE_TIMESTAMP }} + {{- end }} diff --git a/models/transformations/fct_node_cpu_utilization_daily.sql b/models/transformations/fct_node_cpu_utilization_daily.sql new file mode 100644 index 00000000..6460e9d8 --- /dev/null +++ b/models/transformations/fct_node_cpu_utilization_daily.sql @@ -0,0 +1,62 @@ +--- +table: fct_node_cpu_utilization_daily +type: incremental +interval: + type: slot + max: 604800 +fill: + direction: "tail" + allow_gap_skipping: false +schedules: + forwardfill: "@every 1h" + backfill: "@every 30s" +tags: + - daily + - observoor + - cpu +dependencies: + - "{{transformation}}.fct_node_cpu_utilization_hourly" +--- +-- Daily aggregation of node CPU utilization from hourly data. +-- Re-aggregates hourly stats into daily using weighted averages (by slot_count). 
+INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + day_bounds AS ( + SELECT + toDate(min(hour_start_date_time)) AS min_day, + toDate(max(hour_start_date_time)) AS max_day + FROM {{ index .dep "{{transformation}}" "fct_node_cpu_utilization_hourly" "helpers" "from" }} FINAL + WHERE hour_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + hours_in_days AS ( + SELECT + hour_start_date_time, + meta_client_name, + meta_network_name, + node_class, + system_cores, + slot_count, + avg_core_pct, + min_core_pct, + max_core_pct, + p50_core_pct, + p95_core_pct + FROM {{ index .dep "{{transformation}}" "fct_node_cpu_utilization_hourly" "helpers" "from" }} FINAL + WHERE toDate(hour_start_date_time) >= (SELECT min_day FROM day_bounds) + AND toDate(hour_start_date_time) <= (SELECT max_day FROM day_bounds) + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + toDate(hour_start_date_time) AS day_start_date, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + max(system_cores) AS system_cores, + sum(slot_count) AS hour_count, + round(sum(avg_core_pct * slot_count) / sum(slot_count), 4) AS avg_core_pct, + round(min(min_core_pct), 4) AS min_core_pct, + round(max(max_core_pct), 4) AS max_core_pct, + round(sum(p50_core_pct * slot_count) / sum(slot_count), 4) AS p50_core_pct, + round(max(p95_core_pct), 4) AS p95_core_pct +FROM hours_in_days +GROUP BY toDate(hour_start_date_time), meta_client_name diff --git a/models/transformations/fct_node_cpu_utilization_hourly.sql b/models/transformations/fct_node_cpu_utilization_hourly.sql new file mode 100644 index 00000000..509ac34a --- /dev/null +++ b/models/transformations/fct_node_cpu_utilization_hourly.sql @@ -0,0 +1,59 @@ +--- +table: fct_node_cpu_utilization_hourly +type: incremental +interval: + type: slot + max: 25200 +schedules: + forwardfill: "@every 5m" + backfill: "@every 30s" +tags: + - hourly + - 
observoor + - cpu +dependencies: + - "{{transformation}}.fct_node_cpu_utilization_by_process" +--- +-- Hourly aggregation of node CPU utilization. +-- Sums across processes per (slot, node) to get total node CPU, then aggregates into hourly stats. +-- +-- This query expands the slot range to complete hour boundaries to handle partial +-- hour aggregations at the head of incremental processing. +INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + hour_bounds AS ( + SELECT + toStartOfHour(min(wallclock_slot_start_date_time)) AS min_hour, + toStartOfHour(max(wallclock_slot_start_date_time)) AS max_hour + FROM {{ index .dep "{{transformation}}" "fct_node_cpu_utilization_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + -- Sum across processes per (slot, node) to get total node CPU + slot_node_totals AS ( + SELECT + wallclock_slot_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + max(system_cores) AS system_cores, + sum(mean_core_pct) AS total_mean_core_pct + FROM {{ index .dep "{{transformation}}" "fct_node_cpu_utilization_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time >= (SELECT min_hour FROM hour_bounds) + AND wallclock_slot_start_date_time < (SELECT max_hour FROM hour_bounds) + INTERVAL 1 HOUR + GROUP BY wallclock_slot_start_date_time, meta_client_name + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + toStartOfHour(wallclock_slot_start_date_time) AS hour_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + max(system_cores) AS system_cores, + count() AS slot_count, + round(avg(total_mean_core_pct), 4) AS avg_core_pct, + round(min(total_mean_core_pct), 4) AS min_core_pct, + round(max(total_mean_core_pct), 4) AS max_core_pct, + 
round(quantile(0.50)(total_mean_core_pct), 4) AS p50_core_pct, + round(quantile(0.95)(total_mean_core_pct), 4) AS p95_core_pct +FROM slot_node_totals +GROUP BY toStartOfHour(wallclock_slot_start_date_time), meta_client_name diff --git a/models/transformations/fct_node_disk_io_daily.sql b/models/transformations/fct_node_disk_io_daily.sql new file mode 100644 index 00000000..701a33bd --- /dev/null +++ b/models/transformations/fct_node_disk_io_daily.sql @@ -0,0 +1,59 @@ +--- +table: fct_node_disk_io_daily +type: incremental +interval: + type: slot + max: 604800 +fill: + direction: "tail" + allow_gap_skipping: false +schedules: + forwardfill: "@every 1h" + backfill: "@every 30s" +tags: + - daily + - observoor + - disk +dependencies: + - "{{transformation}}.fct_node_disk_io_hourly" +--- +-- Daily aggregation of node disk I/O from hourly data. +INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + day_bounds AS ( + SELECT + toDate(min(hour_start_date_time)) AS min_day, + toDate(max(hour_start_date_time)) AS max_day + FROM {{ index .dep "{{transformation}}" "fct_node_disk_io_hourly" "helpers" "from" }} FINAL + WHERE hour_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + hours_in_days AS ( + SELECT + hour_start_date_time, + meta_client_name, + meta_network_name, + node_class, + rw, + slot_count, + sum_io_bytes, + avg_io_bytes, + sum_io_ops, + avg_io_ops + FROM {{ index .dep "{{transformation}}" "fct_node_disk_io_hourly" "helpers" "from" }} FINAL + WHERE toDate(hour_start_date_time) >= (SELECT min_day FROM day_bounds) + AND toDate(hour_start_date_time) <= (SELECT max_day FROM day_bounds) + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + toDate(hour_start_date_time) AS day_start_date, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + rw, + count() AS hour_count, + sum(sum_io_bytes) AS sum_io_bytes, + round(sum(avg_io_bytes 
* slot_count) / sum(slot_count), 4) AS avg_io_bytes, + sum(sum_io_ops) AS sum_io_ops, + toUInt32(round(sum(avg_io_ops * slot_count) / sum(slot_count))) AS avg_io_ops +FROM hours_in_days +GROUP BY toDate(hour_start_date_time), meta_client_name, rw diff --git a/models/transformations/fct_node_disk_io_hourly.sql b/models/transformations/fct_node_disk_io_hourly.sql new file mode 100644 index 00000000..f6e23856 --- /dev/null +++ b/models/transformations/fct_node_disk_io_hourly.sql @@ -0,0 +1,55 @@ +--- +table: fct_node_disk_io_hourly +type: incremental +interval: + type: slot + max: 25200 +schedules: + forwardfill: "@every 5m" + backfill: "@every 30s" +tags: + - hourly + - observoor + - disk +dependencies: + - "{{transformation}}.fct_node_disk_io_by_process" +--- +-- Hourly aggregation of node disk I/O. +-- Sums across processes per (slot, node, rw) to get total node disk I/O, then aggregates into hourly stats. +INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + hour_bounds AS ( + SELECT + toStartOfHour(min(wallclock_slot_start_date_time)) AS min_hour, + toStartOfHour(max(wallclock_slot_start_date_time)) AS max_hour + FROM {{ index .dep "{{transformation}}" "fct_node_disk_io_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + slot_node_totals AS ( + SELECT + wallclock_slot_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + rw, + sum(io_bytes) AS total_io_bytes, + sum(io_ops) AS total_io_ops + FROM {{ index .dep "{{transformation}}" "fct_node_disk_io_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time >= (SELECT min_hour FROM hour_bounds) + AND wallclock_slot_start_date_time < (SELECT max_hour FROM hour_bounds) + INTERVAL 1 HOUR + GROUP BY wallclock_slot_start_date_time, meta_client_name, rw + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS 
updated_date_time, + toStartOfHour(wallclock_slot_start_date_time) AS hour_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + rw, + count() AS slot_count, + sum(total_io_bytes) AS sum_io_bytes, + round(avg(total_io_bytes), 4) AS avg_io_bytes, + sum(total_io_ops) AS sum_io_ops, + toUInt32(round(avg(total_io_ops))) AS avg_io_ops +FROM slot_node_totals +GROUP BY toStartOfHour(wallclock_slot_start_date_time), meta_client_name, rw diff --git a/models/transformations/fct_node_host_spec.sql b/models/transformations/fct_node_host_spec.sql new file mode 100644 index 00000000..0eeedb4b --- /dev/null +++ b/models/transformations/fct_node_host_spec.sql @@ -0,0 +1,58 @@ +--- +table: fct_node_host_spec +type: incremental +interval: + type: slot + max: 50000 +schedules: + forwardfill: "@every 5s" + backfill: "@every 30s" +tags: + - slot + - observoor + - host +dependencies: + - "observoor.host_specs" +--- +INSERT INTO + `{{ .self.database }}`.`{{ .self.table }}` +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + wallclock_slot_start_date_time, + meta_client_name, + meta_network_name, + CASE + WHEN positionCaseInsensitive(meta_client_name, '7870') > 0 THEN 'eip7870' + ELSE '' + END AS node_class, + host_id, + kernel_release, + os_name, + architecture, + cpu_model, + cpu_vendor, + cpu_online_cores, + cpu_logical_cores, + cpu_physical_cores, + cpu_performance_cores, + cpu_efficiency_cores, + cpu_unknown_type_cores, + cpu_core_types, + cpu_core_type_labels, + cpu_max_freq_khz, + cpu_base_freq_khz, + memory_total_bytes, + memory_type, + memory_speed_mts, + memory_dimm_count, + memory_dimm_sizes_bytes, + memory_dimm_types, + memory_dimm_speeds_mts, + disk_count, + disk_total_bytes, + disk_names, + disk_models, + disk_sizes_bytes, + disk_rotational +FROM {{ index .dep "observoor" "host_specs" "helpers" "from" }} FINAL +WHERE wallclock_slot_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start 
}}) AND fromUnixTimestamp({{ .bounds.end }}) diff --git a/models/transformations/fct_node_memory_usage_daily.sql b/models/transformations/fct_node_memory_usage_daily.sql new file mode 100644 index 00000000..549e2fd6 --- /dev/null +++ b/models/transformations/fct_node_memory_usage_daily.sql @@ -0,0 +1,61 @@ +--- +table: fct_node_memory_usage_daily +type: incremental +interval: + type: slot + max: 604800 +fill: + direction: "tail" + allow_gap_skipping: false +schedules: + forwardfill: "@every 1h" + backfill: "@every 30s" +tags: + - daily + - observoor + - memory +dependencies: + - "{{transformation}}.fct_node_memory_usage_hourly" +--- +-- Daily aggregation of node memory usage from hourly data. +INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + day_bounds AS ( + SELECT + toDate(min(hour_start_date_time)) AS min_day, + toDate(max(hour_start_date_time)) AS max_day + FROM {{ index .dep "{{transformation}}" "fct_node_memory_usage_hourly" "helpers" "from" }} FINAL + WHERE hour_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + hours_in_days AS ( + SELECT + hour_start_date_time, + meta_client_name, + meta_network_name, + node_class, + slot_count, + avg_vm_rss_bytes, + min_vm_rss_bytes, + max_vm_rss_bytes, + avg_rss_anon_bytes, + avg_rss_file_bytes, + avg_vm_swap_bytes + FROM {{ index .dep "{{transformation}}" "fct_node_memory_usage_hourly" "helpers" "from" }} FINAL + WHERE toDate(hour_start_date_time) >= (SELECT min_day FROM day_bounds) + AND toDate(hour_start_date_time) <= (SELECT max_day FROM day_bounds) + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + toDate(hour_start_date_time) AS day_start_date, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + count() AS hour_count, + toUInt64(round(sum(avg_vm_rss_bytes * slot_count) / sum(slot_count))) AS avg_vm_rss_bytes, + min(min_vm_rss_bytes) AS min_vm_rss_bytes, + 
max(max_vm_rss_bytes) AS max_vm_rss_bytes, + toUInt64(round(sum(avg_rss_anon_bytes * slot_count) / sum(slot_count))) AS avg_rss_anon_bytes, + toUInt64(round(sum(avg_rss_file_bytes * slot_count) / sum(slot_count))) AS avg_rss_file_bytes, + toUInt64(round(sum(avg_vm_swap_bytes * slot_count) / sum(slot_count))) AS avg_vm_swap_bytes +FROM hours_in_days +GROUP BY toDate(hour_start_date_time), meta_client_name diff --git a/models/transformations/fct_node_memory_usage_hourly.sql b/models/transformations/fct_node_memory_usage_hourly.sql new file mode 100644 index 00000000..45a408f3 --- /dev/null +++ b/models/transformations/fct_node_memory_usage_hourly.sql @@ -0,0 +1,57 @@ +--- +table: fct_node_memory_usage_hourly +type: incremental +interval: + type: slot + max: 25200 +schedules: + forwardfill: "@every 5m" + backfill: "@every 30s" +tags: + - hourly + - observoor + - memory +dependencies: + - "{{transformation}}.fct_node_memory_usage_by_process" +--- +-- Hourly aggregation of node memory usage. +-- Sums across processes per (slot, node) to get total node memory, then aggregates into hourly stats. 
+INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + hour_bounds AS ( + SELECT + toStartOfHour(min(wallclock_slot_start_date_time)) AS min_hour, + toStartOfHour(max(wallclock_slot_start_date_time)) AS max_hour + FROM {{ index .dep "{{transformation}}" "fct_node_memory_usage_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + slot_node_totals AS ( + SELECT + wallclock_slot_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + sum(vm_rss_bytes) AS total_vm_rss_bytes, + sum(rss_anon_bytes) AS total_rss_anon_bytes, + sum(rss_file_bytes) AS total_rss_file_bytes, + sum(vm_swap_bytes) AS total_vm_swap_bytes + FROM {{ index .dep "{{transformation}}" "fct_node_memory_usage_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time >= (SELECT min_hour FROM hour_bounds) + AND wallclock_slot_start_date_time < (SELECT max_hour FROM hour_bounds) + INTERVAL 1 HOUR + GROUP BY wallclock_slot_start_date_time, meta_client_name + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + toStartOfHour(wallclock_slot_start_date_time) AS hour_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + count() AS slot_count, + toUInt64(round(avg(total_vm_rss_bytes))) AS avg_vm_rss_bytes, + min(total_vm_rss_bytes) AS min_vm_rss_bytes, + max(total_vm_rss_bytes) AS max_vm_rss_bytes, + toUInt64(round(avg(total_rss_anon_bytes))) AS avg_rss_anon_bytes, + toUInt64(round(avg(total_rss_file_bytes))) AS avg_rss_file_bytes, + toUInt64(round(avg(total_vm_swap_bytes))) AS avg_vm_swap_bytes +FROM slot_node_totals +GROUP BY toStartOfHour(wallclock_slot_start_date_time), meta_client_name diff --git a/models/transformations/fct_node_network_io_daily.sql b/models/transformations/fct_node_network_io_daily.sql new file mode 
100644 index 00000000..ceef3449 --- /dev/null +++ b/models/transformations/fct_node_network_io_daily.sql @@ -0,0 +1,61 @@ +--- +table: fct_node_network_io_daily +type: incremental +interval: + type: slot + max: 604800 +fill: + direction: "tail" + allow_gap_skipping: false +schedules: + forwardfill: "@every 1h" + backfill: "@every 30s" +tags: + - daily + - observoor + - network +dependencies: + - "{{transformation}}.fct_node_network_io_hourly" +--- +-- Daily aggregation of node network I/O from hourly data. +INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + day_bounds AS ( + SELECT + toDate(min(hour_start_date_time)) AS min_day, + toDate(max(hour_start_date_time)) AS max_day + FROM {{ index .dep "{{transformation}}" "fct_node_network_io_hourly" "helpers" "from" }} FINAL + WHERE hour_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + hours_in_days AS ( + SELECT + hour_start_date_time, + meta_client_name, + meta_network_name, + node_class, + port_label, + direction, + slot_count, + sum_io_bytes, + avg_io_bytes, + sum_io_count, + avg_io_count + FROM {{ index .dep "{{transformation}}" "fct_node_network_io_hourly" "helpers" "from" }} FINAL + WHERE toDate(hour_start_date_time) >= (SELECT min_day FROM day_bounds) + AND toDate(hour_start_date_time) <= (SELECT max_day FROM day_bounds) + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + toDate(hour_start_date_time) AS day_start_date, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + port_label, + direction, + count() AS hour_count, + sum(sum_io_bytes) AS sum_io_bytes, + round(sum(avg_io_bytes * slot_count) / sum(slot_count), 4) AS avg_io_bytes, + sum(sum_io_count) AS sum_io_count, + toUInt32(round(sum(avg_io_count * slot_count) / sum(slot_count))) AS avg_io_count +FROM hours_in_days +GROUP BY toDate(hour_start_date_time), meta_client_name, port_label, direction diff --git 
a/models/transformations/fct_node_network_io_hourly.sql b/models/transformations/fct_node_network_io_hourly.sql new file mode 100644 index 00000000..42309393 --- /dev/null +++ b/models/transformations/fct_node_network_io_hourly.sql @@ -0,0 +1,58 @@ +--- +table: fct_node_network_io_hourly +type: incremental +interval: + type: slot + max: 25200 +schedules: + forwardfill: "@every 5m" + backfill: "@every 30s" +tags: + - hourly + - observoor + - network +dependencies: + - "{{transformation}}.fct_node_network_io_by_process" +--- +-- Hourly aggregation of node network I/O. +-- Sums across processes per (slot, node, port_label, direction) to get total node network I/O, +-- then aggregates into hourly stats. +INSERT INTO `{{ .self.database }}`.`{{ .self.table }}` +WITH + hour_bounds AS ( + SELECT + toStartOfHour(min(wallclock_slot_start_date_time)) AS min_hour, + toStartOfHour(max(wallclock_slot_start_date_time)) AS max_hour + FROM {{ index .dep "{{transformation}}" "fct_node_network_io_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }}) + ), + slot_node_totals AS ( + SELECT + wallclock_slot_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + any(node_class) AS node_class, + port_label, + direction, + sum(io_bytes) AS total_io_bytes, + sum(io_count) AS total_io_count + FROM {{ index .dep "{{transformation}}" "fct_node_network_io_by_process" "helpers" "from" }} FINAL + WHERE wallclock_slot_start_date_time >= (SELECT min_hour FROM hour_bounds) + AND wallclock_slot_start_date_time < (SELECT max_hour FROM hour_bounds) + INTERVAL 1 HOUR + GROUP BY wallclock_slot_start_date_time, meta_client_name, port_label, direction + ) +SELECT + fromUnixTimestamp({{ .task.start }}) AS updated_date_time, + toStartOfHour(wallclock_slot_start_date_time) AS hour_start_date_time, + meta_client_name, + any(meta_network_name) AS meta_network_name, + 
any(node_class) AS node_class, + port_label, + direction, + count() AS slot_count, + sum(total_io_bytes) AS sum_io_bytes, + round(avg(total_io_bytes), 4) AS avg_io_bytes, + sum(total_io_count) AS sum_io_count, + toUInt32(round(avg(total_io_count))) AS avg_io_count +FROM slot_node_totals +GROUP BY toStartOfHour(wallclock_slot_start_date_time), meta_client_name, port_label, direction