CDAP-15313. Salesforce Batch Sink #18

Open
aonischuk wants to merge 1 commit into master from feature/CDAP-15313-salesforce-batch-sink

@aonischuk (Owner) commented May 22, 2019

Insert records into Salesforce sObjects using the Salesforce Bulk API. The data is divided into separate Salesforce batches, which are submitted from separate mappers.

How an insert through the Salesforce Bulk API works:

  1. Connect to Bulk API via OAuth2
  2. Create a bulk job id
  3. Split data into batches
  4. Add batches to the job
  5. Close job
  6. Await every batch completion
  7. Check results for every batch and for every record
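
Step 3 can be sketched without any Salesforce dependency. The class below is only an illustration, not the code in this PR: `BatchSplitter`, `split`, `maxBytes` and `maxRecords` are hypothetical names, rows are assumed to be already serialized CSV lines, and the CSV header that a real batch would need is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper (hypothetical, not part of the PR). */
final class BatchSplitter {
  private BatchSplitter() { }

  /** Groups rows into batches that stay within a byte limit and a record limit. */
  static List<List<String>> split(List<String> rows, int maxBytes, int maxRecords) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    int currentBytes = 0;
    for (String row : rows) {
      int rowBytes = row.getBytes(StandardCharsets.UTF_8).length;
      // Close the current batch if adding this row would break either limit.
      boolean tooManyBytes = currentBytes + rowBytes > maxBytes;
      boolean tooManyRecords = current.size() >= maxRecords;
      if (!current.isEmpty() && (tooManyBytes || tooManyRecords)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(row);
      currentBytes += rowBytes;
    }
    // Emit the trailing, partially filled batch.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

A row that is larger than `maxBytes` on its own still ends up in a batch by itself here; whether such a row should be rejected instead is a design decision this sketch does not make.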

For more info:
https://wiki.cask.co/display/CE/Salesforce+Batch+Sink

@aonischuk self-assigned this May 22, 2019

**Max bytes per batch:** Maximum number of records, which plugin puts into a Salesforce batch.
This value cannot be greater than 10,000,000. Which is enforced by Salesforce.

**Error Handling:** Strategy used to handle erroneous records.<br>

I think <br> is not part of markdown. Just use a list below.

Owner Author

Not sure if it's part of markdown or not, but it works. Do you want me to remove the line breaks here?

for (int i = 0; i < resultCols; i++) {
  resultInfo.put(resultHeader.get(i), row.get(i));
}
boolean success = Boolean.valueOf(resultInfo.get("Success"));

parseBoolean
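
For context: `Boolean.valueOf(String)` returns a boxed `Boolean` that is then auto-unboxed into the `boolean` variable, while `Boolean.parseBoolean(String)` returns the primitive directly. Both return true only for the string "true" (case-insensitive) and false for anything else, including null. A minimal sketch of the suggested change follows; `BatchResultRow`, `toMap` and `isSuccess` are hypothetical names, and only the "Success" column name is taken from the snippet above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helper (hypothetical, not part of the PR). */
final class BatchResultRow {
  private BatchResultRow() { }

  /** Pairs each header name with the value in the same column of the row. */
  static Map<String, String> toMap(List<String> header, List<String> row) {
    Map<String, String> resultInfo = new LinkedHashMap<>();
    int cols = Math.min(header.size(), row.size());
    for (int i = 0; i < cols; i++) {
      resultInfo.put(header.get(i), row.get(i));
    }
    return resultInfo;
  }

  /** Reads the "Success" column as a primitive boolean, without boxing. */
  static boolean isSuccess(Map<String, String> resultInfo) {
    // parseBoolean(null) is false, so a missing column counts as a failure here.
    return Boolean.parseBoolean(resultInfo.get("Success"));
  }
}
```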

try {
  return new SalesforceRecordWriter(taskAttemptContext);
} catch (AsyncApiException e) {
  throw new RuntimeException("There was issue communicating with Salesforce", e);

is there a strong reason to wrap this exception? Same question for all similar cases

Owner Author

@aonischuk May 22, 2019

This is a checked exception. I think I wrap only those thrown in overridden methods that do not declare "throws", plus the one in config validation.
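
The constraint described in this reply can be shown in isolation. In the sketch below, `RemoteCallException` is a hypothetical stand-in for a checked exception such as `AsyncApiException`, and `java.util.function.Supplier` plays the role of an interface whose method declares no checked exceptions. The override cannot add a `throws` clause, so it wraps the checked exception in an unchecked one and keeps the original as the cause.

```java
import java.util.function.Supplier;

/** Illustrative example (hypothetical names, not part of the PR). */
final class CheckedWrapExample {
  private CheckedWrapExample() { }

  /** Hypothetical checked exception standing in for a Salesforce API exception. */
  static final class RemoteCallException extends Exception {
    RemoteCallException(String message) {
      super(message);
    }
  }

  /** Hypothetical call that may fail with a checked exception. */
  static String openWriter(boolean fail) throws RemoteCallException {
    if (fail) {
      throw new RemoteCallException("connection refused");
    }
    return "writer";
  }

  /** Supplier.get() declares no checked exceptions, so the override has to wrap. */
  static Supplier<String> writerSupplier(boolean fail) {
    return new Supplier<String>() {
      @Override
      public String get() {
        try {
          return openWriter(fail);
        } catch (RemoteCallException e) {
          // Keep the original exception as the cause so the stack trace is not lost.
          throw new RuntimeException("There was issue communicating with Salesforce", e);
        }
      }
    };
  }
}
```

When the overridden method already declares a checked exception such as `IOException`, wrapping into that declared type instead of `RuntimeException` is a possible alternative.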

 * @param config Salesforce batch sink configuration
 */
public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
  ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<String, String>()

builder variable is redundant

submitCurrentBatch(true);
}

csvBufferSizeCheck.write(csvRecord);

shouldn't it write to csvBuffer?

public void write(NullWritable key, CSVRecord csvRecord) throws IOException {
  csvBufferSizeCheck.write(csvRecord);

  if(csvBufferSizeCheck.size() > maxBytesPerBatch ||

why do we need additional buffer just to check sizes? It's a bit confusing
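
One possible way to avoid the second buffer, sketched under assumptions: serialize the record once, compare its length plus the current size of the real buffer against the limit, flush if needed, then append. `SingleBufferBatcher` and `submittedBatches` are hypothetical; `csvBuffer`, `maxBytesPerBatch` and `submitCurrentBatch` reuse names from the diff but are simplified, and an in-memory list stands in for the Bulk API call.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative single-buffer writer (hypothetical, not the PR's implementation). */
final class SingleBufferBatcher {
  private final int maxBytesPerBatch;
  private final ByteArrayOutputStream csvBuffer = new ByteArrayOutputStream();
  // Stand-in for batches handed to the Bulk API.
  private final List<byte[]> submitted = new ArrayList<>();

  SingleBufferBatcher(int maxBytesPerBatch) {
    this.maxBytesPerBatch = maxBytesPerBatch;
  }

  /** Appends one CSV line, flushing first if the line would overflow the batch. */
  void write(String csvLine) {
    byte[] bytes = (csvLine + "\n").getBytes(StandardCharsets.UTF_8);
    // The size check uses the real buffer; no shadow buffer is needed.
    if (csvBuffer.size() > 0 && csvBuffer.size() + bytes.length > maxBytesPerBatch) {
      submitCurrentBatch();
    }
    csvBuffer.write(bytes, 0, bytes.length);
  }

  /** Flushes whatever is left in the buffer. */
  void close() {
    if (csvBuffer.size() > 0) {
      submitCurrentBatch();
    }
  }

  private void submitCurrentBatch() {
    submitted.add(csvBuffer.toByteArray());
    csvBuffer.reset();
  }

  List<byte[]> submittedBatches() {
    return submitted;
  }
}
```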

try {
  creatableSObjectFields = getCreatableSObjectFields();
} catch (ConnectionException e) {
  throw new RuntimeException("There was issue communicating with Salesforce", e);

should be InvalidStageException

inputFields.removeAll(creatableSObjectFields);

if (!inputFields.isEmpty()) {
  throw new IllegalArgumentException(String.format("Following schema fields: '%s' are not present " +

should be InvalidStageException

@aonischuk force-pushed the feature/CDAP-15313-salesforce-batch-sink branch 3 times, most recently from 9c2c07d to f49b34d, May 30, 2019 05:59
@aonischuk force-pushed the feature/CDAP-15313-salesforce-batch-sink branch from f49b34d to 400f09d, May 30, 2019 22:06