From fdbbfee163fa5c14a9214d0c757d5f924ab3e275 Mon Sep 17 00:00:00 2001 From: Manaswini Ragamouni Date: Wed, 25 Mar 2026 22:05:00 +0000 Subject: [PATCH] Reapply "Add Target object for external data source support on Monitor and Alert (#916)" (#917) This reverts commit a4d2442208cd57bb937c744e14cc989efa4e50b1. Signed-off-by: Manaswini Ragamouni --- .../commons/alerting/model/Alert.kt | 41 ++++++++-- .../commons/alerting/model/Monitor.kt | 25 +++++- .../commons/alerting/model/Target.kt | 77 +++++++++++++++++++ .../commons/alerting/model/TargetTests.kt | 64 +++++++++++++++ .../commons/alerting/model/XContentTests.kt | 64 +++++++++++++++ 5 files changed, 260 insertions(+), 11 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/Target.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/model/TargetTests.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index e435c866..6e10b37f 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.common.lucene.uid.Versions import org.opensearch.commons.alerting.alerts.AlertError import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION @@ -43,7 +44,8 @@ data class Alert( val aggregationResultBucket: AggregationResultBucket? = null, val executionId: String? = null, val associatedAlertIds: List, - val clusters: List? = null + val clusters: List? = null, + val target: Target? = null ) : Writeable, ToXContent { init { @@ -125,7 +127,8 @@ data class Alert( workflowId = workflowId ?: "", workflowName = "", associatedAlertIds = emptyList(), - clusters = clusters + clusters = clusters, + target = monitor.target ) constructor( @@ -164,7 +167,8 @@ data class Alert( workflowId = workflowId ?: "", workflowName = "", associatedAlertIds = emptyList(), - clusters = clusters + clusters = clusters, + target = monitor.target ) constructor( @@ -204,7 +208,8 @@ data class Alert( workflowId = workflowId ?: "", workflowName = "", associatedAlertIds = emptyList(), - clusters = clusters + clusters = clusters, + target = monitor.target ) constructor( @@ -246,7 +251,8 @@ data class Alert( workflowId = workflowId ?: "", workflowName = "", associatedAlertIds = emptyList(), - clusters = clusters + clusters = clusters, + target = monitor.target ) constructor( @@ -285,7 +291,8 @@ data class Alert( workflowId = workflowId ?: "", executionId = executionId, associatedAlertIds = emptyList(), - clusters = clusters + clusters = clusters, + target = monitor.target ) enum class State { @@ -329,7 +336,12 @@ data class Alert( aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null, executionId = sin.readOptionalString(), associatedAlertIds = sin.readStringList(), - clusters = sin.readOptionalStringList() + clusters = sin.readOptionalStringList(), + target = if (sin.version.onOrAfter(Version.V_3_6_0)) { + if (sin.readBoolean()) Target(sin) else null + } else { + null + } ) fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED) @@ -368,6 +380,10 @@ data class Alert( out.writeOptionalString(executionId) out.writeStringCollection(associatedAlertIds) out.writeOptionalStringArray(clusters?.toTypedArray()) + if (out.version.onOrAfter(Version.V_3_6_0)) { + out.writeBoolean(target != null) + target?.writeTo(out) + } } companion object { @@ -399,6 +415,7 @@ data class Alert( const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH const val CLUSTERS_FIELD = "clusters" + const val TARGET_FIELD = "target" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -430,6 +447,7 @@ data class Alert( var aggAlertBucket: AggregationResultBucket? = null val associatedAlertIds = mutableListOf() val clusters = mutableListOf() + var target: Target? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -505,6 +523,11 @@ data class Alert( clusters.add(xcp.text()) } } + TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + Target.parse(xcp) + } } } @@ -534,7 +557,8 @@ data class Alert( workflowId = workflowId, workflowName = workflowName, associatedAlertIds = associatedAlertIds, - clusters = if (clusters.size > 0) clusters else null + clusters = if (clusters.size > 0) clusters else null, + target = target ) } @@ -586,6 +610,7 @@ data class Alert( aggregationResultBucket?.innerXContent(builder) if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray()) + if (target != null) builder.field(TARGET_FIELD, target) builder.endObject() return builder diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index 268a5a37..63cc3f0b 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -44,7 +44,8 @@ data class Monitor( val dataSources: DataSources = DataSources(), val deleteQueryIndexInEveryRun: Boolean? = false, val shouldCreateSingleAlertForFindings: Boolean? = false, - val owner: String? = "alerting" + val owner: String? = "alerting", + val target: Target? = null ) : ScheduledJob { override val type = MONITOR_TYPE @@ -121,7 +122,12 @@ data class Monitor( } else { false }, - owner = sin.readOptionalString() + owner = sin.readOptionalString(), + target = if (sin.version.onOrAfter(Version.V_3_6_0)) { + if (sin.readBoolean()) Target(sin) else null + } else { + null + } ) // This enum classifies different Monitors @@ -183,6 +189,7 @@ data class Monitor( builder.field(DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD, deleteQueryIndexInEveryRun) builder.field(SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD, shouldCreateSingleAlertForFindings) builder.field(OWNER_FIELD, owner) + if (target != null) builder.field(TARGET_FIELD, target) if (params.paramAsBoolean("with_type", false)) builder.endObject() return builder.endObject() } @@ -240,6 +247,10 @@ data class Monitor( out.writeOptionalBoolean(shouldCreateSingleAlertForFindings) } out.writeOptionalString(owner) + if (out.version.onOrAfter(Version.V_3_6_0)) { + out.writeBoolean(target != null) + target?.writeTo(out) + } } companion object { @@ -262,6 +273,7 @@ data class Monitor( const val DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD = "delete_query_index_in_every_run" const val SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD = "should_create_single_alert_for_findings" const val OWNER_FIELD = "owner" + const val TARGET_FIELD = "target" val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}") // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all @@ -292,6 +304,7 @@ data class Monitor( var deleteQueryIndexInEveryRun = false var delegateMonitor = false var owner = "alerting" + var target: Target? = null XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -355,6 +368,11 @@ data class Monitor( xcp.booleanValue() } OWNER_FIELD -> owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() + TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + target + } else { + Target.parse(xcp) + } else -> { xcp.skipChildren() } @@ -383,7 +401,8 @@ data class Monitor( dataSources, deleteQueryIndexInEveryRun, delegateMonitor, - owner + owner, + target ) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Target.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Target.kt new file mode 100644 index 00000000..e0625c48 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Target.kt @@ -0,0 +1,77 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException + +/** + * Describes where a monitor query executes. + * + * @property type the target type — LOCAL by default. Extensible for additional target types. + * @property endpoint the URL of the remote target. Required when type is not LOCAL. + */ +data class Target( + val type: String = LOCAL, + val endpoint: String = "" +) : Writeable, ToXContent { + + init { + require(type.isNotBlank()) { "target type cannot be empty" } + if (type != LOCAL) { + require(endpoint.isNotBlank()) { "endpoint is required when target type is not LOCAL" } + } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + type = sin.readString(), + endpoint = sin.readString() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(TYPE_FIELD, type) + .field(ENDPOINT_FIELD, endpoint) + return builder.endObject() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(type) + out.writeString(endpoint) + } + + companion object { + const val TYPE_FIELD = "type" + const val ENDPOINT_FIELD = "endpoint" + const val LOCAL = "local" + val DEFAULT = Target() + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Target { + var type = LOCAL + var endpoint = "" + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + TYPE_FIELD -> type = xcp.text() + ENDPOINT_FIELD -> endpoint = xcp.text() + } + } + return Target(type, endpoint) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Target = Target(sin) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/TargetTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/TargetTests.kt new file mode 100644 index 00000000..d4660225 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/TargetTests.kt @@ -0,0 +1,64 @@ +package org.opensearch.commons.alerting.model + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.xcontent.ToXContent + +class TargetTests { + + @Test + fun `test default Target`() { + val target = Target() + assertEquals(Target.LOCAL, target.type) + assertEquals("", target.endpoint) + } + + @Test + fun `test Target with custom type and endpoint`() { + val target = Target("custom_type", "https://example.com") + assertEquals("custom_type", target.type) + assertEquals("https://example.com", target.endpoint) + } + + @Test + fun `test Target with empty type requires endpoint`() { + assertThrows(IllegalArgumentException::class.java) { + Target("", "") + } + } + + @Test + fun `test Target requires endpoint for non-LOCAL type`() { + assertThrows(IllegalArgumentException::class.java) { + Target("custom_type", "") + } + } + + @Test + fun `test Target stream serialization roundtrip`() { + val target = Target("custom_type", "https://example.com") + val out = BytesStreamOutput() + target.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val deserialized = Target(sin) + assertEquals(target, deserialized) + } + + @Test + fun `test Target XContent roundtrip`() { + val target = Target("custom_type", "https://example.com") + val builder = XContentFactory.jsonBuilder() + target.toXContent(builder, ToXContent.EMPTY_PARAMS) + val json = builder.toString() + + val parser = XContentType.JSON.xContent().createParser(null, null, json) + parser.nextToken() + val parsed = Target.parse(parser) + assertEquals(target, parsed) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 252be78f..3138552e 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -611,4 +611,68 @@ class XContentTests { val parsedComment = Comment.parse(parser(commentString), "123") Assertions.assertEquals(comment, parsedComment, "Round tripping Comment doesn't work") } + + @Test + fun `test monitor with target parsing roundtrip`() { + val monitor = randomQueryLevelMonitor().copy( + target = Target("custom_type", "https://example.com") + ) + val monitorString = monitor.toJsonStringWithUser() + val parsedMonitor = Monitor.parse(parser(monitorString)) + assertEquals("Round tripping Monitor with Target doesn't work", monitor, parsedMonitor) + } + + @Test + fun `test monitor with default target parsing roundtrip`() { + val monitor = randomQueryLevelMonitor() + val monitorString = monitor.toJsonStringWithUser() + val parsedMonitor = Monitor.parse(parser(monitorString)) + assertEquals("Round tripping Monitor with default Target doesn't work", monitor, parsedMonitor) + assertEquals("Default target should be null", null, parsedMonitor.target) + } + + @Test + fun `test alert with target from monitor`() { + val monitor = randomQueryLevelMonitor().copy( + target = Target("custom_type", "https://example.com") + ) + val alert = randomAlert(monitor) + assertEquals("Alert should copy target from monitor", monitor.target, alert.target) + } + + @Test + fun `test alert with target XContent roundtrip`() { + val monitor = randomQueryLevelMonitor().copy( + target = Target("custom_type", "https://example.com") + ) + val alert = randomAlert(monitor) + val alertString = alert.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedAlert = Alert.parse(parser(alertString)) + assertEquals("Round tripping Alert with Target doesn't work", alert.target, parsedAlert.target) + } + + @Test + fun `test alert with default target from monitor`() { + val monitor = randomQueryLevelMonitor() + val alert = randomAlert(monitor) + assertEquals("Alert target should be null for default monitor", null, alert.target) + } + + @Test + fun `test bucket-level monitor with target parsing roundtrip`() { + val monitor = randomBucketLevelMonitor().copy( + target = Target("custom_type", "https://example.com") + ) + val monitorString = monitor.toJsonStringWithUser() + val parsedMonitor = Monitor.parse(parser(monitorString)) + assertEquals("Round tripping bucket-level Monitor with Target doesn't work", monitor, parsedMonitor) + } + + @Test + fun `test old monitor format without target parses with default`() { + val monitor = randomQueryLevelMonitor() + val monitorString = monitor.toJsonStringWithUser() + val parsedMonitor = Monitor.parse(parser(monitorString)) + assertEquals("Default target should be null", null, parsedMonitor.target) + } }