Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ transforms:
execution_engine_new_payload: .event.name == "EXECUTION_ENGINE_NEW_PAYLOAD"
execution_engine_get_blobs: .event.name == "EXECUTION_ENGINE_GET_BLOBS"
execution_block_metrics: .event.name == "EXECUTION_BLOCK_METRICS"
execution_state_size_delta: .event.name == "EXECUTION_STATE_SIZE_DELTA"
execution_mpt_depth: .event.name == "EXECUTION_MPT_DEPTH"
libp2p_trace_connected: .event.name == "LIBP2P_TRACE_CONNECTED"
libp2p_trace_disconnected: .event.name == "LIBP2P_TRACE_DISCONNECTED"
libp2p_trace_add_peer: .event.name == "LIBP2P_TRACE_ADD_PEER"
Expand Down Expand Up @@ -541,6 +543,38 @@ sinks:
enabled: true
encoding:
codec: json
# Kafka sink: forwards EXECUTION_STATE_SIZE_DELTA events (routed by
# xatu_server_events_router) to the `execution-state-size-delta` topic.
# Messages are keyed by event.id so a given event always lands on the
# same partition; snappy compression, JSON-encoded payloads.
execution_state_size_delta_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.execution_state_size_delta
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: execution-state-size-delta
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
# Kafka sink: same shape as the sink above, for EXECUTION_MPT_DEPTH events
# on the `execution-mpt-depth` topic.
execution_mpt_depth_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.execution_mpt_depth
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: execution-mpt-depth
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
beacon_api_eth_v2_beacon_block_kafka:
type: kafka
buffer:
Expand Down
146 changes: 146 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ sources:
codec: json
topics:
- "execution-block-metrics"
# Kafka source: consumes the `execution-state-size-delta` topic produced by
# the http-kafka pipeline. Dedicated consumer group so offsets are tracked
# independently of the other topic consumers in this file; starts from the
# earliest offset on first run.
execution_state_size_delta_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
group_id: xatu-vector-kafka-clickhouse-execution-state-size-delta
key_field: "event.id"
auto_offset_reset: earliest
decoding:
codec: json
topics:
- "execution-state-size-delta"
# Kafka source: same shape as above, consuming `execution-mpt-depth`.
execution_mpt_depth_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
group_id: xatu-vector-kafka-clickhouse-execution-mpt-depth
key_field: "event.id"
auto_offset_reset: earliest
decoding:
codec: json
topics:
- "execution-mpt-depth"
beacon_api_eth_v2_beacon_block_events_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
Expand Down Expand Up @@ -211,6 +231,8 @@ transforms:
- execution_engine_new_payload_kafka
- execution_engine_get_blobs_kafka
- execution_block_metrics_kafka
- execution_state_size_delta_kafka
- execution_mpt_depth_kafka
- beacon_api_eth_v2_beacon_block_events_kafka
- beacon_api_eth_v1_beacon_blob_sidecar_kafka
- beacon_api_eth_v1_proposer_kafka
Expand Down Expand Up @@ -457,6 +479,8 @@ transforms:
execution_engine_new_payload: .event.name == "EXECUTION_ENGINE_NEW_PAYLOAD"
execution_engine_get_blobs: .event.name == "EXECUTION_ENGINE_GET_BLOBS"
execution_block_metrics: .event.name == "EXECUTION_BLOCK_METRICS"
execution_state_size_delta: .event.name == "EXECUTION_STATE_SIZE_DELTA"
execution_mpt_depth: .event.name == "EXECUTION_MPT_DEPTH"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
Expand Down Expand Up @@ -519,6 +543,8 @@ transforms:
- xatu_server_events_router.execution_engine_new_payload
- xatu_server_events_router.execution_engine_get_blobs
- xatu_server_events_router.execution_block_metrics
- xatu_server_events_router.execution_state_size_delta
- xatu_server_events_router.execution_mpt_depth
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -2980,6 +3006,86 @@ transforms:
del(.event)
del(.meta)
del(.data)
# Remap (VRL): flattens EXECUTION_STATE_SIZE_DELTA events into the row shape
# for the execution_state_size_delta ClickHouse table. Numeric deltas are
# coerced with `to_int ?? 0`; the event/meta/data envelopes and the Kafka
# transport fields (topic/partition/offset/...) are stripped before insert.
# NOTE(review): .event_date_time is parsed and assigned, then deleted below —
# the 106 migration's table has no event_date_time column, so the parse seems
# to serve only as validation / error logging; confirm this is intentional.
execution_state_size_delta_formatted:
type: remap
inputs:
- xatu_server_events_router.execution_state_size_delta
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
.block_number = to_int(.data.block_number) ?? 0
.state_root = .data.state_root
.parent_state_root = .data.parent_state_root
.account_delta = to_int(.data.account_delta) ?? 0
.account_bytes_delta = to_int(.data.account_bytes_delta) ?? 0
.account_trienode_delta = to_int(.data.account_trienode_delta) ?? 0
.account_trienode_bytes_delta = to_int(.data.account_trienode_bytes_delta) ?? 0
.contract_code_delta = to_int(.data.contract_code_delta) ?? 0
.contract_code_bytes_delta = to_int(.data.contract_code_bytes_delta) ?? 0
.storage_delta = to_int(.data.storage_delta) ?? 0
.storage_bytes_delta = to_int(.data.storage_bytes_delta) ?? 0
.storage_trienode_delta = to_int(.data.storage_trienode_delta) ?? 0
.storage_trienode_bytes_delta = to_int(.data.storage_trienode_bytes_delta) ?? 0
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
del(.event_date_time)
del(.source_type)
del(.timestamp)
del(.path)
del(.offset)
del(.partition)
del(.topic)
# Remap (VRL): flattens EXECUTION_MPT_DEPTH events into the row shape for the
# execution_mpt_depth ClickHouse table. The total_* counters are coerced with
# `to_int ?? 0`; envelopes and Kafka transport fields are stripped.
# NOTE(review): the non-total fields (account_written_nodes, ...) are copied
# raw without to_int, unlike the totals — presumably per-trie-depth arrays
# rather than scalars; confirm against the execution_mpt_depth table schema
# (migration 107 up, not visible here).
# NOTE(review): as in the sibling transform, .event_date_time is parsed and
# then deleted — confirm intentional.
execution_mpt_depth_formatted:
type: remap
inputs:
- xatu_server_events_router.execution_mpt_depth
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
.block_number = to_int(.data.block_number) ?? 0
.state_root = .data.state_root
.parent_state_root = .data.parent_state_root
.total_account_written_nodes = to_int(.data.total_account_written_nodes) ?? 0
.total_account_written_bytes = to_int(.data.total_account_written_bytes) ?? 0
.total_account_deleted_nodes = to_int(.data.total_account_deleted_nodes) ?? 0
.total_account_deleted_bytes = to_int(.data.total_account_deleted_bytes) ?? 0
.total_storage_written_nodes = to_int(.data.total_storage_written_nodes) ?? 0
.total_storage_written_bytes = to_int(.data.total_storage_written_bytes) ?? 0
.total_storage_deleted_nodes = to_int(.data.total_storage_deleted_nodes) ?? 0
.total_storage_deleted_bytes = to_int(.data.total_storage_deleted_bytes) ?? 0
.account_written_nodes = .data.account_written_nodes
.account_written_bytes = .data.account_written_bytes
.account_deleted_nodes = .data.account_deleted_nodes
.account_deleted_bytes = .data.account_deleted_bytes
.storage_written_nodes = .data.storage_written_nodes
.storage_written_bytes = .data.storage_written_bytes
.storage_deleted_nodes = .data.storage_deleted_nodes
.storage_deleted_bytes = .data.storage_deleted_bytes
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
del(.event_date_time)
del(.source_type)
del(.timestamp)
del(.path)
del(.offset)
del(.partition)
del(.topic)
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -3888,3 +3994,43 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
# ClickHouse sink: inserts the formatted state-size-delta rows into the
# execution_state_size_delta table (basic auth via env vars).
# NOTE(review): skip_unknown_fields is true here while at least one
# pre-existing sink in this file uses false — extra event fields will be
# silently dropped instead of rejected; confirm that is the intent.
execution_state_size_delta_clickhouse:
type: clickhouse
inputs:
- execution_state_size_delta_formatted
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: execution_state_size_delta
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: true
# ClickHouse sink: same shape as above, for the execution_mpt_depth table.
execution_mpt_depth_clickhouse:
type: clickhouse
inputs:
- execution_mpt_depth_formatted
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: execution_mpt_depth
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: true
2 changes: 2 additions & 0 deletions deploy/local/docker-compose/xatu-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ services:
filter:
eventNames:
- EXECUTION_BLOCK_METRICS
- EXECUTION_STATE_SIZE_DELTA
- EXECUTION_MPT_DEPTH
config:
address: http://xatu-vector-http-kafka:9005
maxQueueSize: 50000
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Migration 106 (down): remove the execution_state_size_delta tables.
-- Drop the Distributed facade first, then the underlying replicated table.
DROP TABLE IF EXISTS execution_state_size_delta ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS execution_state_size_delta_local ON CLUSTER '{cluster}';
60 changes: 60 additions & 0 deletions deploy/migrations/clickhouse/106_execution_state_size_delta.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- Migration 106 (up): per-block execution-layer state size deltas.
-- Replicated ReplacingMergeTree keyed on updated_date_time (latest row wins
-- per ORDER BY key), fronted by a Distributed table sharded on the same key.
-- NOTE(review): column codecs below mix `CODEC(...)` and `Codec(...)` casing;
-- harmless to the parser but worth normalizing for consistency.
CREATE TABLE execution_state_size_delta_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`block_number` UInt64 COMMENT 'The block number' CODEC(DoubleDelta, ZSTD(1)),
`state_root` FixedString(66) COMMENT 'State root hash of the execution layer at this block' Codec(ZSTD(1)),
`parent_state_root` FixedString(66) COMMENT 'State root hash of the execution layer at the parent block' Codec(ZSTD(1)),
`account_delta` Int64 COMMENT 'The delta in the number of accounts in the state' Codec(DoubleDelta, ZSTD(1)),
`account_bytes_delta` Int64 COMMENT 'The delta in the number of bytes used by account data' Codec(DoubleDelta, ZSTD(1)),
`account_trienode_delta` Int64 COMMENT 'The delta in the number of trie nodes in the account trie' Codec(DoubleDelta, ZSTD(1)),
`account_trienode_bytes_delta` Int64 COMMENT 'The delta in the number of bytes used by account trie nodes' Codec(DoubleDelta, ZSTD(1)),
`contract_code_delta` Int64 COMMENT 'The delta in the number of contract codes stored' Codec(DoubleDelta, ZSTD(1)),
`contract_code_bytes_delta` Int64 COMMENT 'The delta in the number of bytes used by contract code' Codec(DoubleDelta, ZSTD(1)),
`storage_delta` Int64 COMMENT 'The delta in the number of storage slots in the state' Codec(DoubleDelta, ZSTD(1)),
`storage_bytes_delta` Int64 COMMENT 'The delta in the number of bytes used by storage data' Codec(DoubleDelta, ZSTD(1)),
`storage_trienode_delta` Int64 COMMENT 'The delta in the number of trie nodes in the storage trie' Codec(DoubleDelta, ZSTD(1)),
`storage_trienode_bytes_delta` Int64 COMMENT 'The delta in the number of bytes used by storage trie nodes' Codec(DoubleDelta, ZSTD(1)),
-- Standard metadata fields
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' Codec(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' Codec(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' Codec(ZSTD(1)),
`meta_network_id` Int32 COMMENT 'Ethereum network ID' Codec(DoubleDelta, ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_execution_version` LowCardinality(String) COMMENT 'Execution client version that generated the event',
`meta_execution_version_major` LowCardinality(String) COMMENT 'Execution client major version that generated the event',
`meta_execution_version_minor` LowCardinality(String) COMMENT 'Execution client minor version that generated the event',
`meta_execution_version_patch` LowCardinality(String) COMMENT 'Execution client patch version that generated the event',
`meta_execution_implementation` LowCardinality(String) COMMENT 'Execution client implementation that generated the event',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' Codec(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
-- One partition per 5M-block range.
) PARTITION BY intDiv(block_number, 5000000)
ORDER BY (
block_number,
meta_network_name,
meta_client_name,
state_root
) COMMENT 'Contains execution layer state size delta metrics including account, contract code, and storage data measurements at specific block heights.';
-- Distributed facade over the local table; shards by a hash of the same
-- columns used in ORDER BY so replacing-merge dedup keys stay co-located.
CREATE TABLE execution_state_size_delta ON CLUSTER '{cluster}' AS default.execution_state_size_delta_local ENGINE = Distributed(
'{cluster}',
default,
execution_state_size_delta_local,
cityHash64(
block_number,
meta_network_name,
meta_client_name,
state_root
)
);
2 changes: 2 additions & 0 deletions deploy/migrations/clickhouse/107_execution_mpt_depth.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Migration 107 (down): remove the execution_mpt_depth tables.
-- Drop the Distributed facade first, then the underlying replicated table.
DROP TABLE IF EXISTS execution_mpt_depth ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS execution_mpt_depth_local ON CLUSTER '{cluster}';
Loading