diff --git a/connectors/rocketmq-connect-http/README.md b/connectors/rocketmq-connect-http/README.md
index 8b6d5c70..076c4e4f 100644
--- a/connectors/rocketmq-connect-http/README.md
+++ b/connectors/rocketmq-connect-http/README.md
@@ -15,13 +15,18 @@ mvn clean install -Dmaven.test.skip=true
```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"HttpSinkConnector",
+"urlPattern":"${urlPattern}","method":"${method}","queryStringParameters":"${queryStringParameters}","headerParameters":"${headerParameters}","bodys":"${bodys}","authType":"${authType}","basicUser":"${basicUser}","basicPassword":"${basicPassword}",
+"oauth2Endpoint":"${oauth2Endpoint}","oauth2ClientId":"${oauth2ClientId}","oauth2ClientSecret":"${oauth2ClientSecret}","oauth2HttpMethod":"${oauth2HttpMethod}","proxyType":"${proxyType}","proxyHost":"${proxyHost}","proxyPort":"${proxyPort}","proxyUser":"${proxyUser}",
+"proxyPort":"${proxyPort}","proxyPort":"${proxyPort}","proxyUser":"${proxyUser}","proxyPassword":"${proxyPassword}","apiKeyName":"${apiKeyName}","apiKeyValue":"${apiKeyValue}","timeout":"${timeout}"}
```
-例子
+例子
```
http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"}
+"connector-class":"HttpSinkConnector","urlPattern":"http://127.0.0.1","method":"POST","queryStringParameters":"","headerParameters":"","bodys":"{"id" : "234"}","authType":"BASIC_AUTH","basicUser":"","basicPassword":"",
+"oauth2Endpoint":"","oauth2ClientId":"","oauth2ClientSecret":"","oauth2HttpMethod":"","proxyType":"","proxyHost":"","proxyPort":"","proxyUser":"",
+"proxyPort":"","proxyPort":"","proxyUser":"","proxyPassword":"","apiKeyName":"","apiKeyValue":"","timeout":"6000"}
```
>**注:** `rocketmq-http-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -35,8 +40,25 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/
## rocketmq-connect-http 参数说明
* **http-sink-connector 参数说明**
-| KEY | TYPE | Must be filled | Description | Example
-|-----|---------|----------------|-------------|------------------|
-| url | String | YES | sink端 域名地址 | http://127.0.0.1 |
-|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |
-
+| KEY | TYPE | Must be filled | Description | Example
+|-----------------------|---------|----------------|----------------|------------------|
+| urlPattern | String | YES | sink端 域名地址 | http://127.0.0.1 |
+| method | String | YES | 请求类型 | POST、GET |
+| queryStringParameters | String | NO | 请求参数 | xxxx |
+| headerParameters | String | NO | 请求头 | xxxx |
+| bodys | String | NO | 请求体 | xxxx |
+| authType | String | NO | 权限类型 | BASIC_AUTH、OAUTH_AUTH、API_KEY_AUTH |
+| basicUser | String | NO | 用户名 | xxxx |
+| basicPassword | String | NO | 密码 | xxxx |
+| oauth2Endpoint | String | NO | OAuth获取token地址 | http://127.0.0.1 |
+| oauth2ClientId | String | NO | clientId | xxxx |
+| oauth2ClientSecret | String | NO | client secret | xxxx |
+| oauth2HttpMethod | String | NO | oauth的请求类型 | xxxx |
+| proxyType | String | NO | 代理类型 | xxxx |
+| proxyHost | String | NO | 代理地址 | xxxx |
+| proxyPort | String | NO | 代理端口 | xxxx |
+| proxyUser | String | NO | 代理的访问的用户名 | xxxx |
+| proxyPassword | String | NO | 代理访问的密码 | xxxx |
+| apiKeyName | String | NO | auth api key | xxxx |
+| apiKeyValue | String | NO | auth api value | xxxx |
+| timeout | String | NO | 超时时间 | xxxx |
diff --git a/connectors/rocketmq-connect-http/pom.xml b/connectors/rocketmq-connect-http/pom.xml
index 7bbadfe0..f0b655e8 100644
--- a/connectors/rocketmq-connect-http/pom.xml
+++ b/connectors/rocketmq-connect-http/pom.xml
@@ -14,15 +14,18 @@
8
1.7.7
1.2.9
- 2.9.0
- 4.13.1
- 2.6.0
- 2.6.3
- 0.1.2-SNAPSHOT
+ 2.10
+ 4.13.2
+ 3.23.1
+ 4.8.0
+ 0.1.5-SNAPSHOT
3.9.1
- 1.2.83
+ 2.0.19
3.12.0
UTF-8
+ 31.1-jre
+ 4.4.1
+ 4.1.85.Final
@@ -183,9 +186,15 @@
test
- com.squareup.okhttp3
- okhttp
- ${okhttp.version}
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+
+
+ com.google.code.gson
+ gson
+ ${gson.version}
+ compile
com.alibaba
@@ -193,9 +202,25 @@
${fastjson.version}
- org.apache.commons
- commons-lang3
- ${commons-lang3.version}
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+
+
+ org.apache.httpcomponents
+ httpcore
+ ${httpclient.version}
+
+
+ io.netty
+ netty-all
+ ${netty-all.version}
+ compile
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
index 54c7efe5..c143faca 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
@@ -1,36 +1,63 @@
package org.apache.rocketmq.connect.http.sink;
-import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
+import org.apache.rocketmq.connect.http.sink.util.CheckUtils;
-import java.net.URL;
-import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;
public class HttpSinkConnector extends SinkConnector {
- private String url;
-
- @Override
- public void pause() {
-
- }
+ protected String urlPattern;
+ protected String method;
+ protected String queryStringParameters;
+ protected String headerParameters;
+ protected String bodys;
+ protected String authType;
+ protected String basicUser;
+ protected String basicPassword;
+ protected String oauth2Endpoint;
+ protected String oauth2ClientId;
+ protected String oauth2ClientSecret;
+ protected String oauth2HttpMethod;
+ protected String proxyType;
+ protected String proxyHost;
+ protected String proxyPort;
+ protected String proxyUser;
+ protected String proxyPassword;
+ protected String timeout;
+ protected String apiKeyName;
+ protected String apiKeyValue;
- @Override
- public void resume() {
-
- }
@Override
public List taskConfigs(int maxTasks) {
List keyValueList = new ArrayList<>(11);
KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(HttpConstant.URL_CONSTANT, url);
+ keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, urlPattern);
+ keyValue.put(HttpConstant.METHOD_CONSTANT, method);
+ keyValue.put(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT, queryStringParameters);
+ keyValue.put(HttpConstant.HEADER_PARAMETERS_CONSTANT, headerParameters);
+ keyValue.put(HttpConstant.BODYS_CONSTANT, bodys);
+ keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, authType);
+ keyValue.put(HttpConstant.BASIC_USER_CONSTANT, basicUser);
+ keyValue.put(HttpConstant.BASIC_PASSWORD_CONSTANT, basicPassword);
+ keyValue.put(HttpConstant.OAUTH2_ENDPOINT_CONSTANT, oauth2Endpoint);
+ keyValue.put(HttpConstant.OAUTH2_CLIENTID_CONSTANT, oauth2ClientId);
+ keyValue.put(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT, oauth2ClientSecret);
+ keyValue.put(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT, oauth2HttpMethod);
+ keyValue.put(HttpConstant.PROXY_TYPE_CONSTANT, proxyType);
+ keyValue.put(HttpConstant.PROXY_HOST_CONSTANT, proxyHost);
+ keyValue.put(HttpConstant.PROXY_PORT_CONSTANT, proxyPort);
+ keyValue.put(HttpConstant.PROXY_USER_CONSTANT, proxyUser);
+ keyValue.put(HttpConstant.PROXY_PASSWORD_CONSTANT, proxyPassword);
+ keyValue.put(HttpConstant.TIMEOUT_CONSTANT, timeout);
+ keyValue.put(HttpConstant.API_KEY_NAME, apiKeyName);
+ keyValue.put(HttpConstant.API_KEY_VALUE, apiKeyValue);
keyValueList.add(keyValue);
return keyValueList;
}
@@ -42,24 +69,35 @@ public Class extends Task> taskClass() {
@Override
public void validate(KeyValue config) {
- if (StringUtils.isBlank(config.getString(HttpConstant.URL_CONSTANT))) {
- throw new RuntimeException("http required parameter is null !");
- }
- try {
- URL urlConnect = new URL(config.getString(HttpConstant.URL_CONSTANT));
- URLConnection urlConnection = urlConnect.openConnection();
- urlConnection.setConnectTimeout(5000);
- urlConnection.connect();
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage());
- }
}
@Override
- public void init(KeyValue config) {
- url = config.getString(HttpConstant.URL_CONSTANT);
+ public void start(KeyValue config) {
+ urlPattern = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.URL_PATTERN_CONSTANT));
+ method = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.METHOD_CONSTANT));
+ queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT));
+ headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT));
+ bodys = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BODYS_CONSTANT));
+ authType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.AUTH_TYPE_CONSTANT));
+ basicUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_USER_CONSTANT));
+ basicPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_PASSWORD_CONSTANT));
+ oauth2Endpoint = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_ENDPOINT_CONSTANT));
+ oauth2ClientId = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTID_CONSTANT));
+ oauth2ClientSecret = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT));
+ oauth2HttpMethod = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT));
+ proxyType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_TYPE_CONSTANT));
+ proxyHost = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_HOST_CONSTANT));
+ proxyPort = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PORT_CONSTANT));
+ proxyUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_USER_CONSTANT));
+ proxyPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PASSWORD_CONSTANT));
+ timeout = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.TIMEOUT_CONSTANT));
+ apiKeyName = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_NAME));
+ apiKeyValue = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_VALUE));
+ queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT));
+ headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT));
}
+
@Override
public void stop() {
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
index 603bafaf..fc5a6b0d 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
@@ -1,61 +1,226 @@
package org.apache.rocketmq.connect.http.sink;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Maps;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
-import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.auth.AbstractHttpClient;
+import org.apache.rocketmq.connect.http.sink.auth.ApacheHttpClientImpl;
+import org.apache.rocketmq.connect.http.sink.auth.ApiKeyImpl;
+import org.apache.rocketmq.connect.http.sink.auth.BasicAuthImpl;
+import org.apache.rocketmq.connect.http.sink.auth.HttpCallback;
+import org.apache.rocketmq.connect.http.sink.auth.OAuthClientImpl;
+import org.apache.rocketmq.connect.http.sink.constant.AuthTypeEnum;
import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
+import org.apache.rocketmq.connect.http.sink.entity.ClientConfig;
+import org.apache.rocketmq.connect.http.sink.entity.HttpRequest;
+import org.apache.rocketmq.connect.http.sink.util.CheckUtils;
+import org.apache.rocketmq.connect.http.sink.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class HttpSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(HttpSinkTask.class);
+ private static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 30;
- private String url;
+ protected ScheduledExecutorService scheduledExecutorService;
+ protected String urlPattern;
+ protected String method;
+ protected String queryStringParameters;
+ protected String headerParameters;
+ protected String bodys;
+ protected String authType;
+ protected String basicUser;
+ protected String basicPassword;
+ protected String oauth2Endpoint;
+ protected String oauth2ClientId;
+ protected String oauth2ClientSecret;
+ protected String oauth2HttpMethod;
+ protected String proxyType;
+ protected String proxyHost;
+ protected String proxyPort;
+ protected String proxyUser;
+ protected String proxyPassword;
+ protected String apiKeyName;
+ protected String apiKeyValue;
+ protected String timeout;
+
+ private AbstractHttpClient httpClient;
+
+ private OAuthClientImpl oAuthClient;
+
+ private BasicAuthImpl basicAuth;
+
+ private ApiKeyImpl apiKey;
@Override
public void put(List sinkRecords) throws ConnectException {
try {
- sinkRecords.forEach(connectRecord -> OkHttpUtils.builder()
- .url(url)
- .addParam(HttpConstant.DATA_CONSTANT, connectRecord.getData().toString())
- .post(true)
- .sync());
+ CountDownLatch countDownLatch = new CountDownLatch(sinkRecords.size());
+ HttpCallback httpCallback = new HttpCallback(countDownLatch);
+ for (ConnectRecord connectRecord : sinkRecords) {
+ ClientConfig clientConfig = getClientConfig(connectRecord);
+ Map headerMap = Maps.newHashMap();
+ addHeaderMap(headerMap, clientConfig);
+ if (StringUtils.isNotBlank(clientConfig.getAuthType())) {
+ headerMap.putAll(auth(clientConfig));
+ }
+ HttpRequest httpRequest = new HttpRequest();
+ httpRequest.setBody(clientConfig.getBodys());
+ httpRequest.setHeaderMap(headerMap);
+ httpRequest.setMethod(clientConfig.getMethod());
+ httpRequest.setTimeout(clientConfig.getTimeout());
+ httpRequest.setUrl(JsonUtils.queryStringAndPathValue(clientConfig.getUrlPattern(), clientConfig.getQueryStringParameters(), connectRecord.getExtension(HttpConstant.HTTP_PATH_VALUE)));
+ httpClient.execute(httpRequest, httpCallback);
+ }
+ boolean consumeSucceed = Boolean.FALSE;
+ try {
+ consumeSucceed = countDownLatch.await(DEFAULT_CONSUMER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ log.error("count down latch failed.", e);
+ }
+ if (!consumeSucceed) {
+ throw new RetriableException("Request Timeout");
+ }
+ if (httpCallback.isFailed()) {
+ throw new RetriableException(httpCallback.getMsg());
+ }
} catch (Exception e) {
log.error("HttpSinkTask | put | error => ", e);
+ throw new RuntimeException(e);
}
}
- @Override
- public void pause() {
-
+ private ClientConfig getClientConfig(ConnectRecord connectRecord) {
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.setHttpClient(httpClient);
+ clientConfig.setUrlPattern(urlPattern);
+ clientConfig.setMethod(CheckUtils.checkNull(method) ? connectRecord.getExtension(HttpConstant.HTTP_METHOD) : method);
+ clientConfig.setAuthType(authType);
+ clientConfig.setHttpPathValue(connectRecord.getExtension(HttpConstant.HTTP_PATH_VALUE));
+ clientConfig.setQueryStringParameters(JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_QUERY_VALUE)), JSONObject.parseObject(queryStringParameters)) == null ? null : JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_QUERY_VALUE)), JSONObject.parseObject(queryStringParameters)).toJSONString());
+ clientConfig.setHeaderParameters(JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_HEADER)), JSONObject.parseObject(headerParameters)) == null ? null : JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_HEADER)), JSONObject.parseObject(headerParameters)).toJSONString());
+ clientConfig.setBodys(bodys);
+ clientConfig.setProxyUser(proxyUser);
+ clientConfig.setProxyPassword(proxyPassword);
+ clientConfig.setProxyType(proxyType);
+ clientConfig.setProxyPort(proxyPort);
+ clientConfig.setProxyHost(proxyHost);
+ clientConfig.setOauth2ClientId(oauth2ClientId);
+ clientConfig.setOauth2ClientSecret(oauth2ClientSecret);
+ clientConfig.setTimeout(timeout);
+ clientConfig.setOauth2HttpMethod(oauth2HttpMethod);
+ clientConfig.setOauth2Endpoint(oauth2Endpoint);
+ clientConfig.setBasicUser(basicUser);
+ clientConfig.setBasicPassword(basicPassword);
+ clientConfig.setApiKeyName(apiKeyName);
+ clientConfig.setApiKeyValue(apiKeyValue);
+ return clientConfig;
}
- @Override
- public void resume() {
-
+ private void addHeaderMap(Map headerMap, ClientConfig clientConfig) {
+ String header = clientConfig.getHeaderParameters();
+ if (StringUtils.isBlank(header)) {
+ return;
+ }
+ JSONObject jsonObject = JSONObject.parseObject(header);
+ for (Map.Entry entry : jsonObject.entrySet()) {
+ if (entry.getValue() instanceof JSONObject) {
+ headerMap.put(entry.getKey(), ((JSONObject) entry.getValue()).toJSONString());
+ } else {
+ headerMap.put(entry.getKey(), (String) entry.getValue());
+ }
+ }
}
@Override
public void validate(KeyValue config) {
+ if (CheckUtils.checkNull(config.getString(HttpConstant.URL_PATTERN_CONSTANT))
+ || CheckUtils.checkNull(config.getString(HttpConstant.METHOD_CONSTANT))) {
+ throw new RuntimeException("http required parameter is null !");
+ }
+ final List collect = Arrays.stream(AuthTypeEnum.values()).filter(authTypeEnum -> authTypeEnum.getAuthType().equals(config.getString(HttpConstant.AUTH_TYPE_CONSTANT))).collect(Collectors.toList());
+ if (collect.isEmpty()) {
+ throw new RuntimeException("authType required parameter check is fail !");
+ }
}
@Override
- public void init(KeyValue config) {
- url = config.getString(HttpConstant.URL_CONSTANT);
+ public void start(KeyValue config) {
+ urlPattern = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.URL_PATTERN_CONSTANT));
+ method = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.METHOD_CONSTANT));
+ bodys = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BODYS_CONSTANT));
+ authType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.AUTH_TYPE_CONSTANT));
+ basicUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_USER_CONSTANT));
+ basicPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_PASSWORD_CONSTANT));
+ oauth2Endpoint = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_ENDPOINT_CONSTANT));
+ oauth2ClientId = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTID_CONSTANT));
+ oauth2ClientSecret = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT));
+ oauth2HttpMethod = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT));
+ proxyType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_TYPE_CONSTANT));
+ proxyHost = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_HOST_CONSTANT));
+ proxyPort = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PORT_CONSTANT));
+ proxyUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_USER_CONSTANT));
+ proxyPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PASSWORD_CONSTANT));
+ timeout = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.TIMEOUT_CONSTANT));
+ apiKeyName = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_NAME));
+ apiKeyValue = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_VALUE));
+ queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT));
+ headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT));
+ try {
+ httpClient = new ApacheHttpClientImpl();
+ ClientConfig proxyConfig = new ClientConfig();
+ proxyConfig.setProxyHost(proxyHost);
+ proxyConfig.setProxyPort(proxyPort);
+ proxyConfig.setProxyType(proxyType);
+ proxyConfig.setProxyUser(proxyUser);
+ proxyConfig.setProxyPassword(proxyPassword);
+ httpClient.init(proxyConfig);
+ oAuthClient = new OAuthClientImpl();
+ oAuthClient.setOauthMap(new ConcurrentHashMap<>(16));
+ scheduledExecutorService.scheduleAtFixedRate(oAuthClient, 1, 1, TimeUnit.SECONDS);
+ basicAuth = new BasicAuthImpl();
+ apiKey = new ApiKeyImpl();
+ } catch (Exception e) {
+ log.error("HttpSinkTask | start | error => ", e);
+ throw new RuntimeException(e);
+ }
}
@Override
- public void start(SinkTaskContext sinkTaskContext) {
- super.start(sinkTaskContext);
+ public void stop() {
+ httpClient.close();
}
- @Override
- public void stop() {
+ private Map auth(ClientConfig config) {
+ switch (config.getAuthType()) {
+ case "BASIC_AUTH":
+ return basicAuth.auth(config);
+ case "OAUTH_AUTH":
+ return oAuthClient.auth(config);
+ case "API_KEY_AUTH":
+ return apiKey.auth(config);
+ default:
+ break;
+ }
+ return new HashMap<>(16);
+ }
+ public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+ this.scheduledExecutorService = scheduledExecutorService;
}
}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/AbstractHttpClient.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/AbstractHttpClient.java
new file mode 100644
index 00000000..7c996f54
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/AbstractHttpClient.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+
+import org.apache.rocketmq.connect.http.sink.entity.ClientConfig;
+import org.apache.rocketmq.connect.http.sink.entity.HttpRequest;
+
+import java.io.IOException;
+
+public interface AbstractHttpClient {
+
+ /**
+ *
+ * @param config
+ */
+ void init(ClientConfig config);
+
+ /**
+ *
+ * @return
+ * @throws IOException
+ */
+ String execute(HttpRequest httpRequest, HttpCallback httpCallback) throws Exception;
+
+ void close();
+
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApacheHttpClientImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApacheHttpClientImpl.java
new file mode 100644
index 00000000..c4b1c4b7
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApacheHttpClientImpl.java
@@ -0,0 +1,263 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+import com.google.common.net.MediaType;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpTrace;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.DnsResolver;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HTTP;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.rocketmq.connect.http.sink.entity.ClientConfig;
+import org.apache.rocketmq.connect.http.sink.entity.HttpRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.cert.X509Certificate;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.rocketmq.connect.http.sink.constant.HttpConstant.LOG_SIFT_TAG;
+
+
+public class ApacheHttpClientImpl implements AbstractHttpClient {
+ private static final Logger log = LoggerFactory.getLogger(ApacheHttpClientImpl.class);
+
+ private static ExecutorService executorServicePool = new ThreadPoolExecutor(200, 2000, 600, TimeUnit.SECONDS,
+ new LinkedBlockingDeque(1000), new DefaultThreadFactory("ApacheHttpClientRequestThread"));
+ private CloseableHttpClient httpClient = null;
+
+ private SocksProxyConfig socksProxyConfig;
+ private static final String SOCKS_ADDRESS_KEY = "socks.address";
+
+ @Override
+ public void init(ClientConfig config) {
+ try {
+ SSLContextBuilder sslContextBuilder = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
+ @Override
+ public boolean isTrusted(X509Certificate[] chain, String authType) {
+ return true;
+ }
+ });
+ Registry reg = RegistryBuilder.create().register("http",
+ new SocksPlainConnectionSocketFactory())
+ .register("https", new SocksSSLConnectionSocketFactory(sslContextBuilder.build()))
+ .build();
+ PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(reg,
+ new FakeDnsResolver());
+ connManager.setMaxTotal(400);
+ connManager.setDefaultMaxPerRoute(500);
+ httpClient = HttpClients.custom()
+ .setConnectionManager(connManager)
+ .build();
+
+ this.socksProxyConfig = new SocksProxyConfig(config.getProxyHost(), config.getProxyUser(), config.getProxyPassword());
+ } catch (Exception e) {
+ log.error("ApacheHttpClientImpl | init | error => ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String execute(HttpRequest httpRequest, HttpCallback httpCallback) throws Exception {
+ CloseableHttpResponse response;
+ HttpRequestBase httpRequestBase = null;
+ if (httpRequest != null) {
+ httpRequestBase = extracted(httpRequest.getUrl(), httpRequest.getMethod(), httpRequest.getHeaderMap(), httpRequest.getBody());
+ if (StringUtils.isNotBlank(httpRequest.getTimeout())) {
+ final RequestConfig requestConfig = RequestConfig.custom().
+ setConnectionRequestTimeout(Integer.parseInt(httpRequest.getTimeout())).
+ setSocketTimeout(Integer.parseInt(httpRequest.getTimeout())).
+ setConnectTimeout(Integer.parseInt(httpRequest.getTimeout())).build();
+ httpRequestBase.setConfig(requestConfig);
+ }
+ }
+ HttpRequestCallable httpRequestCallable = new HttpRequestCallable(httpClient, httpRequestBase,
+ HttpClientContext.create(), this.socksProxyConfig, httpCallback, MDC.get(LOG_SIFT_TAG));
+ Future submit = executorServicePool.submit(httpRequestCallable);
+ String result = submit.get();
+ log.info("ApacheHttpClientImpl | execute| success | result : {}", result);
+ return result;
+ }
+
+ private HttpRequestBase extracted(String url, String method, Map headerMap, String body) throws UnsupportedEncodingException {
+ switch (method) {
+ case "GET":
+ HttpGet httpGet = new HttpGet(url);
+ headerMap.forEach(httpGet::addHeader);
+ httpGet.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ return httpGet;
+ case "POST":
+ HttpPost httpPost = new HttpPost(url);
+ headerMap.forEach(httpPost::addHeader);
+ httpPost.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ if (StringUtils.isNotBlank(body)) {
+ HttpEntity entityPot = new StringEntity(body);
+ httpPost.setEntity(entityPot);
+ }
+ return httpPost;
+ case "DELETE":
+ HttpDelete httpDelete = new HttpDelete(url);
+ headerMap.forEach(httpDelete::addHeader);
+ httpDelete.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ return httpDelete;
+ case "PUT":
+ HttpPut httpPut = new HttpPut(url);
+ headerMap.forEach(httpPut::addHeader);
+ httpPut.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ if (StringUtils.isNotBlank(body)) {
+ HttpEntity entityPot = new StringEntity(body);
+ httpPut.setEntity(entityPot);
+ }
+ return httpPut;
+ case "HEAD":
+ HttpHead httpHead = new HttpHead(url);
+ headerMap.forEach(httpHead::addHeader);
+ httpHead.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ return httpHead;
+ case "TRACE":
+ HttpTrace httpTrace = new HttpTrace(url);
+ headerMap.forEach(httpTrace::addHeader);
+ httpTrace.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ break;
+ case "PATCH":
+ HttpPatch httpPatch = new HttpPatch(url);
+ headerMap.forEach(httpPatch::addHeader);
+ httpPatch.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ if (StringUtils.isNotBlank(body)) {
+ HttpEntity entityPot = new StringEntity(body);
+ httpPatch.setEntity(entityPot);
+ }
+ return httpPatch;
+ default:
+ }
+ HttpOptions httpOptions = new HttpOptions(url);
+ headerMap.forEach(httpOptions::addHeader);
+ httpOptions.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString());
+ return httpOptions;
+ }
+
+ @Override
+ public void close() {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ log.error("ApacheHttpClientImpl | close | error => ", e);
+ }
+ }
+
+ /**
+ * 实现 http 链接的socket 工厂
+ */
+ static class SocksPlainConnectionSocketFactory extends PlainConnectionSocketFactory {
+
+ @Override
+ public Socket createSocket(final HttpContext context) throws IOException {
+ InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY);
+ if (socksaddr != null) {
+ Proxy proxy = new Proxy(Proxy.Type.SOCKS, socksaddr);
+ return new Socket(proxy);
+ } else {
+ return new Socket();
+ }
+ }
+
+ @Override
+ public Socket connectSocket(int connectTimeout, Socket socket, HttpHost host, InetSocketAddress remoteAddress,
+ InetSocketAddress localAddress, HttpContext context) throws IOException {
+ InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY);
+ if (socksaddr != null) {//make proxy server to resolve host in http url
+ remoteAddress = InetSocketAddress.createUnresolved(host.getHostName(), host.getPort());
+ }
+ return super.connectSocket(connectTimeout, socket, host, remoteAddress, localAddress, context);
+ }
+ }
+
+ static class FakeDnsResolver implements DnsResolver {
+ @Override
+ public InetAddress[] resolve(String host) throws UnknownHostException {
+ // Return some fake DNS record for every request, we won't be using it
+ try {
+ return new InetAddress[] {InetAddress.getByName(host)};
+ } catch (Throwable e) {
+ return new InetAddress[] {InetAddress.getByAddress(new byte[] {0, 0, 0, 0})};
+ }
+ }
+ }
+
+ /**
+ * 实现 https 链接的socket 工厂
+ */
+ static class SocksSSLConnectionSocketFactory extends SSLConnectionSocketFactory {
+ public SocksSSLConnectionSocketFactory(SSLContext sslContext) {
+ super(sslContext, NoopHostnameVerifier.INSTANCE);
+ }
+
+ @Override
+ public Socket createSocket(final HttpContext context) throws IOException {
+ InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY);
+ if (socksaddr != null) {
+ Proxy proxy = new Proxy(Proxy.Type.SOCKS, socksaddr);
+ return new Socket(proxy);
+ } else {
+ return new Socket();
+ }
+ }
+
+ @Override
+ public Socket connectSocket(int connectTimeout, Socket socket, HttpHost host, InetSocketAddress remoteAddress,
+ InetSocketAddress localAddress, HttpContext context) throws IOException {
+ InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY);
+ if (socksaddr != null) {
+ remoteAddress = InetSocketAddress.createUnresolved(host.getHostName(), host.getPort());
+ }
+ return super.connectSocket(connectTimeout, socket, host, remoteAddress, localAddress, context);
+ }
+ }
+
+ static class NoopHostnameVerifier implements javax.net.ssl.HostnameVerifier {
+ public static final NoopHostnameVerifier INSTANCE = new NoopHostnameVerifier();
+
+ @Override
+ public boolean verify(String s, SSLSession sslSession) {
+ return true;
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApiKeyImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApiKeyImpl.java
new file mode 100644
index 00000000..9ead7bdf
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApiKeyImpl.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.entity.ClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ApiKeyImpl implements Auth {
+ private static final Logger log = LoggerFactory.getLogger(ApiKeyImpl.class);
+
+ @Override
+ public Map auth(ClientConfig config) {
+ Map headMap = Maps.newHashMap();
+ try {
+ if (StringUtils.isNotBlank(config.getApiKeyName()) && StringUtils.isNotBlank(config.getApiKeyValue())) {
+ headMap.put(config.getApiKeyName(), config.getApiKeyValue());
+ }
+ } catch (Exception e) {
+ log.error("ApiKeyImpl | auth | error => ", e);
+ }
+ return headMap;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/Auth.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/Auth.java
new file mode 100644
index 00000000..54b4970d
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/Auth.java
@@ -0,0 +1,16 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+
+import org.apache.rocketmq.connect.http.sink.entity.ClientConfig;
+
+import java.util.Map;
+
+public interface Auth {
+
+ /**
+ * Authentication abstract method
+ * @param config
+ * @return
+ */
+ Map auth(ClientConfig config);
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/BasicAuthImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/BasicAuthImpl.java
new file mode 100644
index 00000000..4cdbfea4
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/BasicAuthImpl.java
@@ -0,0 +1,31 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+import com.google.common.collect.Maps;
+import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
+import org.apache.rocketmq.connect.http.sink.entity.ClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class BasicAuthImpl implements Auth {
+ private static final Logger log = LoggerFactory.getLogger(BasicAuthImpl.class);
+
+
+ @Override
+ public Map auth(ClientConfig config) {
+ Map headMap = Maps.newHashMap();
+ try {
+ if (StringUtils.isNotBlank(config.getBasicUser()) && StringUtils.isNotBlank(config.getBasicPassword())) {
+ String authorizationValue = config.getBasicUser() + ":" + config.getBasicPassword();
+ headMap.put(HttpConstant.AUTHORIZATION, "Basic " + Base64.encode(authorizationValue.getBytes(StandardCharsets.UTF_8)));
+ }
+ } catch (Exception e) {
+ log.error("BasicAuthImpl | auth | error => ", e);
+ }
+ return headMap;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpCallback.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpCallback.java
new file mode 100644
index 00000000..800d63f5
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpCallback.java
@@ -0,0 +1,57 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+import org.apache.http.concurrent.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+
+public class HttpCallback implements FutureCallback {
+
+ private static final Logger log = LoggerFactory.getLogger(HttpCallback.class);
+
+ private CountDownLatch countDownLatch;
+
+ private boolean isFailed;
+
+ private String msg;
+
+ public HttpCallback(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public void completed(String s) {
+ countDownLatch.countDown();
+ }
+
+ public void failed(final Exception ex) {
+ countDownLatch.countDown();
+ isFailed = true;
+ log.error("http request failed.", ex);
+ }
+
+ public void cancelled() {
+ countDownLatch.countDown();
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public boolean isFailed() {
+ return isFailed;
+ }
+
+ public void setFailed(boolean failed) {
+ isFailed = failed;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java
new file mode 100644
index 00000000..87ec8f40
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java
@@ -0,0 +1,92 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+import com.google.common.base.Strings;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.concurrent.Callable;
+
+import static org.apache.rocketmq.connect.http.sink.constant.HttpConstant.LOG_SIFT_TAG;
+
+
+public class HttpRequestCallable implements Callable {
+
+ private static final Logger log = LoggerFactory.getLogger(HttpCallback.class);
+
+ private CloseableHttpClient httpclient;
+
+ private HttpCallback httpCallback;
+
+ private HttpUriRequest httpUriRequest;
+
+ private HttpClientContext context;
+ private SocksProxyConfig socksProxyConfig;
+ private String siftTag;
+
+ private static final String SOCKS_ADDRESS_KEY = "socks.address";
+
+ public HttpRequestCallable(CloseableHttpClient httpclient, HttpUriRequest httpUriRequest, HttpClientContext context,
+ SocksProxyConfig socksProxyConfig, HttpCallback httpCallback, String siftTag) {
+ this.httpclient = httpclient;
+ this.httpUriRequest = httpUriRequest;
+ this.context = context;
+ this.socksProxyConfig = socksProxyConfig;
+ this.httpCallback = httpCallback;
+ this.siftTag = siftTag;
+ }
+
+ public void loadSocks5ProxyConfig() {
+ if (!Strings.isNullOrEmpty(socksProxyConfig.getSocks5Endpoint())) {
+ String[] socksAddrAndPor = socksProxyConfig.getSocks5Endpoint()
+ .split(":");
+ InetSocketAddress socksaddr = new InetSocketAddress(socksAddrAndPor[0],
+ Integer.parseInt(socksAddrAndPor[1]));
+ context.setAttribute(SOCKS_ADDRESS_KEY, socksaddr);
+ ThreadLocalProxyAuthenticator.getInstance()
+ .setCredentials(socksProxyConfig.getSocks5UserName(), socksProxyConfig.getSocks5Password());
+ }
+ }
+
+ @Override
+ public String call() throws Exception {
+ MDC.put(LOG_SIFT_TAG, siftTag);
+ CloseableHttpResponse response = null;
+ try {
+ Long startTime = System.currentTimeMillis();
+ loadSocks5ProxyConfig();
+ response = httpclient.execute(httpUriRequest, context);
+ String result = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+ if (response.getStatusLine()
+ .getStatusCode() / 100 != 2) {
+ String msg = MessageFormat.format("Http Status:{0},Msg:{1}", response.getStatusLine()
+ .getStatusCode(), result);
+ httpCallback.setMsg(msg);
+ httpCallback.setFailed(Boolean.TRUE);
+ }
+ log.info("The cost of one http request:{}, Connection Connection={},Keep-Alive={}",
+ System.currentTimeMillis() - startTime, response.getHeaders("Connection"),
+ response.getHeaders("Keep-Alive"));
+ return result;
+ } catch (Throwable e) {
+ log.error("http execute failed.", e);
+ httpCallback.setFailed(Boolean.TRUE);
+ httpCallback.setMsg(e.getLocalizedMessage());
+ } finally {
+ httpCallback.getCountDownLatch()
+ .countDown();
+ if (null != response.getEntity()) {
+ EntityUtils.consume(response.getEntity());
+ }
+ }
+ return null;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/OAuthClientImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/OAuthClientImpl.java
new file mode 100644
index 00000000..fe036310
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/OAuthClientImpl.java
@@ -0,0 +1,148 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
+import org.apache.rocketmq.connect.http.sink.entity.ClientConfig;
+import org.apache.rocketmq.connect.http.sink.entity.HttpRequest;
+import org.apache.rocketmq.connect.http.sink.entity.OAuthEntity;
+import org.apache.rocketmq.connect.http.sink.entity.TokenEntity;
+import org.apache.rocketmq.connect.http.sink.util.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class OAuthClientImpl implements Auth, Runnable {
+ private static final Logger log = LoggerFactory.getLogger(OAuthClientImpl.class);
+ public Map oauthMap;
+
+ public void setOauthMap(Map oauthMap) {
+ this.oauthMap = oauthMap;
+ }
+
+ @Override
+ public Map auth(ClientConfig config) {
+ Map headMap = Maps.newHashMap();
+ try {
+ String resultToken = "";
+ if (StringUtils.isNotBlank(config.getOauth2Endpoint())
+ && StringUtils.isNotBlank(config.getOauth2HttpMethod())) {
+ OAuthEntity oAuthEntity = new OAuthEntity();
+ oAuthEntity.setOauth2ClientId(config.getOauth2ClientId());
+ oAuthEntity.setOauth2ClientSecret(config.getOauth2ClientSecret());
+ oAuthEntity.setOauth2Endpoint(config.getOauth2Endpoint());
+ oAuthEntity.setOauth2HttpMethod(config.getOauth2HttpMethod());
+ oAuthEntity.setHeaderParamsters(config.getHeaderParameters());
+ oAuthEntity.setQueryStringParameters(config.getQueryStringParameters());
+ oAuthEntity.setTimeout(config.getTimeout());
+ oAuthEntity.setHttpClient(config.getHttpClient());
+ queryParameterClient(config);
+ final TokenEntity tokenEntity = oauthMap.get(oAuthEntity);
+ if (tokenEntity != null) {
+ headMap.put(HttpConstant.AUTHORIZATION, "Bearer " + tokenEntity.getAccessToken());
+ return headMap;
+ }
+ HttpRequest httpRequest = new HttpRequest();
+ resultToken = getResultToken(oAuthEntity, headMap, oAuthEntity.getHttpClient(), httpRequest);
+ if (StringUtils.isNotBlank(resultToken)) {
+ final TokenEntity token = JSONObject.parseObject(resultToken, TokenEntity.class);
+ if (StringUtils.isNotBlank(token.getAccessToken())) {
+ headMap.put(HttpConstant.AUTHORIZATION, "Bearer " + token.getAccessToken());
+ token.setTokenTimestamp(Long.toString(System.currentTimeMillis()));
+ oauthMap.putIfAbsent(oAuthEntity, token);
+ } else {
+ throw new RuntimeException(token.getError());
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("OAuthClientImpl | auth | error => ", e);
+ throw new RuntimeException(e);
+ }
+ return headMap;
+ }
+
+ private void queryParameterClient(ClientConfig config) {
+ Map map = Maps.newHashMap();
+ map.put("client_id", config.getOauth2ClientId());
+ map.put("client_secret", config.getOauth2ClientSecret());
+ JSONObject queryString = JSONObject.parseObject(config.getQueryStringParameters());
+ JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(map));
+ config.setQueryStringParameters(JsonUtils.mergeJson(jsonObject, queryString).toJSONString());
+ }
+
+ public String getResultToken(OAuthEntity config, Map headMap, AbstractHttpClient httpClient, HttpRequest httpRequest) throws Exception {
+ if (HttpConstant.POST_METHOD.equals(config.getOauth2HttpMethod())
+ || HttpConstant.PUT_METHOD.equals(config.getOauth2HttpMethod())
+ || HttpConstant.PATCH_METHOD.equals(config.getOauth2HttpMethod())) {
+ String headerParamsters = config.getHeaderParamsters();
+ JSONObject jsonObject = JSONObject.parseObject(headerParamsters);
+ for (Map.Entry entry : jsonObject.entrySet()) {
+ headMap.put(entry.getKey(), (String) entry.getValue());
+ }
+ httpRequest.setBody(StringUtils.EMPTY);
+ httpRequest.setUrl(JsonUtils.queryStringAndPathValue(config.getOauth2Endpoint(), config.getQueryStringParameters(), null));
+ httpRequest.setMethod(config.getOauth2HttpMethod());
+ httpRequest.setTimeout(config.getTimeout());
+ httpRequest.setHeaderMap(headMap);
+ } else {
+ httpRequest.setUrl(JsonUtils.queryStringAndPathValue(config.getOauth2Endpoint(), config.getQueryStringParameters(), null));
+ httpRequest.setTimeout(config.getTimeout());
+ httpRequest.setMethod(config.getOauth2HttpMethod());
+ httpRequest.setHeaderMap(headMap);
+ httpRequest.setBody(StringUtils.EMPTY);
+ }
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ HttpCallback httpCallback = new HttpCallback(countDownLatch);
+ return httpClient.execute(httpRequest, httpCallback);
+ }
+
+ @Override
+ public void run() {
+ log.info("OAuthTokenRunnable | run");
+ oauthMap.forEach((oAuthEntity1, tokenEntity) -> {
+ String resultToken = "";
+ long tokenTimestamp = Long.parseLong(tokenEntity.getTokenTimestamp()) + (tokenEntity.getExpiresIn() * 1000L);
+ log.info("OAuthTokenRunnable | run | tokenTimestamp : {} | system.currentTimeMillis : {} | boolean : {}", tokenTimestamp, System.currentTimeMillis(), System.currentTimeMillis() > tokenTimestamp);
+ if (System.currentTimeMillis() > tokenTimestamp) {
+ log.info("OAuthTokenRunnable | run | update token");
+ HttpRequest httpRequest = new HttpRequest();
+ try {
+ resultToken = this.getResultToken(oAuthEntity1, new HashMap<>(16), oAuthEntity1.getHttpClient(), httpRequest);
+ } catch (Exception e) {
+ log.error("OAuthTokenRunnable | update token | scheduledExecutorService | error => ", e);
+ throw new RuntimeException(e);
+ }
+ if (StringUtils.isNotBlank(resultToken)) {
+ final TokenEntity token = JSONObject.parseObject(resultToken, TokenEntity.class);
+ if (StringUtils.isNotBlank(token.getAccessToken())) {
+ oauthMap.putIfAbsent(oAuthEntity1, updateTokenEntity(tokenEntity, token));
+ } else {
+ throw new RuntimeException(token.getError());
+ }
+ }
+ }
+ });
+ }
+
+ private TokenEntity updateTokenEntity(TokenEntity oldTokenEntity, TokenEntity newTokenEntity) {
+ if (newTokenEntity != null) {
+ if (StringUtils.isNotBlank(newTokenEntity.getAccessToken())) {
+ oldTokenEntity.setAccessToken(newTokenEntity.getAccessToken());
+ }
+ oldTokenEntity.setExpiresIn(newTokenEntity.getExpiresIn());
+ if (StringUtils.isNotBlank(newTokenEntity.getScope())) {
+ oldTokenEntity.setScope(newTokenEntity.getScope());
+ }
+ if (StringUtils.isNotBlank(newTokenEntity.getTokenType())) {
+ oldTokenEntity.setTokenType(newTokenEntity.getTokenType());
+ }
+ }
+ oldTokenEntity.setTokenTimestamp(Long.toString(System.currentTimeMillis()));
+ return oldTokenEntity;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/SocksProxyConfig.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/SocksProxyConfig.java
new file mode 100644
index 00000000..ecd59dcc
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/SocksProxyConfig.java
@@ -0,0 +1,38 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+public class SocksProxyConfig {
+
+ private String socks5Endpoint;
+ private String socks5UserName;
+ private String socks5Password;
+
+ public SocksProxyConfig(String socks5Endpoint, String socks5UserName, String socks5Password) {
+ this.socks5Endpoint = socks5Endpoint;
+ this.socks5UserName = socks5UserName;
+ this.socks5Password = socks5Password;
+ }
+
+ public String getSocks5Endpoint() {
+ return socks5Endpoint;
+ }
+
+ public void setSocks5Endpoint(String socks5Endpoint) {
+ this.socks5Endpoint = socks5Endpoint;
+ }
+
+ public String getSocks5UserName() {
+ return socks5UserName;
+ }
+
+ public void setSocks5UserName(String socks5UserName) {
+ this.socks5UserName = socks5UserName;
+ }
+
+ public String getSocks5Password() {
+ return socks5Password;
+ }
+
+ public void setSocks5Password(String socks5Password) {
+ this.socks5Password = socks5Password;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ThreadLocalProxyAuthenticator.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ThreadLocalProxyAuthenticator.java
new file mode 100644
index 00000000..3faca6a8
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ThreadLocalProxyAuthenticator.java
@@ -0,0 +1,30 @@
+package org.apache.rocketmq.connect.http.sink.auth;
+
+import java.net.Authenticator;
+import java.net.PasswordAuthentication;
+
+public class ThreadLocalProxyAuthenticator extends Authenticator{
+
+ private ThreadLocal credential = new ThreadLocal();
+
+ private static class SingletonHolder {
+ private static final ThreadLocalProxyAuthenticator instance = new ThreadLocalProxyAuthenticator();
+ }
+
+ public static final ThreadLocalProxyAuthenticator getInstance() {
+ return SingletonHolder.instance;
+ }
+
+ public void setCredentials(String user, String password) {
+ credential.set(new PasswordAuthentication(user, password.toCharArray()));
+ Authenticator.setDefault(this);
+ }
+
+
+
+ @Override
+ public PasswordAuthentication getPasswordAuthentication() {
+ return credential.get();
+ }
+
+}
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/common/OkHttpUtils.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/common/OkHttpUtils.java
deleted file mode 100644
index 2c3fe632..00000000
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/common/OkHttpUtils.java
+++ /dev/null
@@ -1,274 +0,0 @@
-package org.apache.rocketmq.connect.http.sink.common;
-
-import com.alibaba.fastjson.JSON;
-import okhttp3.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import java.io.IOException;
-import java.net.URLEncoder;
-import java.security.SecureRandom;
-import java.security.cert.X509Certificate;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-public class OkHttpUtils {
- private static final Logger log = LoggerFactory.getLogger(OkHttpUtils.class);
-
- private static volatile OkHttpClient okHttpClient = null;
- private static volatile Semaphore semaphore = null;
- private Map headerMap;
- private Map paramMap;
- private String url;
- private Request.Builder request;
-
- private OkHttpUtils() {
- if (okHttpClient == null) {
- synchronized (OkHttpUtils.class) {
- if (okHttpClient == null) {
- TrustManager[] trustManagers = buildTrustManagers();
- okHttpClient = new OkHttpClient.Builder()
- .connectTimeout(15, TimeUnit.SECONDS)
- .writeTimeout(20, TimeUnit.SECONDS)
- .readTimeout(20, TimeUnit.SECONDS)
- .sslSocketFactory(createSSLSocketFactory(trustManagers), (X509TrustManager) trustManagers[0])
- .hostnameVerifier((hostName, session) -> true)
- .retryOnConnectionFailure(true)
- .build();
- addHeader("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
- }
- }
- }
- }
-
- private static Semaphore getSemaphoreInstance() {
- synchronized (OkHttpUtils.class) {
- if (semaphore == null) {
- semaphore = new Semaphore(0);
- }
- }
- return semaphore;
- }
-
- public static OkHttpUtils builder() {
- return new OkHttpUtils();
- }
-
- public OkHttpUtils url(String url) {
- this.url = url;
- return this;
- }
-
- /**
- * 添加参数
- *
- * @param key 参数名
- * @param value 参数值
- * @return
- */
- public OkHttpUtils addParam(String key, String value) {
- if (paramMap == null) {
- paramMap = new LinkedHashMap<>(16);
- }
- paramMap.put(key, value);
- return this;
- }
-
- /**
- * 添加请求头
- *
- * @param key 参数名
- * @param value 参数值
- * @return
- */
- public OkHttpUtils addHeader(String key, String value) {
- if (headerMap == null) {
- headerMap = new LinkedHashMap<>(16);
- }
- headerMap.put(key, value);
- return this;
- }
-
- public OkHttpUtils get() {
- request = new Request.Builder().get();
- StringBuilder urlBuilder = new StringBuilder(url);
- if (paramMap != null) {
- urlBuilder.append("?");
- try {
- for (Map.Entry entry : paramMap.entrySet()) {
- urlBuilder.append(URLEncoder.encode(entry.getKey(), "utf-8")).
- append("=").
- append(URLEncoder.encode(entry.getValue(), "utf-8")).
- append("&");
- }
- } catch (Exception e) {
- log.error("OkHttpUtils | get | error => ", e);
- }
- urlBuilder.deleteCharAt(urlBuilder.length() - 1);
- }
- request.url(urlBuilder.toString());
- return this;
- }
-
- /**
- * 初始化post方法
- *
- * @param isJsonPost true等于json的方式提交数据,类似postman里post方法的raw
- * false等于普通的表单提交
- * @return
- */
- public OkHttpUtils post(boolean isJsonPost) {
- RequestBody requestBody;
- if (isJsonPost) {
- String json = "";
- if (paramMap != null) {
- json = JSON.toJSONString(paramMap);
- }
- requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json);
- } else {
- FormBody.Builder formBody = new FormBody.Builder();
- if (paramMap != null) {
- paramMap.forEach(formBody::add);
- }
- requestBody = formBody.build();
- }
- request = new Request.Builder().post(requestBody).url(url);
- return this;
- }
-
- /**
- * 同步请求
- *
- * @return
- */
- public String sync() {
- setHeader(request);
- try {
- Response response = okHttpClient.newCall(request.build()).execute();
- assert response.body() != null;
- return response.body().string();
- } catch (IOException e) {
- log.error("OkHttpUtils | sync | error => ", e);
- return "请求失败:" + e.getMessage();
- }
- }
-
- /**
- * 异步请求,有返回值
- */
- public String async() {
- StringBuilder buffer = new StringBuilder("");
- setHeader(request);
- okHttpClient.newCall(request.build()).enqueue(new Callback() {
- @Override
- public void onFailure(Call call, IOException e) {
- buffer.append("请求出错:").append(e.getMessage());
- }
-
- @Override
- public void onResponse(Call call, Response response) throws IOException {
- assert response.body() != null;
- buffer.append(response.body().string());
- getSemaphoreInstance().release();
- }
- });
- try {
- getSemaphoreInstance().acquire();
- } catch (InterruptedException e) {
- log.error("OkHttpUtils | async | error => ", e);
- }
- return buffer.toString();
- }
-
- /**
- * 异步请求,带有接口回调
- *
- * @param callBack
- */
- public void async(ICallBack callBack) {
- setHeader(request);
- okHttpClient.newCall(request.build()).enqueue(new Callback() {
- @Override
- public void onFailure(Call call, IOException e) {
- callBack.onFailure(call, e.getMessage());
- }
-
- @Override
- public void onResponse(Call call, Response response) throws IOException {
- assert response.body() != null;
- callBack.onSuccessful(call, response.body().string());
- }
- });
- }
-
- /**
- * 为request添加请求头
- *
- * @param request
- */
- private void setHeader(Request.Builder request) {
- if (headerMap != null) {
- try {
- for (Map.Entry entry : headerMap.entrySet()) {
- request.addHeader(entry.getKey(), entry.getValue());
- }
- } catch (Exception e) {
- log.error("OkHttpUtils | setHeader | error => ", e);
- }
- }
- }
-
-
- /**
- * 生成安全套接字工厂,用于https请求的证书跳过
- *
- * @return
- */
- private static SSLSocketFactory createSSLSocketFactory(TrustManager[] trustAllCerts) {
- SSLSocketFactory ssfFactory = null;
- try {
- SSLContext sc = SSLContext.getInstance("SSL");
- sc.init(null, trustAllCerts, new SecureRandom());
- ssfFactory = sc.getSocketFactory();
- } catch (Exception e) {
- log.error("OkHttpUtils | createSSLSocketFactory | error => ", e);
- }
- return ssfFactory;
- }
-
- private static TrustManager[] buildTrustManagers() {
- return new TrustManager[]{
- new X509TrustManager() {
- @Override
- public void checkClientTrusted(X509Certificate[] chain, String authType) {
- }
-
- @Override
- public void checkServerTrusted(X509Certificate[] chain, String authType) {
- }
-
- @Override
- public X509Certificate[] getAcceptedIssuers() {
- return new X509Certificate[]{};
- }
- }
- };
- }
-
- /**
- * 自定义一个接口回调
- */
- public interface ICallBack {
-
- void onSuccessful(Call call, String data);
-
- void onFailure(Call call, String errorMsg);
-
- }
-}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/AuthTypeEnum.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/AuthTypeEnum.java
new file mode 100644
index 00000000..91d820ee
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/AuthTypeEnum.java
@@ -0,0 +1,27 @@
+package org.apache.rocketmq.connect.http.sink.constant;
+
+
+public enum AuthTypeEnum {
+
+ /**
+ * BASIC
+ */
+ BASIC("BASIC_AUTH"),
+ /**
+ * OAUTH_CLIENT_CREDENTIALS
+ */
+ OAUTH_CLIENT_CREDENTIALS("OAUTH_AUTH"),
+ /**
+ * API_KEY
+ */
+ API_KEY("API_KEY_AUTH");
+ private final String authType;
+
+ AuthTypeEnum(String authType) {
+ this.authType = authType;
+ }
+
+ public String getAuthType() {
+ return authType;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java
index 09532b04..85ff2d00 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java
@@ -2,7 +2,34 @@
public class HttpConstant {
- public static final String URL_CONSTANT = "url";
+ public static final String URL_PATTERN_CONSTANT = "urlPattern";
+ public static final String METHOD_CONSTANT = "method";
+ public static final String QUERY_STRING_PARAMETERS_CONSTANT = "queryStringParameters";
+ public static final String HEADER_PARAMETERS_CONSTANT = "headerParameters";
+ public static final String BODYS_CONSTANT = "bodys";
+ public static final String AUTH_TYPE_CONSTANT = "authType";
+ public static final String BASIC_USER_CONSTANT = "basicUser";
+ public static final String BASIC_PASSWORD_CONSTANT = "basicPassword";
+ public static final String OAUTH2_ENDPOINT_CONSTANT = "oauth2Endpoint";
+ public static final String OAUTH2_CLIENTID_CONSTANT = "oauth2ClientId";
+ public static final String OAUTH2_CLIENTSECRET_CONSTANT = "oauth2ClientSecret";
+ public static final String OAUTH2_HTTP_METHOD_CONSTANT = "oauth2HttpMethod";
+ public static final String PROXY_TYPE_CONSTANT = "proxyType";
+ public static final String PROXY_HOST_CONSTANT = "proxyHost";
+ public static final String PROXY_PORT_CONSTANT = "proxyPort";
+ public static final String PROXY_USER_CONSTANT = "proxyUser";
+ public static final String PROXY_PASSWORD_CONSTANT = "proxyPassword";
+ public static final String TIMEOUT_CONSTANT = "timeout";
+ public static final String HTTP_PATH_VALUE = "httpPathValue";
+ public static final String HTTP_QUERY_VALUE = "httpQuery";
+ public static final String HTTP_METHOD = "httpMethod";
+ public static final String HTTP_HEADER = "httpHeader";
+ public static final String POST_METHOD = "POST";
+ public static final String PUT_METHOD = "PUT";
+ public static final String PATCH_METHOD = "PATCH";
+ public static final String AUTHORIZATION = "Authorization";
+ public static final String API_KEY_NAME = "apiKeyName";
+ public static final String API_KEY_VALUE = "apiKeyValue";
+ public static final String LOG_SIFT_TAG = "SIFT-TAG";
- public static final String DATA_CONSTANT = "data";
}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/ClientConfig.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/ClientConfig.java
new file mode 100644
index 00000000..74cb5e72
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/ClientConfig.java
@@ -0,0 +1,206 @@
+package org.apache.rocketmq.connect.http.sink.entity;
+
+
+import org.apache.rocketmq.connect.http.sink.auth.AbstractHttpClient;
+
+public class ClientConfig {
+
+ private String urlPattern;
+ private String method;
+ private String queryStringParameters;
+ private String headerParameters;
+ private String bodys;
+ private String httpPathValue;
+ private String authType;
+ private String basicUser;
+ private String basicPassword;
+ private String apiKeyName;
+ private String apiKeyValue;
+ private String oauth2Endpoint;
+ private String oauth2ClientId;
+ private String oauth2ClientSecret;
+ private String oauth2HttpMethod;
+ private String proxyType;
+ private String proxyHost;
+ private String proxyPort;
+ private String proxyUser;
+ private String proxyPassword;
+ private String timeout;
+ private AbstractHttpClient httpClient;
+
+ public String getProxyType() {
+ return proxyType;
+ }
+
+ public void setProxyType(String proxyType) {
+ this.proxyType = proxyType;
+ }
+
+ public String getProxyHost() {
+ return proxyHost;
+ }
+
+ public void setProxyHost(String proxyHost) {
+ this.proxyHost = proxyHost;
+ }
+
+ public String getProxyPort() {
+ return proxyPort;
+ }
+
+ public void setProxyPort(String proxyPort) {
+ this.proxyPort = proxyPort;
+ }
+
+ public String getProxyUser() {
+ return proxyUser;
+ }
+
+ public void setProxyUser(String proxyUser) {
+ this.proxyUser = proxyUser;
+ }
+
+ public String getProxyPassword() {
+ return proxyPassword;
+ }
+
+ public void setProxyPassword(String proxyPassword) {
+ this.proxyPassword = proxyPassword;
+ }
+
+ public String getUrlPattern() {
+ return urlPattern;
+ }
+
+ public void setUrlPattern(String urlPattern) {
+ this.urlPattern = urlPattern;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public String getQueryStringParameters() {
+ return queryStringParameters;
+ }
+
+ public void setQueryStringParameters(String queryStringParameters) {
+ this.queryStringParameters = queryStringParameters;
+ }
+
+ public String getHeaderParameters() {
+ return headerParameters;
+ }
+
+ public void setHeaderParameters(String headerParameters) {
+ this.headerParameters = headerParameters;
+ }
+
+ public String getBodys() {
+ return bodys;
+ }
+
+ public void setBodys(String bodys) {
+ this.bodys = bodys;
+ }
+
+ public String getAuthType() {
+ return authType;
+ }
+
+ public void setAuthType(String authType) {
+ this.authType = authType;
+ }
+
+ public String getBasicUser() {
+ return basicUser;
+ }
+
+ public void setBasicUser(String basicUser) {
+ this.basicUser = basicUser;
+ }
+
+ public String getBasicPassword() {
+ return basicPassword;
+ }
+
+ public void setBasicPassword(String basicPassword) {
+ this.basicPassword = basicPassword;
+ }
+
+ public String getOauth2Endpoint() {
+ return oauth2Endpoint;
+ }
+
+ public void setOauth2Endpoint(String oauth2Endpoint) {
+ this.oauth2Endpoint = oauth2Endpoint;
+ }
+
+ public String getOauth2ClientId() {
+ return oauth2ClientId;
+ }
+
+ public void setOauth2ClientId(String oauth2ClientId) {
+ this.oauth2ClientId = oauth2ClientId;
+ }
+
+ public String getOauth2ClientSecret() {
+ return oauth2ClientSecret;
+ }
+
+ public void setOauth2ClientSecret(String oauth2ClientSecret) {
+ this.oauth2ClientSecret = oauth2ClientSecret;
+ }
+
+ public String getOauth2HttpMethod() {
+ return oauth2HttpMethod;
+ }
+
+ public String getHttpPathValue() {
+ return httpPathValue;
+ }
+
+ public void setHttpPathValue(String httpPathValue) {
+ this.httpPathValue = httpPathValue;
+ }
+
+ public void setOauth2HttpMethod(String oauth2HttpMethod) {
+ this.oauth2HttpMethod = oauth2HttpMethod;
+ }
+
+ public String getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(String timeout) {
+ this.timeout = timeout;
+ }
+
+ public String getApiKeyName() {
+ return apiKeyName;
+ }
+
+ public void setApiKeyName(String apiKeyName) {
+ this.apiKeyName = apiKeyName;
+ }
+
+ public String getApiKeyValue() {
+ return apiKeyValue;
+ }
+
+ public void setApiKeyValue(String apiKeyValue) {
+ this.apiKeyValue = apiKeyValue;
+ }
+
+ public AbstractHttpClient getHttpClient() {
+ return httpClient;
+ }
+
+ public void setHttpClient(AbstractHttpClient httpClient) {
+ this.httpClient = httpClient;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/HttpRequest.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/HttpRequest.java
new file mode 100644
index 00000000..d9f5aa9e
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/HttpRequest.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.connect.http.sink.entity;
+
+import java.util.Map;
+
+public class HttpRequest {
+
+ private String url;
+ private String method;
+ private Map headerMap;
+ private String body;
+ private String timeout;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public Map getHeaderMap() {
+ return headerMap;
+ }
+
+ public void setHeaderMap(Map headerMap) {
+ this.headerMap = headerMap;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+
+ public String getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(String timeout) {
+ this.timeout = timeout;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/OAuthEntity.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/OAuthEntity.java
new file mode 100644
index 00000000..b08a9a1b
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/OAuthEntity.java
@@ -0,0 +1,111 @@
+package org.apache.rocketmq.connect.http.sink.entity;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.rocketmq.connect.http.sink.auth.AbstractHttpClient;
+
+public class OAuthEntity {
+ private String oauth2Endpoint;
+ private String oauth2ClientId;
+ private String oauth2ClientSecret;
+ private String oauth2HttpMethod;
+ private String queryStringParameters;
+ private String headerParamsters;
+ private String timeout;
+
+ private AbstractHttpClient httpClient;
+
+ public String getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(String timeout) {
+ this.timeout = timeout;
+ }
+
+ public String getOauth2Endpoint() {
+ return oauth2Endpoint;
+ }
+
+ public void setOauth2Endpoint(String oauth2Endpoint) {
+ this.oauth2Endpoint = oauth2Endpoint;
+ }
+
+ public String getOauth2ClientId() {
+ return oauth2ClientId;
+ }
+
+ public void setOauth2ClientId(String oauth2ClientId) {
+ this.oauth2ClientId = oauth2ClientId;
+ }
+
+ public String getOauth2ClientSecret() {
+ return oauth2ClientSecret;
+ }
+
+ public void setOauth2ClientSecret(String oauth2ClientSecret) {
+ this.oauth2ClientSecret = oauth2ClientSecret;
+ }
+
+ public String getOauth2HttpMethod() {
+ return oauth2HttpMethod;
+ }
+
+ public void setOauth2HttpMethod(String oauth2HttpMethod) {
+ this.oauth2HttpMethod = oauth2HttpMethod;
+ }
+
+ public String getQueryStringParameters() {
+ return queryStringParameters;
+ }
+
+ public void setQueryStringParameters(String queryStringParameters) {
+ this.queryStringParameters = queryStringParameters;
+ }
+
+ public String getHeaderParamsters() {
+ return headerParamsters;
+ }
+
+ public void setHeaderParamsters(String headerParamsters) {
+ this.headerParamsters = headerParamsters;
+ }
+
+ public AbstractHttpClient getHttpClient() {
+ return httpClient;
+ }
+
+ public void setHttpClient(AbstractHttpClient httpClient) {
+ this.httpClient = httpClient;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+
+ if (o == null || getClass() != o.getClass()) return false;
+
+ OAuthEntity that = (OAuthEntity) o;
+
+ return new EqualsBuilder().append(oauth2Endpoint, that.oauth2Endpoint).append(oauth2ClientId, that.oauth2ClientId).append(oauth2ClientSecret, that.oauth2ClientSecret).append(oauth2HttpMethod, that.oauth2HttpMethod).append(queryStringParameters, that.queryStringParameters).append(headerParamsters, that.headerParamsters).isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37).append(oauth2Endpoint).append(oauth2ClientId).append(oauth2ClientSecret).append(oauth2HttpMethod).append(queryStringParameters).append(headerParamsters).toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("oauth2Endpoint", oauth2Endpoint)
+ .append("oauth2ClientId", oauth2ClientId)
+ .append("oauth2ClientSecret", oauth2ClientSecret)
+ .append("oauth2HttpMethod", oauth2HttpMethod)
+ .append("queryStringParameters", queryStringParameters)
+ .append("headerParamsters", headerParamsters)
+ .append("timeout", timeout)
+ .toString();
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/TokenEntity.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/TokenEntity.java
new file mode 100644
index 00000000..11d59e9c
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/TokenEntity.java
@@ -0,0 +1,124 @@
+package org.apache.rocketmq.connect.http.sink.entity;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+public class TokenEntity {
+
+ private String accessToken;
+ private String tokenType;
+ private int expiresIn;
+ private String exampleParameter;
+ private String timestamp;
+ private String status;
+ private String error;
+ private String message;
+ private String path;
+ private String tokenTimestamp;
+
+ private String scope;
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ public void setAccessToken(String accessToken) {
+ this.accessToken = accessToken;
+ }
+
+ public String getTokenType() {
+ return tokenType;
+ }
+
+ public void setTokenType(String tokenType) {
+ this.tokenType = tokenType;
+ }
+
+ public int getExpiresIn() {
+ return expiresIn;
+ }
+
+ public void setExpiresIn(int expiresIn) {
+ this.expiresIn = expiresIn;
+ }
+
+ public String getTokenTimestamp() {
+ return tokenTimestamp;
+ }
+
+ public void setTokenTimestamp(String tokenTimestamp) {
+ this.tokenTimestamp = tokenTimestamp;
+ }
+
+ public String getExampleParameter() {
+ return exampleParameter;
+ }
+
+ public void setExampleParameter(String exampleParameter) {
+ this.exampleParameter = exampleParameter;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(String timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getError() {
+ return error;
+ }
+
+ public void setError(String error) {
+ this.error = error;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getScope() {
+ return scope;
+ }
+
+ public void setScope(String scope) {
+ this.scope = scope;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("accessToken", accessToken)
+ .append("tokenType", tokenType)
+ .append("expiresIn", expiresIn)
+ .append("exampleParameter", exampleParameter)
+ .append("timestamp", timestamp)
+ .append("status", status)
+ .append("error", error)
+ .append("message", message)
+ .append("path", path)
+ .append("tokenTimestamp", tokenTimestamp)
+ .append("scope", scope)
+ .toString();
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/CheckUtils.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/CheckUtils.java
new file mode 100644
index 00000000..4cbf39fc
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/CheckUtils.java
@@ -0,0 +1,33 @@
+package org.apache.rocketmq.connect.http.sink.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class CheckUtils {
+
+ private static final String NULL_CONSTANT = "null";
+
+ public static Boolean checkNull(String check) {
+ if (StringUtils.isBlank(check)) {
+ return Boolean.TRUE;
+ }
+ if (StringUtils.isNotBlank(check) && NULL_CONSTANT.equalsIgnoreCase(check)) {
+ return Boolean.TRUE;
+ }
+ return Boolean.FALSE;
+ }
+
+ public static Boolean checkNotNull(String check) {
+ if (StringUtils.isNotBlank(check) && !NULL_CONSTANT.equalsIgnoreCase(check)) {
+ return Boolean.TRUE;
+ }
+ return Boolean.FALSE;
+ }
+
+ public static String checkNullReturnDefault(String check) {
+ if (NULL_CONSTANT.equalsIgnoreCase(check)) {
+ return null;
+ }
+ return check;
+ }
+
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/JsonUtils.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/JsonUtils.java
new file mode 100644
index 00000000..f78202cd
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/JsonUtils.java
@@ -0,0 +1,76 @@
+package org.apache.rocketmq.connect.http.sink.util;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.Map.Entry;
+
+public class JsonUtils {
+
+ private static final String questionMark = "?";
+
+ public static JSONObject mergeJson(JSONObject source, JSONObject target) {
+ if (target == null) {
+ return source;
+ }
+ if (source == null) {
+ return target;
+ }
+ for (String key : source.keySet()) {
+ Object value = source.get(key);
+ if (!target.containsKey(key)) {
+ target.put(key, value);
+ } else {
+ if (value instanceof JSONObject) {
+ JSONObject valueJson = (JSONObject) value;
+ JSONObject targetValue = mergeJson(valueJson, target.getJSONObject(key));
+ target.put(key, targetValue);
+ } else if (value instanceof JSONArray) {
+ JSONArray valueArray = (JSONArray) value;
+ for (int i = 0; i < valueArray.size(); i++) {
+ JSONObject obj = (JSONObject) valueArray.get(i);
+ JSONObject targetValue = mergeJson(obj, (JSONObject) target.getJSONArray(key).get(i));
+ target.getJSONArray(key).set(i, targetValue);
+ }
+ } else {
+ target.put(key, value);
+ }
+ }
+ }
+ return target;
+ }
+
+ public static String queryStringAndPathValue(String url, String queryString, String pathValue) throws UnsupportedEncodingException {
+ StringBuilder pathValueString = new StringBuilder();
+ if (StringUtils.isNotBlank(pathValue)) {
+ final JSONArray objects = JSONArray.parseArray(pathValue);
+ for (Object object : objects) {
+ pathValueString.append(HttpConstant.HTTP_PATH_VALUE)
+ .append("=").append(object).append("&");
+ }
+ }
+ StringBuilder queryStringBuilder = new StringBuilder();
+ if (StringUtils.isNotBlank(queryString)) {
+ final JSONObject jsonObject = JSONObject.parseObject(queryString);
+ for (Entry next : jsonObject.entrySet()) {
+ if (next.getValue() instanceof JSONObject) {
+ queryStringBuilder.append(next.getKey()).append("=").append(URLEncoder.encode(((JSONObject) next.getValue()).toJSONString(), "UTF-8")).append("&");
+ } else {
+ queryStringBuilder.append(next.getKey()).append("=").append(URLEncoder.encode((String) next.getValue(), "UTF-8")).append("&");
+ }
+ }
+ }
+ String path = pathValueString + queryStringBuilder.toString();
+ if (StringUtils.isNotBlank(path) && StringUtils.isNotBlank(url)) {
+ if (url.contains(questionMark)) {
+ return url + "&" + path.substring(0, path.length() - 1);
+ }
+ return url + "?" + path.substring(0, path.length() - 1);
+ }
+ return url;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
index d75e4454..8f6e3ffd 100644
--- a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
+++ b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
@@ -18,25 +18,4 @@ public class HttpSinkConnectorTest {
public void testTaskConfigs() {
Assert.assertEquals(httpSinkConnector.taskConfigs(1).size(), 1);
}
-
- @Test
- public void testPut() {
- HttpSinkTask httpSinkTask = new HttpSinkTask();
- KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(HttpConstant.URL_CONSTANT, "http://127.0.0.1:8081/demo");
- httpSinkTask.init(keyValue);
- List connectRecordList = new ArrayList<>();
- ConnectRecord connectRecord = new ConnectRecord(null ,null, System.currentTimeMillis());
- connectRecord.setData("test");
- connectRecordList.add(connectRecord);
- httpSinkTask.put(connectRecordList);
- }
-
- @Test(expected = RuntimeException.class)
- public void testValidate() {
- KeyValue keyValue = new DefaultKeyValue();
- // 需要添加测试的http地址
- keyValue.put(HttpConstant.URL_CONSTANT, "http://127.0.0.1");
- httpSinkConnector.validate(keyValue);
- }
}
diff --git a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkTaskTest.java b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkTaskTest.java
new file mode 100644
index 00000000..7bcf398e
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkTaskTest.java
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.http.sink;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.http.sink.constant.AuthTypeEnum;
+import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+
+public class HttpSinkTaskTest {
+
+ private final HttpSinkTask httpSinkTask = new HttpSinkTask();
+
+ @Test(expected = RuntimeException.class)
+ public void testPutBasic() {
+ KeyValue keyValue = new DefaultKeyValue();
+ String apiDestinationName = UUID.randomUUID().toString();
+ keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, "http://127.0.0.1:7001/xxxx?id=" + apiDestinationName);
+ keyValue.put(HttpConstant.METHOD_CONSTANT, "POST");
+ keyValue.put(HttpConstant.BASIC_USER_CONSTANT, "xxxx");
+ keyValue.put(HttpConstant.BASIC_PASSWORD_CONSTANT, "xxxx");
+ keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, AuthTypeEnum.BASIC.getAuthType());
+ httpSinkTask.setScheduledExecutorService(Executors.newSingleThreadScheduledExecutor());
+ httpSinkTask.start(keyValue);
+ List sinkRecords = new ArrayList<>(11);
+ ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
+ sinkRecords.add(connectRecord);
+ httpSinkTask.put(sinkRecords);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testPutApiKey() {
+ KeyValue keyValue = new DefaultKeyValue();
+ String apiDestinationName = UUID.randomUUID().toString();
+ keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, "http://127.0.0.1:7001/xxxx?id=" + apiDestinationName);
+ keyValue.put(HttpConstant.METHOD_CONSTANT, "POST");
+ keyValue.put(HttpConstant.API_KEY_NAME, "Token");
+ keyValue.put(HttpConstant.API_KEY_VALUE, "xxxx");
+ keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, AuthTypeEnum.API_KEY.getAuthType());
+ httpSinkTask.setScheduledExecutorService(Executors.newSingleThreadScheduledExecutor());
+ httpSinkTask.start(keyValue);
+ List sinkRecords = new ArrayList<>(11);
+ ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
+ sinkRecords.add(connectRecord);
+ httpSinkTask.put(sinkRecords);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testPutOAuth2() {
+ KeyValue keyValue = new DefaultKeyValue();
+ String apiDestinationName = UUID.randomUUID().toString();
+ keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, "http://127.0.0.1:7001/xxxx?id=" + apiDestinationName);
+ keyValue.put(HttpConstant.METHOD_CONSTANT, "POST");
+ keyValue.put(HttpConstant.OAUTH2_CLIENTID_CONSTANT, "clientId");
+ keyValue.put(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT, "clientSecret");
+ keyValue.put(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT, "POST");
+ keyValue.put(HttpConstant.OAUTH2_ENDPOINT_CONSTANT, "http://127.0.0.1:7001/oauth/token");
+ keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, AuthTypeEnum.OAUTH_CLIENT_CREDENTIALS.getAuthType());
+ Map queryStringParameters = Maps.newHashMap();
+ queryStringParameters.put("grant_type", "xxxx");
+ queryStringParameters.put("scope", "xxxx");
+ keyValue.put(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT, new Gson().toJson(queryStringParameters));
+ Map headerParameters = Maps.newHashMap();
+ headerParameters.put("Content-Type", "xxxx");
+ headerParameters.put("Authorization", "xxxx");
+ keyValue.put(HttpConstant.HEADER_PARAMETERS_CONSTANT, new Gson().toJson(headerParameters));
+ httpSinkTask.setScheduledExecutorService(Executors.newSingleThreadScheduledExecutor());
+ httpSinkTask.start(keyValue);
+ List sinkRecords = new ArrayList<>(11);
+ ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
+ sinkRecords.add(connectRecord);
+ httpSinkTask.put(sinkRecords);
+ }
+}