2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
@@ -23,7 +23,7 @@ on:

jobs:
build:
runs-on: k8s-runner-e2e
runs-on: self-hosted

if: ${{ github.event.workflow_run.conclusion != 'skipped' }}

6 changes: 3 additions & 3 deletions .github/workflows/trigger.yml
@@ -18,15 +18,15 @@ name: Trigger build

on:
push:
branches: [ develop, release/** ]
branches: [ Sanity-Testing ]
pull_request:
branches: [ develop, release/** ]
branches: [ Sanity-Testing ]
types: [opened, synchronize, reopened, labeled]
workflow_dispatch:

jobs:
trigger:
runs-on: ubuntu-latest
runs-on: self-hosted

# We allow builds:
# 1) When triggered manually
32 changes: 25 additions & 7 deletions docs/Salesforce-batchsink.md
@@ -81,18 +81,18 @@ Below is a non-comprehensive list of **sObjects** that are not currently available

**Operation:** Operation used for writing data into Salesforce.
Insert - adds records.
Upsert - upserts the records. Salesforce will decide if sObjects
Upsert - upserts the records. Salesforce will decide if sObjects
are the same using external ID field.
Update - updates existing records based on Id field.
Update - updates existing records based on ID field.

**Upsert External ID Field:** External id field name. It is used only if operation is upsert.
The field specified can be either 'Id' or any customly created field, which has external id attribute set.
**Upsert External ID Field:** External ID field name. It is used only if operation is upsert.
The field specified can be either 'ID' or any custom field that has the external ID attribute set.

**Concurrency Mode:** The concurrency mode for the bulk job. Select one of the following options:
Parallel - Process batches in parallel mode.
Serial - Process batches in serial mode. Processing in parallel can cause lock contention. When this is severe,
the Salesforce job can fail. If you’re experiencing this issue, in the Salesforce sink, change concurrency mode to
Serial and run the pipeline again. This mode guarantees that batches are processed one at a time, but can
Serial - Process batches in serial mode. Processing in parallel can cause lock contention. When this is severe,
the Salesforce job can fail. If you’re experiencing this issue, in the Salesforce sink, change concurrency mode to
Serial and run the pipeline again. This mode guarantees that batches are processed one at a time, but can
significantly increase the processing time.
Default is Parallel.

@@ -105,3 +105,21 @@ This value cannot be greater than 10,000,000.
**Error Handling:** Strategy used to handle erroneous records.
Skip on error - Ignores erroneous records.
Fail on error - Fails pipeline due to erroneous record.

Ingesting File and Attachment Data
-----------
The Salesforce sink plugin can ingest file and attachment data, such as PDF and DOC files, into the Salesforce
sObjects **Attachment** and **ContentVersion**.

Salesforce requires this data to be provided as a base64-encoded string. You can produce this encoding with the
**Field Encoder** transform plugin, which converts byte data into a base64 string.
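
To make the expected encoding concrete, here is a minimal standalone sketch; the file path and printed values are
illustrative, and in a real pipeline the Field Encoder transform produces the base64 string rather than hand-written code:

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

public class AttachmentEncodingExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical local file; in a real pipeline the bytes arrive from an upstream stage.
    byte[] fileBytes = Files.readAllBytes(Paths.get("/tmp/invoice.pdf"));

    // Salesforce expects the binary content as a base64-encoded string; this is the
    // same conversion the Field Encoder transform performs on a bytes field.
    String encodedBody = Base64.getEncoder().encodeToString(fileBytes);

    // For the Attachment sObject the record carries "Name" and the encoded "Body";
    // for ContentVersion the corresponding fields are "PathOnClient" and "VersionData".
    System.out.println("Name=invoice.pdf, encoded Body length=" + encodedBody.length());
  }
}
```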

Before running the pipeline, take the following size constraints into account:

- File Size Limit: The file size cannot exceed 10 MB, which is also the maximum size per batch.

- Maximum Records per Batch: The number of records per batch cannot exceed 1000.

Ensure that the files you ingest comply with these constraints to avoid failures during pipeline execution.
11 changes: 9 additions & 2 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
@@ -57,17 +57,24 @@ public final class SalesforceBulkUtil {
/**
* Create a new job using the Bulk API.
*
* @param bulkConnection BulkConnection object used to connect to the Salesforce server via the Bulk API.
* @param sObject sObject name
* @param operationEnum operation to be performed on the sObject
* @param externalIdField external ID field, used in case of an update/upsert operation
* @param concurrencyMode concurrency mode, either serial or parallel
* @param contentType content type: CSV for query jobs and ZIP_CSV for insert jobs
* @return The JobInfo for the new job.
* @throws AsyncApiException if there is an issue creating the job
*/

public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
@Nullable String externalIdField,
ConcurrencyMode concurrencyMode) throws AsyncApiException {
ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
JobInfo job = new JobInfo();
job.setObject(sObject);
job.setOperation(operationEnum);
job.setConcurrencyMode(concurrencyMode);
job.setContentType(ContentType.CSV);
job.setContentType(contentType);
if (externalIdField != null) {
job.setExternalIdFieldName(externalIdField);
}
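
As a rough usage sketch of the widened signature (assuming an already-authenticated BulkConnection; the ZIP_CSV
choice mirrors the call added in SalesforceOutputFormatProvider further down in this diff):

```java
import com.sforce.async.AsyncApiException;
import com.sforce.async.BulkConnection;
import com.sforce.async.ConcurrencyMode;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;

public class CreateJobExample {

  // Assumes the BulkConnection has already been authenticated elsewhere.
  static JobInfo startAttachmentInsertJob(BulkConnection bulkConnection) throws AsyncApiException {
    // ZIP_CSV lets the batch carry binary files alongside the CSV manifest;
    // plain CSV remains the choice when no file data is involved.
    return SalesforceBulkUtil.createJob(bulkConnection, "Attachment", OperationEnum.insert,
                                        null, ConcurrencyMode.Parallel, ContentType.ZIP_CSV);
  }
}
```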
55 changes: 55 additions & 0 deletions src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/FileUploadSobject.java
@@ -0,0 +1,55 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin.salesforce.plugin.sink.batch;

/**
* Enum of sObjects that support file uploads, each defining its record name field and base64-encoded data field.
*/
public enum FileUploadSobject {
Attachment("Attachment", "Name", "Body"),
ContentVersion("ContentVersion", "PathOnClient", "VersionData");

private final String sObjectName;
private final String nameField;
private final String dataField;

FileUploadSobject(String sObjectName, String nameField, String dataField) {
this.sObjectName = sObjectName;
this.nameField = nameField;
this.dataField = dataField;
}

public String getsObjectName() {
return sObjectName;
}

public String getNameField() {
return nameField;
}

public String getDataField() {
return dataField;
}

public static boolean isFileUploadSobject(String value) {
for (FileUploadSobject fileUploadSobject : FileUploadSobject.class.getEnumConstants()) {
if (fileUploadSobject.name().equalsIgnoreCase(value)) {
return true;
}
}
return false;
}
}
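
A small usage sketch of the new enum; the sObject name is hard-coded here for illustration, whereas the sink reads
it from the job configuration:

```java
import io.cdap.plugin.salesforce.plugin.sink.batch.FileUploadSobject;

public class FileUploadSobjectExample {
  public static void main(String[] args) {
    // Hard-coded for illustration; the sink obtains this from jobInfo.getObject().
    String sObjectName = "Attachment";

    if (FileUploadSobject.isFileUploadSobject(sObjectName)) {
      FileUploadSobject fileUpload = FileUploadSobject.valueOf(sObjectName);
      // Prints "Name / Body"; ContentVersion would give "PathOnClient / VersionData".
      System.out.println(fileUpload.getNameField() + " / " + fileUpload.getDataField());
    }
  }
}
```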
src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java
@@ -49,11 +49,10 @@
@Name(SalesforceBatchSink.PLUGIN_NAME)
@Description("Writes records to Salesforce")
@Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = SalesforceConstants.PLUGIN_NAME)})
public class SalesforceBatchSink extends BatchSink<StructuredRecord, NullWritable, CSVRecord> {
public class SalesforceBatchSink extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {

public static final String PLUGIN_NAME = "Salesforce";
private final SalesforceSinkConfig config;
private StructuredRecordToCSVRecordTransformer transformer;

public SalesforceBatchSink(SalesforceSinkConfig config) throws ConnectionException {
this.config = config;
@@ -102,12 +101,10 @@ public void prepareRun(BatchSinkContext context) {
@Override
public void initialize(BatchRuntimeContext context) throws Exception {
super.initialize(context);
this.transformer = new StructuredRecordToCSVRecordTransformer();
}

@Override
public void transform(StructuredRecord record, Emitter<KeyValue<NullWritable, CSVRecord>> emitter) {
CSVRecord csvRecord = transformer.transform(record);
emitter.emit(new KeyValue<>(null, csvRecord));
public void transform(StructuredRecord record, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter) {
emitter.emit(new KeyValue<>(null, record));
}
}
src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java
@@ -17,6 +17,7 @@

import com.sforce.async.AsyncApiException;
import com.sforce.async.BulkConnection;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
@@ -35,10 +36,10 @@
* An OutputFormat that sends the output of a Hadoop job to the Salesforce record writer, also
* it defines the output committer.
*/
public class SalesforceOutputFormat extends OutputFormat<NullWritable, CSVRecord> {
public class SalesforceOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {

@Override
public RecordWriter<NullWritable, CSVRecord> getRecordWriter(TaskAttemptContext taskAttemptContext)
public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext taskAttemptContext)
throws IOException {

try {
src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.sforce.async.AsyncApiException;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
@@ -82,7 +83,8 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
try {
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(),
config.getExternalIdField(), config.getConcurrencyModeEnum());
config.getExternalIdField(), config.getConcurrencyModeEnum(),
ContentType.ZIP_CSV);
configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId());
LOG.info("Started Salesforce job with jobId='{}'", job.getId());
} catch (AsyncApiException e) {
src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java
@@ -19,6 +19,7 @@
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
@@ -36,13 +37,16 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Writes csv records into batches and submits them to Salesforce Bulk job.
* Accepts <code>null</code> as a key, and CSVRecord as a value.
* Accepts <code>null</code> as a key, and StructuredRecord as a value.
*/
public class SalesforceRecordWriter extends RecordWriter<NullWritable, CSVRecord> {
public class SalesforceRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
private static final Logger LOG = LoggerFactory.getLogger(SalesforceRecordWriter.class);

private BulkConnection bulkConnection;
@@ -53,6 +57,13 @@ public class SalesforceRecordWriter extends RecordWriter<NullWritable, CSVRecord> {
private List<BatchInfo> batchInfoList = new ArrayList<>();
private CSVBuffer csvBuffer;
private CSVBuffer csvBufferSizeCheck;
private final StructuredRecordToCSVRecordTransformer transformer = new StructuredRecordToCSVRecordTransformer();
private int count = 0;
private int filesArraySize = 0;
private final Map<String, InputStream> attachmentMap = new HashMap<>();
private final boolean isFileUploadObject;
private FileUploadSobject fileUploadObject;
Base64.Decoder base64Decoder = Base64.getDecoder();

public SalesforceRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, AsyncApiException {
Configuration conf = taskAttemptContext.getConfiguration();
@@ -69,33 +80,77 @@ public SalesforceRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, AsyncApiException {
AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
jobInfo = bulkConnection.getJobStatus(jobId);
isFileUploadObject = FileUploadSobject.isFileUploadSobject(jobInfo.getObject());
if (isFileUploadObject) {
fileUploadObject = FileUploadSobject.valueOf(jobInfo.getObject());
}
}

@Override
public void write(NullWritable key, CSVRecord csvRecord) throws IOException {
public void write(NullWritable key, StructuredRecord record) throws IOException {
CSVRecord csvRecord = transformer.transform(record, fileUploadObject, ++count);
csvBufferSizeCheck.reset();
csvBufferSizeCheck.write(csvRecord);

if (csvBuffer.size() + csvBufferSizeCheck.size() > maxBytesPerBatch ||
if (csvBuffer.size() + csvBufferSizeCheck.size() + filesArraySize + getFileUploadSize(record) > maxBytesPerBatch ||
csvBuffer.getRecordsCount() >= maxRecordsPerBatch) {
submitCurrentBatch();
}

if (isFileUploadObject) {
populateInputStreamMap(record);
}
csvBuffer.write(csvRecord);
}

/**
* Returns the attachment file size in bytes. An exception is thrown if the file size exceeds the maximum
* batch size, which cannot be more than 10 MB.
*
* @param record StructuredRecord from which the data file is read.
* @return the file size in bytes
*/
private int getFileUploadSize(StructuredRecord record) {
if (isFileUploadObject) {
String encodedFileData = record.get(fileUploadObject.getDataField());
int fileSizeInBytes = base64Decoder.decode(encodedFileData).length;
if (fileSizeInBytes > maxBytesPerBatch) {
throw new RuntimeException("File Size exceeded the limit of maximum bytes per batch");
}
return fileSizeInBytes;
}
return 0;
}

/**
* Populates the attachment map with the file name as key and an InputStream over the file's decoded contents as value.
*
* @param record StructuredRecord from which the data file is read and added to the attachment map.
*/
private void populateInputStreamMap(StructuredRecord record) {
String encodedFileData = record.get(fileUploadObject.getDataField());
byte[] byteArray = base64Decoder.decode(encodedFileData);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);
filesArraySize = filesArraySize + byteArray.length;
attachmentMap.put(transformer.getAttachmentKey(count, record.get(fileUploadObject.getNameField())),
byteArrayInputStream);
}

private void submitCurrentBatch() throws IOException {
if (csvBuffer.getRecordsCount() != 0) {
InputStream csvInputStream = new ByteArrayInputStream(csvBuffer.getByteArray());
try {
BatchInfo batchInfo = bulkConnection.createBatchFromStream(jobInfo, csvInputStream);
BatchInfo batchInfo = bulkConnection.createBatchWithInputStreamAttachments(jobInfo, csvInputStream,
attachmentMap);
batchInfoList.add(batchInfo);
LOG.info("Submitted a batch with batchId='{}'", batchInfo.getId());
} catch (AsyncApiException e) {
throw new RuntimeException(
String.format("Failed to create and submit a batch for writes: %s", e.getMessage()),
e);
}
attachmentMap.clear();
filesArraySize = 0;
csvBuffer.reset();
}
}
@@ -340,6 +340,11 @@ private void validateSchema(Schema schema, FailureCollector collector, @Nullable
break;
case upsert:
externalIdFieldName = getExternalIdField();
if (Strings.isNullOrEmpty(externalIdFieldName)) {
collector.addFailure(
String.format("External id field must be set for operation='%s'", operation), null)
.withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
}
break;
case update:
externalIdFieldName = SALESFORCE_ID_FIELD;
@@ -349,7 +354,7 @@
.withConfigProperty(PROPERTY_OPERATION);
}

if (operation == OperationEnum.upsert) {
if (operation == OperationEnum.upsert && externalIdFieldName != null) {
Field externalIdField = describeResult.getField(sObject, externalIdFieldName);
if (externalIdField == null) {
collector.addFailure(
src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConstants.java
@@ -27,4 +27,6 @@ public class SalesforceSinkConstants {
public static final String CONFIG_JOB_ID = "mapred.salesforce.job.id";
public static final String CONFIG_MAX_BYTES_PER_BATCH = "mapred.salesforce.max.bytes.per.batch";
public static final String CONFIG_MAX_RECORDS_PER_BATCH = "mapred.salesforce.max.records.per.batch";
public static final String DATA_FIELD_PREFIX = "#";
public static final String ATTACHMENT_MAP_KEY = "%s_%s";
}
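
A hedged sketch of how these two new constants appear to fit together; the actual formatting lives in
StructuredRecordToCSVRecordTransformer, which is not part of this diff, so the key construction below is an assumption:

```java
// Package and class location are assumptions based on the constants shown above.
import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceSinkConstants;

public class AttachmentKeyExample {
  public static void main(String[] args) {
    int recordCount = 1;                 // running record counter kept by the record writer
    String fileName = "invoice.pdf";     // value of the Name / PathOnClient field

    // "1_invoice.pdf" — assumed key under which the file's InputStream is stored in the attachment map.
    String attachmentKey = String.format(SalesforceSinkConstants.ATTACHMENT_MAP_KEY, recordCount, fileName);

    // "#1_invoice.pdf" — assumed CSV cell value for the Body / VersionData column; the leading '#'
    // is the Bulk API convention for referencing a file packaged inside a ZIP_CSV batch.
    String csvCellValue = SalesforceSinkConstants.DATA_FIELD_PREFIX + attachmentKey;

    System.out.println(attachmentKey + " -> " + csvCellValue);
  }
}
```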