Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 219 additions & 103 deletions integration-tests/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing = { version = "0.1.41", default-features = false }
tracing-subscriber = { version = "0.3.19", default-features = false }
hex = "0.4.3"
clap = { version = "^4.5.4", features = ["derive"] }
serde_json = "1"

# Direct dependencies kept only for the embedded `mining_device` module.
# Remove this block when removing:
Expand Down
67 changes: 64 additions & 3 deletions integration-tests/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ pub async fn start_pool(
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
enable_monitoring: bool,
) -> (PoolSv2, SocketAddr, Option<SocketAddr>) {
start_pool_with_network_override(
template_provider_config,
supported_extensions,
required_extensions,
enable_monitoring,
None,
)
.await
}

pub async fn start_pool_with_network_override(
template_provider_config: TemplateProviderType,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
enable_monitoring: bool,
network: Option<String>,
) -> (PoolSv2, SocketAddr, Option<SocketAddr>) {
use pool_sv2::config::PoolConfig;
let listening_address = get_available_address();
Expand Down Expand Up @@ -163,7 +180,8 @@ pub async fn start_pool(
monitoring_address,
monitoring_cache_refresh_secs,
None, // no JDS
);
)
.with_network(network);
let pool = PoolSv2::new(config);
let pool_clone = pool.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -198,6 +216,26 @@ pub fn start_jdc(
required_extensions: Vec<u16>,
enable_monitoring: bool,
jdc_mode: Option<ConfigJDCMode>,
) -> (JobDeclaratorClient, SocketAddr, Option<SocketAddr>) {
start_jdc_with_network_override(
pool,
template_provider_config,
supported_extensions,
required_extensions,
enable_monitoring,
jdc_mode,
None,
)
}

pub fn start_jdc_with_network_override(
pool: &[(SocketAddr, SocketAddr)], // (pool_address, jds_address)
template_provider_config: TemplateProviderType,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
enable_monitoring: bool,
jdc_mode: Option<ConfigJDCMode>,
network: Option<String>,
) -> (JobDeclaratorClient, SocketAddr, Option<SocketAddr>) {
use jd_client_sv2::config::{JobDeclaratorClientConfig, PoolConfig, ProtocolConfig, Upstream};
let jdc_address = get_available_address();
Expand Down Expand Up @@ -261,7 +299,8 @@ pub fn start_jdc(
required_extensions,
monitoring_address,
monitoring_cache_refresh_secs,
);
)
.with_network(network);
let ret = jd_client_sv2::JobDeclaratorClient::new(jd_client_proxy);
let ret_clone = ret.clone();
tokio::spawn(async move { ret_clone.start().await });
Expand Down Expand Up @@ -338,6 +377,27 @@ pub async fn start_sv2_translator(
required_extensions: Vec<u16>,
job_keepalive_interval_secs: Option<u16>,
enable_monitoring: bool,
) -> (TranslatorSv2, SocketAddr, Option<SocketAddr>) {
start_sv2_translator_with_upstream_monitoring(
upstreams,
aggregate_channels,
supported_extensions,
required_extensions,
job_keepalive_interval_secs,
enable_monitoring,
None,
)
.await
}

pub async fn start_sv2_translator_with_upstream_monitoring(
upstreams: &[SocketAddr],
aggregate_channels: bool,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
job_keepalive_interval_secs: Option<u16>,
enable_monitoring: bool,
upstream_monitoring_url: Option<String>,
) -> (TranslatorSv2, SocketAddr, Option<SocketAddr>) {
let job_keepalive_interval_secs = job_keepalive_interval_secs.unwrap_or(60);
let upstreams = upstreams
Expand Down Expand Up @@ -395,7 +455,8 @@ pub async fn start_sv2_translator(
required_extensions,
monitoring_address,
monitoring_cache_refresh_secs,
);
)
.with_upstream_monitoring_url(upstream_monitoring_url);
let translator_v2 = translator_sv2::TranslatorSv2::new(config);
let clone_translator_v2 = translator_v2.clone();
tokio::spawn(async move {
Expand Down
136 changes: 136 additions & 0 deletions integration-tests/tests/monitoring_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,139 @@ async fn block_found_detected_in_pool_metrics() {

shutdown_all!(pool, jdc, tproxy);
}

// ---------------------------------------------------------------------------
// 5. Pool exposes network via config override; translator inherits it on connect.
// ---------------------------------------------------------------------------
#[tokio::test]
async fn global_info_network_from_config_override() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low);
let (_pool, pool_addr, pool_monitoring) = start_pool_with_network_override(
sv2_tp_config(tp_addr),
vec![],
vec![],
true,
Some("regtest".to_string()),
)
.await;
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");

// Pool global endpoint must report network = "regtest"
let body = fetch_api(pool_mon, "/api/v1/global").await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(
json["network"], "regtest",
"pool global info should expose network"
);

// Start translator with upstream_monitoring_url pointing at pool's monitoring server.
// MonitoringServer fetches network from pool once at startup (no polling loop).
let upstream_monitoring_url = format!("http://{}", pool_mon);
let (_tproxy, _tproxy_addr, tproxy_monitoring) = start_sv2_translator_with_upstream_monitoring(
&[pool_addr],
false,
vec![],
vec![],
None,
true,
Some(upstream_monitoring_url),
)
.await;
let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled");

// Wait up to 30 seconds for network to propagate (generous for slow CI machines).
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
loop {
let body = fetch_api(tproxy_mon, "/api/v1/global").await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
if json["network"] == "regtest" {
break;
}
if std::time::Instant::now() >= deadline {
panic!(
"tproxy global info did not propagate network within timeout; got: {}",
json
);
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}

// ---------------------------------------------------------------------------
// 6. JDC exposes network via config override in GlobalInfo.
// ---------------------------------------------------------------------------
#[tokio::test]
async fn global_info_network_jdc_from_config_override() {
start_tracing();
let (tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low);
let (_pool, pool_addr, jds_addr, _pool_monitoring) =
start_pool_with_jds(tp.bitcoin_core(), vec![], vec![], false).await;

let (jdc, _jdc_addr, jdc_monitoring) = start_jdc_with_network_override(
&[(pool_addr, jds_addr)],
sv2_tp_config(tp_addr),
vec![],
vec![],
true,
None,
Some("regtest".to_string()),
);
let jdc_mon = jdc_monitoring.expect("jdc monitoring should be enabled");

// Wait up to 30 seconds for JDC to connect and serve the monitoring endpoint.
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
loop {
let body = fetch_api(jdc_mon, "/api/v1/global").await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
if json["network"] == "regtest" {
break;
}
if std::time::Instant::now() >= deadline {
panic!(
"jdc global info did not expose network within timeout; got: {}",
json
);
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}

shutdown_all!(jdc);
}

// ---------------------------------------------------------------------------
// 7. Translator starts cleanly when upstream_monitoring_url is unreachable.
// ---------------------------------------------------------------------------
#[tokio::test]
async fn global_info_network_unreachable_upstream() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low);
let (_pool, pool_addr, _pool_monitoring) =
start_pool(sv2_tp_config(tp_addr), vec![], vec![], false).await;

// Point translator at a port where nothing is listening.
let dead_url = Some("http://127.0.0.1:19999".to_string());
let (_tproxy, _tproxy_addr, tproxy_monitoring) = start_sv2_translator_with_upstream_monitoring(
&[pool_addr],
false,
vec![],
vec![],
None,
true,
dead_url,
)
.await;
let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled");

// Give the fetch attempt time to fail and the server to stabilise.
tokio::time::sleep(std::time::Duration::from_secs(3)).await;

// Translator must still serve the monitoring endpoint; network must be null.
let body = fetch_api(tproxy_mon, "/api/v1/global").await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(
json["network"].is_null(),
"network should be null when upstream is unreachable; got: {}",
json
);
}
Loading