Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.commons.alerting.model

import org.opensearch.Version
import org.opensearch.common.lucene.uid.Versions
import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
Expand Down Expand Up @@ -43,7 +44,8 @@ data class Alert(
val aggregationResultBucket: AggregationResultBucket? = null,
val executionId: String? = null,
val associatedAlertIds: List<String>,
val clusters: List<String>? = null
val clusters: List<String>? = null,
val target: Target? = null
) : Writeable, ToXContent {

init {
Expand Down Expand Up @@ -125,7 +127,8 @@ data class Alert(
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters
clusters = clusters,
target = monitor.target
)

constructor(
Expand Down Expand Up @@ -164,7 +167,8 @@ data class Alert(
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters
clusters = clusters,
target = monitor.target
)

constructor(
Expand Down Expand Up @@ -204,7 +208,8 @@ data class Alert(
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters
clusters = clusters,
target = monitor.target
)

constructor(
Expand Down Expand Up @@ -246,7 +251,8 @@ data class Alert(
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters
clusters = clusters,
target = monitor.target
)

constructor(
Expand Down Expand Up @@ -285,7 +291,8 @@ data class Alert(
workflowId = workflowId ?: "",
executionId = executionId,
associatedAlertIds = emptyList(),
clusters = clusters
clusters = clusters,
target = monitor.target
)

enum class State {
Expand Down Expand Up @@ -329,7 +336,12 @@ data class Alert(
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
executionId = sin.readOptionalString(),
associatedAlertIds = sin.readStringList(),
clusters = sin.readOptionalStringList()
clusters = sin.readOptionalStringList(),
target = if (sin.version.onOrAfter(Version.V_3_6_0)) {
if (sin.readBoolean()) Target(sin) else null
} else {
null
}
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand Down Expand Up @@ -368,6 +380,10 @@ data class Alert(
out.writeOptionalString(executionId)
out.writeStringCollection(associatedAlertIds)
out.writeOptionalStringArray(clusters?.toTypedArray())
if (out.version.onOrAfter(Version.V_3_6_0)) {
out.writeBoolean(target != null)
target?.writeTo(out)
}
}

companion object {
Expand Down Expand Up @@ -399,6 +415,7 @@ data class Alert(
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val CLUSTERS_FIELD = "clusters"
const val TARGET_FIELD = "target"
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

Expand Down Expand Up @@ -430,6 +447,7 @@ data class Alert(
var aggAlertBucket: AggregationResultBucket? = null
val associatedAlertIds = mutableListOf<String>()
val clusters = mutableListOf<String>()
var target: Target? = null
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -505,6 +523,11 @@ data class Alert(
clusters.add(xcp.text())
}
}
TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
null
} else {
Target.parse(xcp)
}
}
}

Expand Down Expand Up @@ -534,7 +557,8 @@ data class Alert(
workflowId = workflowId,
workflowName = workflowName,
associatedAlertIds = associatedAlertIds,
clusters = if (clusters.size > 0) clusters else null
clusters = if (clusters.size > 0) clusters else null,
target = target
)
}

Expand Down Expand Up @@ -586,6 +610,7 @@ data class Alert(
aggregationResultBucket?.innerXContent(builder)

if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray())
if (target != null) builder.field(TARGET_FIELD, target)

builder.endObject()
return builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ data class Monitor(
val dataSources: DataSources = DataSources(),
val deleteQueryIndexInEveryRun: Boolean? = false,
val shouldCreateSingleAlertForFindings: Boolean? = false,
val owner: String? = "alerting"
val owner: String? = "alerting",
val target: Target? = null
) : ScheduledJob {

override val type = MONITOR_TYPE
Expand Down Expand Up @@ -121,7 +122,12 @@ data class Monitor(
} else {
false
},
owner = sin.readOptionalString()
owner = sin.readOptionalString(),
target = if (sin.version.onOrAfter(Version.V_3_6_0)) {
if (sin.readBoolean()) Target(sin) else null
} else {
null
}
)

// This enum classifies different Monitors
Expand Down Expand Up @@ -183,6 +189,7 @@ data class Monitor(
builder.field(DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD, deleteQueryIndexInEveryRun)
builder.field(SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD, shouldCreateSingleAlertForFindings)
builder.field(OWNER_FIELD, owner)
if (target != null) builder.field(TARGET_FIELD, target)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -240,6 +247,10 @@ data class Monitor(
out.writeOptionalBoolean(shouldCreateSingleAlertForFindings)
}
out.writeOptionalString(owner)
if (out.version.onOrAfter(Version.V_3_6_0)) {
out.writeBoolean(target != null)
target?.writeTo(out)
}
}

companion object {
Expand All @@ -262,6 +273,7 @@ data class Monitor(
const val DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD = "delete_query_index_in_every_run"
const val SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD = "should_create_single_alert_for_findings"
const val OWNER_FIELD = "owner"
const val TARGET_FIELD = "target"
val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}")

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
Expand Down Expand Up @@ -292,6 +304,7 @@ data class Monitor(
var deleteQueryIndexInEveryRun = false
var delegateMonitor = false
var owner = "alerting"
var target: Target? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -355,6 +368,11 @@ data class Monitor(
xcp.booleanValue()
}
OWNER_FIELD -> owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text()
TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
target
} else {
Target.parse(xcp)
}
else -> {
xcp.skipChildren()
}
Expand Down Expand Up @@ -383,7 +401,8 @@ data class Monitor(
dataSources,
deleteQueryIndexInEveryRun,
delegateMonitor,
owner
owner,
target
)
}

Expand Down
77 changes: 77 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Target.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.opensearch.commons.alerting.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException

/**
* Describes where a monitor query executes.
*
* @property type the target type — LOCAL by default. Extensible for additional target types.
* @property endpoint the URL of the remote target. Required when type is not LOCAL.
*/
data class Target(
val type: String = LOCAL,
val endpoint: String = ""
) : Writeable, ToXContent {

init {
require(type.isNotBlank()) { "target type cannot be empty" }
if (type != LOCAL) {
require(endpoint.isNotBlank()) { "endpoint is required when target type is not LOCAL" }
}
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
type = sin.readString(),
endpoint = sin.readString()
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(TYPE_FIELD, type)
.field(ENDPOINT_FIELD, endpoint)
return builder.endObject()
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(type)
out.writeString(endpoint)
}

companion object {
const val TYPE_FIELD = "type"
const val ENDPOINT_FIELD = "endpoint"
const val LOCAL = "local"
val DEFAULT = Target()

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): Target {
var type = LOCAL
var endpoint = ""

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
TYPE_FIELD -> type = xcp.text()
ENDPOINT_FIELD -> endpoint = xcp.text()
}
}
return Target(type, endpoint)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): Target = Target(sin)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.opensearch.commons.alerting.model

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.ToXContent

class TargetTests {

@Test
fun `test default Target`() {
val target = Target()
assertEquals(Target.LOCAL, target.type)
assertEquals("", target.endpoint)
}

@Test
fun `test Target with custom type and endpoint`() {
val target = Target("custom_type", "https://example.com")
assertEquals("custom_type", target.type)
assertEquals("https://example.com", target.endpoint)
}

@Test
fun `test Target with empty type requires endpoint`() {
assertThrows(IllegalArgumentException::class.java) {
Target("", "")
}
}

@Test
fun `test Target requires endpoint for non-LOCAL type`() {
assertThrows(IllegalArgumentException::class.java) {
Target("custom_type", "")
}
}

@Test
fun `test Target stream serialization roundtrip`() {
val target = Target("custom_type", "https://example.com")
val out = BytesStreamOutput()
target.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val deserialized = Target(sin)
assertEquals(target, deserialized)
}

@Test
fun `test Target XContent roundtrip`() {
val target = Target("custom_type", "https://example.com")
val builder = XContentFactory.jsonBuilder()
target.toXContent(builder, ToXContent.EMPTY_PARAMS)
val json = builder.toString()

val parser = XContentType.JSON.xContent().createParser(null, null, json)
parser.nextToken()
val parsed = Target.parse(parser)
assertEquals(target, parsed)
}
}
Loading
Loading