Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
40 changes: 38 additions & 2 deletions src/main/java/io/cdap/plugin/servicenow/ServiceNowBaseConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package io.cdap.plugin.servicenow;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.stream.JsonReader;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException;
import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl;
import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIRequestBuilder;
import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig;
Expand All @@ -31,14 +33,21 @@
import io.cdap.plugin.servicenow.util.SchemaType;
import io.cdap.plugin.servicenow.util.ServiceNowConstants;
import io.cdap.plugin.servicenow.util.SourceValueType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;

/**
* ServiceNow Base Config. Contains connection properties and methods.
*/
public class ServiceNowBaseConfig extends PluginConfig {

private static final Logger log = LoggerFactory.getLogger(ServiceNowBaseConfig.class);
@Name(ConfigUtil.NAME_USE_CONNECTION)
@Nullable
@Description("Whether to use an existing connection.")
Expand Down Expand Up @@ -83,7 +92,7 @@ public void validateCredentials(FailureCollector collector) {
}

@VisibleForTesting
public void validateServiceNowConnection(FailureCollector collector) {
public void validateServiceNowConnection(FailureCollector collector) {
try {
ServiceNowTableAPIClientImpl restApi = new ServiceNowTableAPIClientImpl(connection, useConnection);
restApi.getAccessToken();
Expand Down Expand Up @@ -140,7 +149,7 @@ public void validateTable(String tableName, SourceValueType valueType, FailureCo
requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT);

apiResponse = serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build());
if (serviceNowTableAPIClient.parseResponseToResultListOfMap(apiResponse.getResponseBody()).isEmpty()) {
if (isResultEmpty(apiResponse)) {
// Removed config property as in case of MultiSource, only first table error was populating.
collector.addFailure("Table: " + tableName + " is empty.", "");
}
Expand All @@ -152,4 +161,31 @@ public void validateTable(String tableName, SourceValueType valueType, FailureCo
}
}

/**
* Checks if the "result" array in the ServiceNow REST API response is empty.
* <p>
* Determines if the "result" array in a ServiceNow REST API response is empty by specifically looking for a top-level
* key named "result". Once found, it opens the associated array and checks for the presence of a first element.
* </p>
*
* @param restAPIResponse The response object containing the JSON input stream
* @return true, if the "result" array exists and is empty, or if the "result" key is never found;
* false, if the array contains at least one element.
* @throws IOException If there is an error reading the input stream or parsing the JSON.
*/
public boolean isResultEmpty(RestAPIResponse restAPIResponse) throws IOException {
JsonReader reader = new JsonReader(new InputStreamReader(restAPIResponse.getResponseStream()));
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (ServiceNowConstants.RESULT.equals(name)) {
reader.beginArray();
return !reader.hasNext();
} else {
reader.skipValue();
}
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig;
Expand Down Expand Up @@ -55,14 +56,18 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

import static io.cdap.plugin.servicenow.util.ServiceNowConstants.STC_FIELD_SUFFIX;
Expand All @@ -85,6 +90,7 @@ public class ServiceNowTableAPIClientImpl extends RestAPIClient {
public static JsonArray serviceNowJsonResultArray;

public ServiceNowTableAPIClientImpl(ServiceNowConnectorConfig conf, Boolean useConnection) {
super();
this.conf = conf;
this.schemaType = getSchemaTypeBasedOnUseConnection(useConnection);
}
Expand Down Expand Up @@ -131,7 +137,7 @@ public String getAccessTokenRetryableMode() throws ExecutionException, RetryExce
* @param limit The number of records to be fetched
* @return The list of Map; each Map representing a table row
*/
public List<Map<String, String>> fetchTableRecords(
public RestAPIResponse fetchTableRecords(
String tableName,
SourceValueType valueType,
String startDate,
Expand All @@ -154,7 +160,7 @@ public List<Map<String, String>> fetchTableRecords(
String accessToken = getAccessToken();
requestBuilder.setAuthHeader(accessToken);
RestAPIResponse apiResponse = executeGetWithRetries(requestBuilder.build());
return parseResponseToResultListOfMap(apiResponse.getResponseBody());
return apiResponse;
}

private void applyDateRangeToRequest(ServiceNowTableAPIRequestBuilder requestBuilder, String startDate,
Expand Down Expand Up @@ -197,6 +203,12 @@ public List<Map<String, String>> parseResponseToResultListOfMap(String responseB
return GSON.fromJson(ja, type);
}

public List<JsonObject> parseResponseToResultListOfMap(InputStream in) {
APIResponse apiResponse = GSON.fromJson(new JsonReader(new InputStreamReader(in, StandardCharsets.UTF_8)),
APIResponse.class);
return apiResponse.getResult();
}

private String getErrorMessage(String responseBody) {
try {
JsonObject jo = GSON.fromJson(responseBody, JsonObject.class);
Expand Down Expand Up @@ -231,14 +243,13 @@ private String getErrorMessage(String responseBody) {
* @param limit The number of records to be fetched
* @return The list of Map; each Map representing a table row
*/
public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType,
public RestAPIResponse fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType,
String startDate, String endDate, int offset,
int limit) throws ServiceNowAPIException {
final List<Map<String, String>> results = new ArrayList<>();
Callable<Boolean> fetchRecords = () -> {
results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit));
return true;
};
// Using AtomicReference to capture a value inside a lambda that needs to be accessed outside.
AtomicReference<RestAPIResponse> responseRef = new AtomicReference<>();
Callable<Boolean> fetchRecords = () -> executeFetch(tableName, valueType, startDate, endDate, offset, limit,
responseRef);

Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfException(this::isExceptionRetryable)
Expand All @@ -253,8 +264,15 @@ public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName
String.format("Data Recovery failed for batch %s to %s.", offset, (offset + limit)),
e, null, false);
}
// Return the value captured inside the lambda
return responseRef.get();
}

return results;
private boolean executeFetch(String tableName, SourceValueType type, String startDate, String endDate, int offset,
int limit, AtomicReference<RestAPIResponse> ref) throws ServiceNowAPIException {
RestAPIResponse response = fetchTableRecords(tableName, type, startDate, endDate, offset, limit);
ref.set(response);
return true;
}

/**
Expand All @@ -277,8 +295,8 @@ public Schema fetchTableSchema(String tableName, FailureCollector collector) {
}

@VisibleForTesting
public MetadataAPISchemaResponse parseSchemaResponse(String responseBody) {
return GSON.fromJson(responseBody, MetadataAPISchemaResponse.class);
public MetadataAPISchemaResponse parseSchemaResponse(InputStream responseStream) {
return GSON.fromJson(createJsonReader(responseStream), MetadataAPISchemaResponse.class);
}

/**
Expand Down Expand Up @@ -352,7 +370,7 @@ public Schema fetchTableSchema(String tableName, String accessToken, SourceValue
private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
String tableName) throws ServiceNowAPIException {
SchemaAPISchemaResponse schemaAPISchemaResponse =
GSON.fromJson(restAPIResponse.getResponseBody(), SchemaAPISchemaResponse.class);
GSON.fromJson(createJsonReader(restAPIResponse.getResponseStream()), SchemaAPISchemaResponse.class);

if (schemaAPISchemaResponse.getResult() == null || schemaAPISchemaResponse.getResult().isEmpty()) {
throw new ServiceNowAPIException(
Expand Down Expand Up @@ -386,8 +404,7 @@ private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<
private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
String tableName, SourceValueType valueType) throws
ServiceNowAPIException {
MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getResponseBody());

MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getResponseStream());
if (metadataAPISchemaResponse.getResult() == null || metadataAPISchemaResponse.getResult().getColumns() == null ||
metadataAPISchemaResponse.getResult().getColumns().isEmpty()) {
throw new ServiceNowAPIException(
Expand All @@ -413,6 +430,11 @@ private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, Lis
return SchemaBuilder.constructSchema(tableName, columns);
}

public JsonReader createJsonReader(InputStream inputStream) {
Objects.requireNonNull(inputStream, "InputStream must not be null");
return new JsonReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
}

/**
* Get the total number of records in the table
*
Expand Down Expand Up @@ -503,8 +525,9 @@ public String createRecordInDisplayMode(String tableName, HttpEntity entity) thr
}

private String getSystemId(RestAPIResponse restAPIResponse) {
CreateRecordAPIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(),
CreateRecordAPIResponse.class);
CreateRecordAPIResponse apiResponse = GSON.fromJson(
new InputStreamReader(restAPIResponse.getResponseStream(), StandardCharsets.UTF_8),
CreateRecordAPIResponse.class);
return apiResponse.getResult().get(ServiceNowConstants.SYSTEM_ID).toString();
}

Expand All @@ -515,7 +538,7 @@ private String getSystemId(RestAPIResponse restAPIResponse) {
* @param tableName The ServiceNow table name
* @param query The query
*/
public Map<String, String> getRecordFromServiceNowTable(String tableName, String query)
public JsonObject getRecordFromServiceNowTable(String tableName, String query)
throws ServiceNowAPIException {

ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder(
Expand All @@ -527,7 +550,8 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String
requestBuilder.setAuthHeader(accessToken);
restAPIResponse = executeGetWithRetries(requestBuilder.build());

APIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(), APIResponse.class);
APIResponse apiResponse = GSON.fromJson(
new InputStreamReader(restAPIResponse.getResponseStream(), StandardCharsets.UTF_8), APIResponse.class);
return apiResponse.getResult().get(0);
}

Expand All @@ -545,13 +569,12 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String
* @throws RuntimeException if the schema response is null or contains no result.
*/
private Schema prepareStringBasedSchema(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
String tableName) {
List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody());
String tableName) throws ServiceNowAPIException {
List<JsonObject> result = parseResponseToResultListOfMap(restAPIResponse.getResponseStream());
if (result != null && !result.isEmpty()) {
Map<String, String> firstRecord = result.get(0);
for (String key : firstRecord.keySet()) {
columns.add(new ServiceNowColumn(key, "string"));
}
result.get(0).entrySet().forEach(entry ->
columns.add(new ServiceNowColumn(entry.getKey(), "string"))
);
return SchemaBuilder.constructSchema(tableName, columns);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.cdap.plugin.servicenow.connector;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
Expand Down Expand Up @@ -135,7 +136,7 @@ private TableList listTables(String accessToken) throws ServiceNowAPIException {
ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config, true);
RestAPIResponse apiResponse =
serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build());
return GSON.fromJson(apiResponse.getResponseBody(), TableList.class);
return GSON.fromJson(serviceNowTableAPIClient.createJsonReader(apiResponse.getResponseStream()), TableList.class);
}

public ConnectorSpec generateSpec(ConnectorContext connectorContext, ConnectorSpecRequest connectorSpecRequest) {
Expand Down Expand Up @@ -183,8 +184,8 @@ private List<StructuredRecord> getTableData(String tableName, int limit)
requestBuilder.setAuthHeader(accessToken);
requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT);
RestAPIResponse apiResponse = serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build());
List<Map<String, String>> result = serviceNowTableAPIClient.parseResponseToResultListOfMap
(apiResponse.getResponseBody());
List<JsonObject> result = serviceNowTableAPIClient.parseResponseToResultListOfMap
(apiResponse.getResponseStream());
List<StructuredRecord> recordList = new ArrayList<>();
Schema schema = getSchema(tableName);
if (schema != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.cdap.plugin.servicenow.connector;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonObject;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
import io.cdap.cdap.api.data.schema.Schema;
Expand All @@ -27,11 +28,13 @@
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Utility class for converting the record from ServiceNow data type to CDAP schema data types
Expand Down Expand Up @@ -82,9 +85,10 @@ public class ServiceNowRecordConverter {
DateTimeFormatter.ofPattern("HH:mm")
));

public static void convertToValue(String fieldName, Schema fieldSchema, Map<String, String> record,
public static void convertToValue(String fieldName, Schema fieldSchema, JsonObject record,
StructuredRecord.Builder recordBuilder) {
String fieldValue = record.get(fieldName);
String fieldValue = record.has(fieldName) && !record.get(fieldName).isJsonNull() ? record.get(fieldName)
.getAsString() : null;
if (fieldValue == null || fieldValue.isEmpty()) {
// Set 'null' value as it is
recordBuilder.set(fieldName, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cdap.plugin.servicenow.model;

import com.google.gson.JsonObject;

import java.util.List;
import java.util.Map;

Expand All @@ -24,9 +26,9 @@
*/
public class APIResponse {

private List<Map<String, String>> result;
private List<JsonObject> result;

public List<Map<String, String>> getResult() {
public List<JsonObject> getResult() {
return result;
}

Expand Down
Loading