PLUGIN-1936: Read data in a streaming manner#131
PLUGIN-1936: Read data in a streaming manner#131sgarg-CS wants to merge 5 commits intodata-integrations:developfrom
Conversation
| } | ||
|
|
||
| /** | ||
| * Check whether the result is empty or not. |
There was a problem hiding this comment.
Also explain how are we checking is result is empty
There was a problem hiding this comment.
It specifically looks for a top-level key named result. Once found, it opens the associated array and checks for the presence of a first element. If it does contain at least one element, then it return false, otherwise true.
Updated the method comment. ba5eb53
| return GSON.fromJson(ja, type); | ||
| } | ||
|
|
||
| public List<Map<String, String>> parseResponseToResultListOfMap(InputStream in) throws ServiceNowAPIException { |
There was a problem hiding this comment.
Why are we creating a List of map? Instead we can directly read it as a JsonObject, something like:
gson.fromJson(reader, ServiceNowRecordObject.class);
There was a problem hiding this comment.
Refactored the code to parse the the RestAPIResponse into result. ba5eb53
APIResponse apiResponse = GSON.fromJson(new JsonReader(new InputStreamReader(in, StandardCharsets.UTF_8)), APIResponse.class);
return apiResponse.getResult();
The calling method : serviceNowTableAPIClient.parseResponseToResultListOfMap in ServiceNowConnector.getTableData expects a result of type: List<Map<String, String>>
Can you check the APIResponse class once? When you say ServiceNowRecordObject class, do you expect that class to hold the result array returned by the ServiceNow Table REST API ? Please clarify.
| String startDate, String endDate, int offset, | ||
| int limit) throws ServiceNowAPIException { | ||
| final List<Map<String, String>> results = new ArrayList<>(); | ||
| //final List<Map<String, String>> results = new ArrayList<>(); |
| final RestAPIResponse[] restAPIResponse = new RestAPIResponse[1]; | ||
| Callable<Boolean> fetchRecords = () -> { | ||
| results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); | ||
| // results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); |
| Callable<Boolean> fetchRecords = () -> { | ||
| results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); | ||
| // results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); | ||
| restAPIResponse[0] = fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit); |
There was a problem hiding this comment.
can you please explain what are we trying to do here? why restAPIResponse is an array
There was a problem hiding this comment.
In Java, when we use a variable from an outer scope inside a lambda expression or a Callable, that variable must be effectively final. This means its reference cannot change after it is initialised. so, I was creating an array earlier for RestAPIResponse and returning only the first element. I've now replaced it with AtomicReference. ba5eb53
| private final ServiceNowMultiSourceConfig multiSourcePluginConf; | ||
| private ServiceNowTableAPIClientImpl restApi; | ||
| private final Gson gson = new Gson(); | ||
| private final Type mapType = new TypeToken<Map<String, String>>() { }.getType(); |
There was a problem hiding this comment.
should use an object instead of map
There was a problem hiding this comment.
Do we need to replace TypeToken<Map<String, String>>() with TypeToken<Object>() here? Please confirm.
| return false; | ||
| } | ||
| this.jsonReader = new JsonReader(new InputStreamReader(in, StandardCharsets.UTF_8)); | ||
| this.jsonReader.setLenient(true); |
There was a problem hiding this comment.
In general, the ServiceNow Table REST API provides standard-compliant JSON that does not strictly require jsonreader.setlenient(true) for basic parsing. However, there are specific scenarios where enabling leniency becomes necessary or is a recommended safeguard.
If the API call fails (e.g., 401 Unauthorised, 404 Not Found, or a ServiceNow "maintenance" page), the server might return an HTML body instead of JSON. A strict parser will fail immediately but a lenient one may provide a more readable error or allow you to handle the string manually.
| JsonToken top; | ||
| try { | ||
| top = jsonReader.peek(); | ||
| LOG.info("Peeking JSON token for table {}: {}", tableName, top); |
There was a problem hiding this comment.
please remove, it should be a debug log
| } | ||
|
|
||
| public void closeCurrentPage() { | ||
| LOG.info("Closing current page for table {}", tableName); |
There was a problem hiding this comment.
Yes, had added this logger during dev testing to troubleshoot an issue. Removed it now. ba5eb53
| return restAPIResponse; | ||
| } | ||
|
|
||
| private boolean openNextPage() throws IOException, ServiceNowAPIException { |
There was a problem hiding this comment.
looks redundant, as same function is present in src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReader.java
can we abstract these out in a common file. Same comment for nextKeyValue()
There was a problem hiding this comment.
Added the implementation of nextKey(), openNextPage() and closeCurrentPage() to the ServiceNowBaseRecordReader class ba5eb53
| } | ||
|
|
||
| /** | ||
| * Determines if the "result" array in a ServiceNow REST API response is empty. It specifically looks for a top-level |
There was a problem hiding this comment.
can you please format it as per java doc.
/** One line summary
*
-
Additional details
-
params
*@return
*/
| int limit) throws ServiceNowAPIException { | ||
| final List<Map<String, String>> results = new ArrayList<>(); | ||
| // Using AtomicReference to capture a value inside a lambda that needs to be accessed outside. | ||
| AtomicReference<RestAPIResponse> responseRef = new AtomicReference<>(); |
There was a problem hiding this comment.
this seems like a overkill, it's better to abstract the logic in a function instead of lambda to simplify the code here
There was a problem hiding this comment.
Refactored the code by separating out the logic in a private method. PTAL. 6115173
| String tableName) { | ||
| List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody()); | ||
| String tableName) throws ServiceNowAPIException { | ||
| List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getBodyAsStream()); |
There was a problem hiding this comment.
Instead of a Map, we should use well defined Java Objects.
Since we are already refactoring the code we can always change the caller functions, I don't see that as a blocker
| List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody()); | ||
| String tableName) throws ServiceNowAPIException { | ||
| List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getBodyAsStream()); | ||
| // List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody()); |
| return GSON.fromJson(ja, type); | ||
| } | ||
|
|
||
| public List<Map<String, String>> parseResponseToResultListOfMap(InputStream in) { |
There was a problem hiding this comment.
Should use well defined Objects here instead of Map
There was a problem hiding this comment.
Sorry, it's not clear to me what's the ask here. Can you please elaborate more what you mean by well defined objects ? Are you looking for a POJO class here?
There was a problem hiding this comment.
If I understand it correctly this Map is used to store the response of the API, if yes then response can be represented by a POJO class instead of a generic map
There was a problem hiding this comment.
Yes, it is used to store the ServiceNow API Response. Can we use this POJO class ?
| inputStream, MAX_PAGE_BYTES + 1); // +1 to detect overflow | ||
| responseBody = IOUtils.toByteArray(boundedInputStream); | ||
| LOG.info("RAW JSON: {}", new String(responseBody, StandardCharsets.UTF_8)); | ||
| if (responseBody.length > MAX_PAGE_BYTES) { |
There was a problem hiding this comment.
Loading the complete page defeats the whole purpose of this change altogether. We are making this change to avoid OOM issues when we load the entire page into memory and loading it into a bounded buffer is not solving that problem.
Regarding multiple access of the input stream: We can always refactor out code(validation etc.) such that all the processing happens record by record instead of the whole page.
Basically when the connector request for a record we can do all the processing/validation is required, instead of doing it beforehand. This would solve the problem of multiple access of InputStream.
|
|
||
| JsonObject responseJSON = jsonParser.parse(apiResponse.getResponseBody()).getAsJsonObject(); | ||
| JsonObject responseJSON = jsonParser.parse( | ||
| new InputStreamReader(apiResponse.getBodyAsStream(), StandardCharsets.UTF_8)).getAsJsonObject(); |
There was a problem hiding this comment.
Are we switching to stream for POST methods also? Can't we restrict it to read/GET operations only?
| } | ||
|
|
||
| /** | ||
| * The refactored nextKeyValue() — uses Gson JsonReader to stream one record at a time. |
There was a problem hiding this comment.
Please rephrase the comment here, refactored doesn't provide any context to the reader, the purpose of comment is not to provide the history of the change but explain the logic
| closeCurrentPage(); | ||
| LOG.debug("Fetching data for table {} at offset {}", tableName, split.getOffset()); | ||
| RestAPIResponse resp = fetchData(); | ||
| LOG.debug("Fetched data for table {} at offset {}", tableName, split.getOffset()); |
There was a problem hiding this comment.
do we really need these many debug logs?
There was a problem hiding this comment.
Not really, these were more for dev-testing. I can remove them. 4b89ca5
5a062f7 to
47c753b
Compare
26c0bb1 to
c24f96b
Compare
itsankit-google
left a comment
There was a problem hiding this comment.
Please fix the compilation failure in cdap-e2e-tests:
Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:testCompile (default-testCompile) on project servicenow-plugins: Compilation failure
[ERROR] /tmp/servicenow-plugins/servicenow-plugins/plugin/src/e2e-test/java/io/cdap/plugin/servicenowsink/actions/ServiceNowSinkPropertiesPageActions.java:[66,78] incompatible types: com.google.gson.JsonObject cannot be converted to java.util.Map<java.lang.String,java.lang.String>
| return GSON.fromJson(ja, type); | ||
| } | ||
|
|
||
| public List<JsonObject> parseResponseToResultListOfMap(InputStream in) { |
There was a problem hiding this comment.
this method is returning List<JsonObject> but the name of method is parseResponseToResultListOfMap, please fix.
pom.xml
Outdated
| <dependency> | ||
| <groupId>commons-io</groupId> | ||
| <artifactId>commons-io</artifactId> | ||
| <version>2.5</version> |
There was a problem hiding this comment.
why using 2.5 when newer versions are available?
pom.xml
Outdated
| </dependency> | ||
| <dependency> | ||
| <groupId>commons-io</groupId> | ||
| <artifactId>commons-io</artifactId> |
There was a problem hiding this comment.
Also please move the version to properties section.
PLUGIN-1936
Issue : OutOfMemory (OOM) Errors seen in the ServiceNow Connector frequently when transfering large number of records
RCA: The entire response is converted into string and the JSON is parsed in memory leading to OOM issues.
Proposed Fix: Read the records incrementally i.e. one by one in a streaming manner.
Changes :
prepareResponse:in theRESTAPIResponseclass to return a reusable InputStream in theRESTAPIResponseobject.openNextPageto initialise theGson'sJsonReaderand read theresultarray in the response andcloseCurrentPageto close theJsonReadernextKeyValue(): Refactored the existing logic to stream one record at a time.fetchData()fetchTableRecordsRetryableMode()fetchTableRecords()Testing
