diff --git a/pom.xml b/pom.xml
index 4cc982e7bdd64..dda7f316acc6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,7 +199,7 @@ flexible messaging model and an intuitive client API.
0.4.4-hotfix1
3.3.5
2.4.10
- 1.2.4
+ 2.16.0
8.12.1
1.9.7.Final
42.5.5
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 9f42dbda7be1b..33c2d34a1c992 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -59,10 +59,12 @@ public class ElasticSearchConfig implements Serializable {
)
private String indexName;
+ @Deprecated
@FieldDoc(
required = false,
defaultValue = "_doc",
- help = "The type name that the connector writes messages to, with the default value set to _doc."
+ help = "No longer in use in OpenSearch 2+. "
+ + "The type name that the connector writes messages to, with the default value set to _doc."
+ " This value should be set explicitly to a valid type name other than _doc for Elasticsearch version before 6.2,"
+ " and left to the default value otherwise."
)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index bb92047f17a31..87c4913529f04 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -49,12 +49,12 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
-import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
-import org.opensearch.common.unit.ByteSizeUnit;
-import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.common.Strings;
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -229,7 +229,6 @@ public boolean indexDocument(String index, String documentId, String documentSou
if (!Strings.isNullOrEmpty(documentId)) {
indexRequest.id(documentId);
}
- indexRequest.type(config.getTypeName());
indexRequest.source(documentSource, XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
@@ -245,7 +244,6 @@ public boolean indexDocument(String index, String documentId, String documentSou
public boolean deleteDocument(String index, String documentId) throws IOException {
DeleteRequest deleteRequest = Requests.deleteRequest(index);
deleteRequest.id(documentId);
- deleteRequest.type(config.getTypeName());
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
if (log.isDebugEnabled()) {
log.debug("delete result {}", deleteResponse.getResult());
@@ -301,7 +299,6 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
if (!Strings.isNullOrEmpty(request.getDocumentId())) {
indexRequest.id(request.getDocumentId());
}
- indexRequest.type(config.getTypeName());
indexRequest.source(request.getDocumentSource(), XContentType.JSON);
internalBulkProcessor.add(indexRequest);
}
@@ -310,7 +307,6 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) throws IOException {
DeleteRequest deleteRequest = new DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord());
deleteRequest.id(request.getDocumentId());
- deleteRequest.type(config.getTypeName());
internalBulkProcessor.add(deleteRequest);
}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
index 85e30e766f030..506df31923378 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
@@ -44,7 +44,6 @@ public final void loadFromYamlFileTest() throws IOException {
assertNotNull(config);
assertEquals(config.getElasticSearchUrl(), "http://localhost:90902");
assertEquals(config.getIndexName(), "myIndex");
- assertEquals(config.getTypeName(), "doc");
assertEquals(config.getUsername(), "scooby");
assertEquals(config.getPassword(), "doobie");
assertEquals(config.getPrimaryFields(), "id,a");
@@ -64,7 +63,6 @@ public final void loadFromMapTest() throws IOException {
assertNotNull(config);
assertEquals(config.getElasticSearchUrl(), "http://localhost:90902");
assertEquals(config.getIndexName(), "myIndex");
- assertEquals(config.getTypeName(), "doc");
assertEquals(config.getUsername(), "racerX");
assertEquals(config.getPassword(), "go-speedie-go");
assertEquals(config.getPrimaryFields(), "x");
@@ -75,7 +73,6 @@ public final void defaultValueTest() throws IOException {
Map requiredConfig = Map.of("elasticSearchUrl", "http://localhost:90902");
ElasticSearchConfig config = ElasticSearchConfig.load(requiredConfig, mockContext);
assertNull(config.getIndexName());
- assertEquals(config.getTypeName(), "_doc");
assertNull(config.getUsername());
assertNull(config.getPassword());
assertNull(config.getToken());
@@ -336,7 +333,6 @@ public final void loadConfigFromSecretsTest() throws IOException {
assertNotNull(config);
assertEquals(config.getElasticSearchUrl(), "http://localhost:90902");
assertEquals(config.getIndexName(), "myIndex");
- assertEquals(config.getTypeName(), "doc");
assertEquals(config.getPrimaryFields(), "x");
assertEquals(config.getUsername(), "secretUser");
assertEquals(config.getPassword(), "$ecret123");
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
index 0f5a42051c7d1..8c5868f27689b 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
@@ -46,7 +46,7 @@ public abstract class ElasticSearchTestBase {
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
- .orElse("opensearchproject/opensearch:1.2.4");
+ .orElse("opensearchproject/opensearch:2.16.0");
protected final String elasticImageName;
@@ -59,6 +59,7 @@ protected ElasticsearchContainer createElasticsearchContainer() {
if (elasticImageName.equals(OPENSEARCH)) {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
elasticsearchContainer = new ElasticsearchContainer(dockerImageName)
+ .withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!")
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
index 72bebfe2bbf8d..0b78506491657 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
@@ -78,7 +78,7 @@ public void testSslBasic() throws IOException {
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
.setIndexName(INDEX)
.setUsername("admin")
- .setPassword("admin")
+ .setPassword("0pEn7earch!")
.setSsl(new ElasticSearchSslConfig()
.setEnabled(true)
.setTruststorePath(sslResourceDir + "/truststore.jks")
@@ -102,7 +102,7 @@ public void testSslWithHostnameVerification() throws IOException {
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
.setIndexName(INDEX)
.setUsername("admin")
- .setPassword("admin")
+ .setPassword("0pEn7earch!")
.setSsl(new ElasticSearchSslConfig()
.setEnabled(true)
.setProtocols("TLSv1.2")
@@ -127,7 +127,7 @@ public void testSslWithClientAuth() throws IOException {
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
.setIndexName(INDEX)
.setUsername("admin")
- .setPassword("admin")
+ .setPassword("0pEn7earch!")
.setSsl(new ElasticSearchSslConfig()
.setEnabled(true)
.setHostnameVerification(true)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 75f0fdac6f90c..8daed8d5c04d5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -35,7 +35,7 @@
public class OpenSearchSinkTester extends ElasticSearchSinkTester {
public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
- .orElse("opensearchproject/opensearch:1.2.4");
+ .orElse("opensearchproject/opensearch:2.16.0");
private RestHighLevelClient elasticClient;
@@ -49,6 +49,7 @@ protected ElasticsearchContainer createElasticContainer() {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
+ .withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!")
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");