diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 5216a234..a0aab980 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -23,7 +23,7 @@ on:
 jobs:
   build:
-    runs-on: k8s-runner-e2e
+    runs-on: self-hosted
     if: ${{ github.event.workflow_run.conclusion != 'skipped' }}
diff --git a/.github/workflows/trigger.yml b/.github/workflows/trigger.yml
index 863a0e25..1b842487 100644
--- a/.github/workflows/trigger.yml
+++ b/.github/workflows/trigger.yml
@@ -18,15 +18,15 @@ name: Trigger build
 on:
   push:
-    branches: [ develop, release/** ]
+    branches: [ Sanity-Testing ]
   pull_request:
-    branches: [ develop, release/** ]
+    branches: [ Sanity-Testing ]
     types: [opened, synchronize, reopened, labeled]
   workflow_dispatch:
 jobs:
   trigger:
-    runs-on: ubuntu-latest
+    runs-on: self-hosted
     # We allow builds:
     # 1) When triggered manually
diff --git a/docs/Salesforce-batchsink.md b/docs/Salesforce-batchsink.md
index 47ceea34..aa79fb42 100644
--- a/docs/Salesforce-batchsink.md
+++ b/docs/Salesforce-batchsink.md
@@ -81,18 +81,18 @@ Below is a non-comprehensive list of **sObjects** that are not currently availab
 **Operation:** Operation used for writing data into Salesforce.
 Insert - adds records.
-Upsert - upserts the records. Salesforce will decide if sObjects
+Upsert - upserts the records. Salesforce will decide if sObjects
 are the same using external ID field.
-Update - updates existing records based on Id field.
+Update - updates existing records based on ID field.
-**Upsert External ID Field:** External id field name. It is used only if operation is upsert.
-The field specified can be either 'Id' or any customly created field, which has external id attribute set.
+**Upsert External ID Field:** External ID field name. It is used only if operation is upsert.
+The field specified can be either 'ID' or any custom field that has the external ID attribute set.
 **Concurrency Mode:** The concurrency mode for the bulk job. Select one of the following options:
 Parallel - Process batches in parallel mode.
-Serial - Process batches in serial mode. Processing in parallel can cause lock contention. When this is severe,
-the Salesforce job can fail. If you’re experiencing this issue, in the Salesforce sink, change concurrency mode to
-Serial and run the pipeline again. This mode guarantees that batches are processed one at a time, but can
+Serial - Process batches in serial mode. Processing in parallel can cause lock contention. When this is severe,
+the Salesforce job can fail. If you’re experiencing this issue, in the Salesforce sink, change concurrency mode to
+Serial and run the pipeline again. This mode guarantees that batches are processed one at a time, but can
 significantly increase the processing time.
 Default is Parallel.
@@ -105,3 +105,21 @@ This value cannot be greater than 10,000,000.
 **Error Handling:** Strategy used to handle erroneous records.
 Skip on error - Ignores erroneous records.
 Fail on error - Fails pipeline due to erroneous record.
+
+Ingesting File and Attachment Data
+-----------
+The Salesforce Sink Plugin enables users to ingest file and attachment data, such as PDF and DOC files, into the
+Salesforce sObjects **Attachment** and **ContentVersion**.
+
+Salesforce requires this data to be provided as a base64-encoded string. You can achieve this encoding by using the
+transform plugin called **Field Encoder**, which transforms byte data into a
+base64 string.
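For reference, the encoding the sink expects is plain base64 over the raw file bytes. A minimal sketch of producing such a value outside a pipeline (using java.util.Base64 and an illustrative file path, not the Field Encoder plugin itself) might look like this:

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

public class Base64EncodeSketch {
  public static void main(String[] args) throws Exception {
    // Read the raw bytes of the file to be ingested (the path is illustrative only).
    byte[] fileBytes = Files.readAllBytes(Paths.get("/tmp/attachment.pdf"));
    // Base64-encode the bytes into the string value expected in the sObject's
    // data field (Body for Attachment, VersionData for ContentVersion).
    String encoded = Base64.getEncoder().encodeToString(fileBytes);
    System.out.println(encoded.length() + " characters of base64 data");
  }
}
```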
+
+Before running the pipeline, there are some file size constraints that you need to consider:
+
+- File Size Limit: The file size cannot exceed 10 MB, which is also the maximum size per batch.
+
+- Maximum Records per Batch: The number of records per batch cannot exceed 1000.
+
+Ensure that the files you want to ingest comply with these constraints to avoid any issues during pipeline
+execution.
diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
index 2879f896..cf111da7 100644
--- a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
+++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
@@ -57,17 +57,24 @@ public final class SalesforceBulkUtil {
   /**
    * Create a new job using the Bulk API.
    *
+   * @param bulkConnection BulkConnection object that connects to the Salesforce server using the Bulk API.
+   * @param sObject sObject name
+   * @param operationEnum operation that needs to be performed on the sObject
+   * @param externalIdField external ID field, used in the case of update/upsert operations.
+   * @param concurrencyMode concurrency mode, either serial or parallel.
+   * @param contentType content type, CSV for query jobs and ZIP_CSV for insert jobs.
    * @return The JobInfo for the new job.
    * @throws AsyncApiException if there is an issue creating the job
    */
+
   public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
                                   @Nullable String externalIdField,
-                                  ConcurrencyMode concurrencyMode) throws AsyncApiException {
+                                  ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
     JobInfo job = new JobInfo();
     job.setObject(sObject);
     job.setOperation(operationEnum);
     job.setConcurrencyMode(concurrencyMode);
-    job.setContentType(ContentType.CSV);
+    job.setContentType(contentType);
     if (externalIdField != null) {
       job.setExternalIdFieldName(externalIdField);
     }
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/FileUploadSobject.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/FileUploadSobject.java
new file mode 100644
index 00000000..58babb38
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/FileUploadSobject.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.salesforce.plugin.sink.batch;
+
+/**
+ * FileUploadSobject enum with the name field and base64-encoded data field for each file-upload sObject.
+ */
+public enum FileUploadSobject {
+  Attachment("Attachment", "Name", "Body"),
+  ContentVersion("ContentVersion", "PathOnClient", "VersionData");
+
+  private final String sObjectName;
+  private final String nameField;
+  private final String dataField;
+
+  FileUploadSobject(String sObjectName, String nameField, String dataField) {
+    this.sObjectName = sObjectName;
+    this.nameField = nameField;
+    this.dataField = dataField;
+  }
+
+  public String getsObjectName() {
+    return sObjectName;
+  }
+
+  public String getNameField() {
+    return nameField;
+  }
+
+  public String getDataField() {
+    return dataField;
+  }
+
+  public static boolean isFileUploadSobject(String value) {
+    for (FileUploadSobject fileUploadSobject : FileUploadSobject.class.getEnumConstants()) {
+      if (fileUploadSobject.name().equalsIgnoreCase(value)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java
index 1eda3eca..d0a94094 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java
@@ -49,11 +49,10 @@
 @Name(SalesforceBatchSink.PLUGIN_NAME)
 @Description("Writes records to Salesforce")
 @Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = SalesforceConstants.PLUGIN_NAME)})
-public class SalesforceBatchSink extends BatchSink<StructuredRecord, NullWritable, CSVRecord> {
+public class SalesforceBatchSink extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
   public static final String PLUGIN_NAME = "Salesforce";
   private final SalesforceSinkConfig config;
-  private StructuredRecordToCSVRecordTransformer transformer;
 
   public SalesforceBatchSink(SalesforceSinkConfig config) throws ConnectionException {
     this.config = config;
@@ -102,12 +101,10 @@ public void prepareRun(BatchSinkContext context) {
   @Override
   public void initialize(BatchRuntimeContext context) throws Exception {
     super.initialize(context);
-    this.transformer = new StructuredRecordToCSVRecordTransformer();
   }
 
   @Override
-  public void transform(StructuredRecord record, Emitter<KeyValue<NullWritable, CSVRecord>> emitter) {
-    CSVRecord csvRecord = transformer.transform(record);
-    emitter.emit(new KeyValue<>(null, csvRecord));
+  public void transform(StructuredRecord record, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter) {
+    emitter.emit(new KeyValue<>(null, record));
   }
 }
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java
index 71847e8f..b5d138a5 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java
@@ -17,6 +17,7 @@
 import com.sforce.async.AsyncApiException;
 import com.sforce.async.BulkConnection;
+import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.plugin.salesforce.SalesforceBulkUtil;
 import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
 import io.cdap.plugin.salesforce.authenticator.Authenticator;
@@ -35,10 +36,10 @@
  * An OutputFormat that sends the output of a Hadoop job to the Salesforce record writer, also
  * it defines the output committer.
  */
-public class SalesforceOutputFormat extends OutputFormat<NullWritable, CSVRecord> {
+public class SalesforceOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
 
   @Override
-  public RecordWriter<NullWritable, CSVRecord> getRecordWriter(TaskAttemptContext taskAttemptContext)
+  public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext taskAttemptContext)
     throws IOException {
     try {
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java
index ba9b7cef..ac4263f5 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.sforce.async.AsyncApiException;
 import com.sforce.async.BulkConnection;
+import com.sforce.async.ContentType;
 import com.sforce.async.JobInfo;
 import io.cdap.cdap.api.data.batch.OutputFormatProvider;
 import io.cdap.plugin.salesforce.SalesforceBulkUtil;
@@ -82,7 +83,8 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
     try {
       BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
       JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(),
-                                                 config.getExternalIdField(), config.getConcurrencyModeEnum());
+                                                 config.getExternalIdField(), config.getConcurrencyModeEnum(),
+                                                 ContentType.ZIP_CSV);
       configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId());
       LOG.info("Started Salesforce job with jobId='{}'", job.getId());
     } catch (AsyncApiException e) {
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java
index 90fa85e9..bf7a45c6 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java
@@ -19,6 +19,7 @@
 import com.sforce.async.BatchInfo;
 import com.sforce.async.BulkConnection;
 import com.sforce.async.JobInfo;
+import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.plugin.salesforce.BulkAPIBatchException;
 import io.cdap.plugin.salesforce.SalesforceBulkUtil;
 import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
@@ -36,13 +37,16 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Writes csv records into batches and submits them to Salesforce Bulk job.
- * Accepts null as a key, and CSVRecord as a value.
+ * Accepts null as a key, and StructuredRecord as a value.
  */
-public class SalesforceRecordWriter extends RecordWriter<NullWritable, CSVRecord> {
+public class SalesforceRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SalesforceRecordWriter.class);
 
   private BulkConnection bulkConnection;
@@ -53,6 +57,13 @@ public class SalesforceRecordWriter extends RecordWriter
   private List<BatchInfo> batchInfoList = new ArrayList<>();
   private CSVBuffer csvBuffer;
   private CSVBuffer csvBufferSizeCheck;
+  private final StructuredRecordToCSVRecordTransformer transformer = new StructuredRecordToCSVRecordTransformer();
+  private int count = 0;
+  private int filesArraySize = 0;
+  private final Map<String, InputStream> attachmentMap = new HashMap<>();
+  private final boolean isFileUploadObject;
+  private FileUploadSobject fileUploadObject;
+  Base64.Decoder base64Decoder = Base64.getDecoder();
 
   public SalesforceRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, AsyncApiException {
     Configuration conf = taskAttemptContext.getConfiguration();
@@ -69,26 +80,68 @@ public SalesforceRecordWriter(TaskAttemptContext taskAttemptContext) throws IOEx
     AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
     bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
     jobInfo = bulkConnection.getJobStatus(jobId);
+    isFileUploadObject = FileUploadSobject.isFileUploadSobject(jobInfo.getObject());
+    if (isFileUploadObject) {
+      fileUploadObject = FileUploadSobject.valueOf(jobInfo.getObject());
+    }
   }
 
   @Override
-  public void write(NullWritable key, CSVRecord csvRecord) throws IOException {
+  public void write(NullWritable key, StructuredRecord record) throws IOException {
+    CSVRecord csvRecord = transformer.transform(record, fileUploadObject, ++count);
     csvBufferSizeCheck.reset();
     csvBufferSizeCheck.write(csvRecord);
-    if (csvBuffer.size() + csvBufferSizeCheck.size() > maxBytesPerBatch ||
+    if (csvBuffer.size() + csvBufferSizeCheck.size() + filesArraySize + getFileUploadSize(record) > maxBytesPerBatch ||
       csvBuffer.getRecordsCount() >= maxRecordsPerBatch) {
       submitCurrentBatch();
     }
+    if (isFileUploadObject) {
+      populateInputStreamMap(record);
+    }
     csvBuffer.write(csvRecord);
   }
 
+  /**
+   * Returns the attachment file size in bytes. An exception is thrown if the file size exceeds the maximum
+   * batch size, which cannot be more than 10 MB.
+   *
+   * @param record StructuredRecord from which the file data is read.
+   * @return fileSizeInBytes
+   */
+  private int getFileUploadSize(StructuredRecord record) {
+    if (isFileUploadObject) {
+      String encodedFileData = record.get(fileUploadObject.getDataField());
+      int fileSizeInBytes = base64Decoder.decode(encodedFileData).length;
+      if (fileSizeInBytes > maxBytesPerBatch) {
+        throw new RuntimeException("File Size exceeded the limit of maximum bytes per batch");
+      }
+      return fileSizeInBytes;
+    }
+    return 0;
+  }
+
+  /**
+   * Populates the attachment map with the file name as the key and an InputStream over the file bytes as the value.
+   *
+   * @param record StructuredRecord from which the file data is read and put into the attachment map.
+   */
+  private void populateInputStreamMap(StructuredRecord record) {
+    String encodedFileData = record.get(fileUploadObject.getDataField());
+    byte[] byteArray = base64Decoder.decode(encodedFileData);
+    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);
+    filesArraySize = filesArraySize + byteArray.length;
+    attachmentMap.put(transformer.getAttachmentKey(count, record.get(fileUploadObject.getNameField())),
+                      byteArrayInputStream);
+  }
+
   private void submitCurrentBatch() throws IOException {
     if (csvBuffer.getRecordsCount() != 0) {
       InputStream csvInputStream = new ByteArrayInputStream(csvBuffer.getByteArray());
       try {
-        BatchInfo batchInfo = bulkConnection.createBatchFromStream(jobInfo, csvInputStream);
+        BatchInfo batchInfo = bulkConnection.createBatchWithInputStreamAttachments(jobInfo, csvInputStream,
+                                                                                   attachmentMap);
         batchInfoList.add(batchInfo);
         LOG.info("Submitted a batch with batchId='{}'", batchInfo.getId());
       } catch (AsyncApiException e) {
@@ -96,6 +149,8 @@ private void submitCurrentBatch() throws IOException {
           String.format("Failed to create and submit a batch for writes: %s", e.getMessage()), e);
       }
+      attachmentMap.clear();
+      filesArraySize = 0;
       csvBuffer.reset();
     }
   }
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
index 7275125e..25ab9d91 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
@@ -340,6 +340,11 @@ private void validateSchema(Schema schema, FailureCollector collector, @Nullable
         break;
       case upsert:
         externalIdFieldName = getExternalIdField();
+        if (Strings.isNullOrEmpty(externalIdFieldName)) {
+          collector.addFailure(
+            String.format("External id field must be set for operation='%s'", operation), null)
+            .withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
+        }
         break;
       case update:
         externalIdFieldName = SALESFORCE_ID_FIELD;
@@ -349,7 +354,7 @@ private void validateSchema(Schema schema, FailureCollector collector, @Nullable
           .withConfigProperty(PROPERTY_OPERATION);
     }
 
-    if (operation == OperationEnum.upsert) {
+    if (operation == OperationEnum.upsert && externalIdFieldName != null) {
       Field externalIdField = describeResult.getField(sObject, externalIdFieldName);
       if (externalIdField == null) {
         collector.addFailure(
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConstants.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConstants.java
index f8af0578..0d73c6bc 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConstants.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConstants.java
@@ -27,4 +27,6 @@ public class SalesforceSinkConstants {
   public static final String CONFIG_JOB_ID = "mapred.salesforce.job.id";
   public static final String CONFIG_MAX_BYTES_PER_BATCH = "mapred.salesforce.max.bytes.per.batch";
   public static final String CONFIG_MAX_RECORDS_PER_BATCH = "mapred.salesforce.max.records.per.batch";
+  public static final String DATA_FIELD_PREFIX = "#";
+  public static final String ATTACHMENT_MAP_KEY = "%s_%s";
 }
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/StructuredRecordToCSVRecordTransformer.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/StructuredRecordToCSVRecordTransformer.java
index d5128282..ffad5bc3 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/StructuredRecordToCSVRecordTransformer.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/StructuredRecordToCSVRecordTransformer.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * Transforms a {@link StructuredRecord} to a {@link CSVRecord}
@@ -81,18 +82,39 @@ public static String convertSchemaFieldToString(Object value, Schema.Field field
     return value.toString();
   }
 
-  public CSVRecord transform(StructuredRecord record) {
+  /**
+   * @param record StructuredRecord that needs to be converted to CSVRecord.
+   * @param sObjectName FileUploadSobject value if the record contains a base64-encoded data field.
+   * @param recordCount recordCount is used as a prefix for the attachment file name to avoid overwriting files
+   *                    with the same name. The same value is used as the key in the attachment map.
+   * @return CSVRecord
+   */
+  public CSVRecord transform(StructuredRecord record, @Nullable FileUploadSobject sObjectName, int recordCount) {
     List<String> fieldNames = new ArrayList<>();
     List<String> values = new ArrayList<>();
     for (Schema.Field field : record.getSchema().getFields()) {
+      String value;
       String fieldName = field.getName();
-      String value = convertSchemaFieldToString(record.get(fieldName), field);
-
+      if (sObjectName != null && sObjectName.getDataField().equalsIgnoreCase(fieldName)) {
+        value = SalesforceSinkConstants.DATA_FIELD_PREFIX + getAttachmentKey(recordCount,
+                                                                             record.get(sObjectName.getNameField()));
+      } else {
+        value = convertSchemaFieldToString(record.get(fieldName), field);
+      }
       fieldNames.add(fieldName);
       values.add(value);
     }
     return new CSVRecord(fieldNames, values);
   }
+
+  /**
+   * @param count count is added as a prefix to the key to avoid duplicate keys in the attachmentMap.
+   * @param nameFieldValue name of the file as specified in the name field.
+   * @return Key for the attachmentMap.
+   */
+  public String getAttachmentKey(int count, String nameFieldValue) {
+    return String.format(SalesforceSinkConstants.ATTACHMENT_MAP_KEY, count, nameFieldValue);
+  }
 }
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java
index fe6347b7..ee16a74a 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java
@@ -20,6 +20,7 @@
 import com.sforce.async.BatchStateEnum;
 import com.sforce.async.BulkConnection;
 import com.sforce.async.ConcurrencyMode;
+import com.sforce.async.ContentType;
 import com.sforce.async.JobInfo;
 import com.sforce.async.JobStateEnum;
 import com.sforce.async.OperationEnum;
@@ -109,7 +110,7 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu
     SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
     JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), getOperationEnum(operation),
-                                               null, ConcurrencyMode.Parallel);
+                                               null, ConcurrencyMode.Parallel, ContentType.CSV);
     BatchInfo batchInfo;
     try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) {
       batchInfo = bulkConnection.createBatchFromStream(job, bout);
diff --git a/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java b/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java
index 1269edbf..9a9ac989 100644
--- a/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java
+++ b/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java
@@ -17,10 +17,14 @@
 import com.sforce.soap.partner.Field;
 import com.sforce.soap.partner.FieldType;
+import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer;
 import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
 import io.cdap.plugin.salesforce.plugin.SalesforceConnectorConfig;
+import io.cdap.plugin.salesforce.plugin.sink.batch.CSVRecord;
+import io.cdap.plugin.salesforce.plugin.sink.batch.FileUploadSobject;
+import io.cdap.plugin.salesforce.plugin.sink.batch.StructuredRecordToCSVRecordTransformer;
 import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchMultiSource;
 import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
 import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceMultiSourceConfig;
@@ -301,4 +305,45 @@ public void testMultiSourceSchemaNotNullIfConnectionMacroAndImportManually() {
     source.configurePipeline(mockPipelineConfigurer);
     Assert.assertNull(mockPipelineConfigurer.getOutputSchema());
   }
+
+  @Test
+  public void testTransformWithAttachment() {
+    Schema schema = Schema.recordOf("Schema",
+                                    Schema.Field.of("Id", Schema.of(Schema.Type.INT)),
+                                    Schema.Field.of("Name", Schema.of(Schema.Type.STRING)),
+                                    Schema.Field.of("Body", Schema.of(Schema.Type.STRING)));
+    StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+    StructuredRecord record = builder.set("Id", 1)
+      .set("Name", "attachment.pdf").set("Body", "base64-encoded value").build();
+    FileUploadSobject sObjectName = FileUploadSobject.Attachment;
+    // Call the transform method
+    CSVRecord csvRecord = new StructuredRecordToCSVRecordTransformer().transform(record, sObjectName, 1);
+
+    // Verify the results
+    Assert.assertEquals(3, csvRecord.getColumnNames().size());
+    Assert.assertEquals(3, csvRecord.getValues().size());
+    Assert.assertEquals("Id", csvRecord.getColumnNames().get(0));
+    // Body Field value will be replaced with record number concatenated with file name.
+    Assert.assertEquals("#1_attachment.pdf", csvRecord.getValues().get(2));
+  }
+
+  @Test
+  public void testTransformWithoutAttachment() {
+    Schema schema = Schema.recordOf("Schema",
+                                    Schema.Field.of("Id", Schema.of(Schema.Type.INT)),
+                                    Schema.Field.of("Name", Schema.of(Schema.Type.STRING)),
+                                    Schema.Field.of("Body", Schema.of(Schema.Type.STRING)));
+    StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+    StructuredRecord record = builder.set("Id", 1)
+      .set("Name", "attachment.pdf").set("Body", "normal value").build();
+    FileUploadSobject sObjectName = null;
+    // Call the transform method
+    CSVRecord csvRecord = new StructuredRecordToCSVRecordTransformer().transform(record, sObjectName, 1);
+
+    // Verify the results
+    Assert.assertEquals(3, csvRecord.getColumnNames().size());
+    Assert.assertEquals(3, csvRecord.getValues().size());
+    Assert.assertEquals("Id", csvRecord.getColumnNames().get(0));
+    Assert.assertEquals("normal value", csvRecord.getValues().get(2));
+  }
 }
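Viewed end to end, the sink-side changes follow the Bulk API convention for zipped CSV batches: the CSV column for the data field carries a '#'-prefixed reference, and that same name keys the file's bytes in the batch's attachment map. A condensed, hypothetical sketch of that pairing (reusing the key format and prefix from SalesforceSinkConstants; the JobInfo is assumed to have been created with ContentType.ZIP_CSV, as in SalesforceOutputFormatProvider):

```java
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class ZipCsvBatchSketch {

  // Submits one record whose Body column references a file shipped in the same batch.
  static BatchInfo submit(BulkConnection bulkConnection, JobInfo jobInfo,
                          String fileName, String base64Body) throws AsyncApiException {
    // Same shape as ATTACHMENT_MAP_KEY ("%s_%s"): record number + file name, e.g. "1_attachment.pdf".
    String key = String.format("%s_%s", 1, fileName);
    // DATA_FIELD_PREFIX ("#") turns the CSV value into a reference to the attached file.
    String csv = "Name,Body\n" + fileName + ",#" + key + "\n";
    Map<String, InputStream> attachments = new HashMap<>();
    attachments.put(key, new ByteArrayInputStream(Base64.getDecoder().decode(base64Body)));
    InputStream csvStream = new ByteArrayInputStream(csv.getBytes());
    // The CSV and the attachment map are zipped together into a single ZIP_CSV batch.
    return bulkConnection.createBatchWithInputStreamAttachments(jobInfo, csvStream, attachments);
  }
}
```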