diff --git a/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java b/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java index da35c54..8445970 100644 --- a/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java +++ b/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java @@ -144,9 +144,12 @@ protected Connection createNewConnection(long queryTimestamp) throws SQLExceptio properties.setProperty("user", trinoUser); if (config.isSinkMonitorFreshnessEmbedSnapshot()) { - String catalogName = "pixels"; - String sessionPropValue = String.format("%s.query_snapshot_timestamp:%d", catalogName, queryTimestamp); - properties.setProperty("sessionProperties", sessionPropValue); + String catalog = parseCatalogFromUrl(trinoJdbcUrl); + if (catalog != null && catalog.equalsIgnoreCase("pixels")) + { + String sessionPropValue = String.format("%s.query_snapshot_timestamp:%d", catalog, queryTimestamp); + properties.setProperty("sessionProperties", sessionPropValue); + } } return DriverManager.getConnection(trinoJdbcUrl, properties); } @@ -410,4 +413,34 @@ private List getStaticList() { return config.getSinkMonitorFreshnessEmbedTableList(); } + + private String parseCatalogFromUrl(String url) + { + if (url == null || !url.startsWith("jdbc:trino://")) + { + return null; + } + String withoutProtocol = url.substring("jdbc:trino://".length()); + int slashIndex = withoutProtocol.indexOf('/'); + if (slashIndex == -1) + { + return null; + } + String remaining = withoutProtocol.substring(slashIndex + 1); + + int nextSlash = remaining.indexOf('/'); + int questionMark = remaining.indexOf('?'); + + int endIndex = remaining.length(); + if (nextSlash != -1) + { + endIndex = nextSlash; + } + if (questionMark != -1 && questionMark < endIndex) + { + endIndex = questionMark; + } + + return remaining.substring(0, endIndex); + } } \ No newline at end of file diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java index 0893476..038888a 100644 --- a/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java +++ b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java @@ -30,6 +30,7 @@ import io.pixelsdb.pixels.sink.util.MetricsFacade; import io.pixelsdb.pixels.sink.util.rateLimiter.FlushRateLimiter; import io.pixelsdb.pixels.sink.util.rateLimiter.FlushRateLimiterFactory; +import io.pixelsdb.pixels.sink.freshness.FreshnessClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,7 @@ public void pollEvents(SinkProto.PollRequest request, StreamObserver records = new ArrayList<>(pollBatchSize); try