From ffc19ed48cda8f21207af520381baa319b5bce49 Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 19 Aug 2025 08:04:35 +0530 Subject: [PATCH 1/4] Rename validate to getValidationResult --- .../google/common/GoogleAuthBaseConfig.java | 6 +++--- .../common/GoogleFilteringSourceConfig.java | 4 ++-- .../drive/sink/GoogleDriveSinkConfig.java | 3 ++- .../drive/source/GoogleDriveInputFormat.java | 7 +++++-- .../drive/source/GoogleDriveRecordReader.java | 20 +++++++++++-------- .../drive/source/GoogleDriveSource.java | 11 +++------- .../drive/source/GoogleDriveSourceConfig.java | 4 ++-- .../sheets/sink/GoogleSheetsSinkConfig.java | 3 ++- .../sheets/source/GoogleSheetsSource.java | 4 ++-- .../source/GoogleSheetsSourceConfig.java | 4 ++-- .../common/GoogleAuthBaseConfigTest.java | 12 +++++------ .../source/GoogleSheetsSourceConfigTest.java | 6 +++--- 12 files changed, 44 insertions(+), 40 deletions(-) diff --git a/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java b/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java index d8a8b11e..4d5f71e6 100644 --- a/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java +++ b/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java @@ -156,7 +156,7 @@ public abstract class GoogleAuthBaseConfig extends PluginConfig { * @param collector the failure collector is provided * @return The ValidationResult */ - public ValidationResult validate(FailureCollector collector) { + public ValidationResult getValidationResult(FailureCollector collector) { IdUtils.validateReferenceName(referenceName, collector); checkIfDirectoryOrFileIdentifierExists(collector); ValidationResult validationResult = new ValidationResult(); @@ -171,7 +171,7 @@ public ValidationResult validate(FailureCollector collector) { propertiesAreValid = validateServiceAccount(collector); break; default: - collector.addFailure(String.format("'%s' is not processed value.", authType.toString()), null) + 
collector.addFailure(String.format("'%s' is not processed value.", authType), null) .withConfigProperty(AUTH_TYPE); return validationResult; } @@ -240,7 +240,7 @@ private boolean validateServiceAccount(FailureCollector collector) { .withConfigProperty(NAME_SERVICE_ACCOUNT_JSON); } } - return collector.getValidationFailures().size() == 0; + return collector.getValidationFailures().isEmpty(); } private boolean isDirectoryOrFileAccessible(FailureCollector collector, GoogleDriveClient driveClient) diff --git a/src/main/java/io/cdap/plugin/google/common/GoogleFilteringSourceConfig.java b/src/main/java/io/cdap/plugin/google/common/GoogleFilteringSourceConfig.java index 59bab80b..e6b937e2 100644 --- a/src/main/java/io/cdap/plugin/google/common/GoogleFilteringSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/common/GoogleFilteringSourceConfig.java @@ -82,8 +82,8 @@ public GoogleFilteringSourceConfig() { * @param collector the failure collector is provided * @return The ValidationResult */ - public ValidationResult validate(FailureCollector collector) { - ValidationResult validationResult = super.validate(collector); + public ValidationResult getValidationResult(FailureCollector collector) { + ValidationResult validationResult = super.getValidationResult(collector); if (validateModificationDateRange(collector) && getModificationDateRangeType().equals(ModifiedDateRangeType.CUSTOM)) { if (checkPropertyIsSet(collector, startDate, START_DATE, START_DATE_LABEL)) { diff --git a/src/main/java/io/cdap/plugin/google/drive/sink/GoogleDriveSinkConfig.java b/src/main/java/io/cdap/plugin/google/drive/sink/GoogleDriveSinkConfig.java index 16d557b0..6fb014ee 100644 --- a/src/main/java/io/cdap/plugin/google/drive/sink/GoogleDriveSinkConfig.java +++ b/src/main/java/io/cdap/plugin/google/drive/sink/GoogleDriveSinkConfig.java @@ -60,7 +60,8 @@ public class GoogleDriveSinkConfig extends GoogleInputSchemaFieldsUsageConfig { * @param schema the schema to check compatibility */ public 
void validate(FailureCollector collector, Schema schema) { - super.validate(collector); + super.getValidationResult(collector); + collector.getOrThrowException(); // validate body field is in schema and has valid format validateSchemaField(collector, schema, SCHEMA_BODY_FIELD_NAME, schemaBodyFieldName, diff --git a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveInputFormat.java b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveInputFormat.java index 1ffe1849..648653f4 100644 --- a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveInputFormat.java +++ b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveInputFormat.java @@ -18,7 +18,9 @@ import com.github.rholder.retry.RetryException; import com.google.api.services.drive.model.File; +import io.cdap.cdap.api.data.format.StructuredRecord; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; @@ -33,7 +35,7 @@ /** * Input format class which generates splits for each query. 
*/ -public class GoogleDriveInputFormat extends InputFormat { +public class GoogleDriveInputFormat extends InputFormat { @Override public List getSplits(JobContext jobContext) throws IOException { @@ -75,7 +77,8 @@ private GoogleDriveSplit getSplitWithUnlimitedPartitionSize(String fileId) { } @Override - public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { + public RecordReader createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) { return new GoogleDriveRecordReader(); } } diff --git a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveRecordReader.java b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveRecordReader.java index 6d20db3c..4583275b 100644 --- a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveRecordReader.java +++ b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveRecordReader.java @@ -17,7 +17,7 @@ package io.cdap.plugin.google.drive.source; import com.github.rholder.retry.RetryException; -import io.cdap.plugin.google.drive.common.FileFromFolder; +import io.cdap.cdap.api.data.format.StructuredRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; @@ -28,12 +28,13 @@ import java.util.concurrent.ExecutionException; /** - * RecordReader implementation, which reads {@link FileFromFolder} wrappers from Google Drive using + * RecordReader implementation, which reads {@link StructuredRecord} wrappers from Google Drive using * Google Drive API. 
*/ -public class GoogleDriveRecordReader extends RecordReader { +public class GoogleDriveRecordReader extends RecordReader { private GoogleDriveSourceClient googleDriveSourceClient; + private GoogleDriveSourceConfig googleDriveSourceConfig; private String fileId; private long bytesFrom; private long bytesTo; @@ -43,6 +44,7 @@ public class GoogleDriveRecordReader extends RecordReader { +public class GoogleDriveSource extends BatchSource { public static final String NAME = "GoogleDrive"; private final GoogleDriveSourceConfig config; @@ -53,7 +53,7 @@ public GoogleDriveSource(GoogleDriveSourceConfig config) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); - config.validate(failureCollector); + config.getValidationResult(failureCollector); failureCollector.getOrThrowException(); pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema()); @@ -62,7 +62,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { @Override public void prepareRun(BatchSourceContext context) { FailureCollector failureCollector = context.getFailureCollector(); - config.validate(failureCollector); + config.getValidationResult(failureCollector); failureCollector.getOrThrowException(); LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName()); @@ -74,9 +74,4 @@ public void prepareRun(BatchSourceContext context) { context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config))); } - - @Override - public void transform(KeyValue input, Emitter emitter) { - emitter.emit(FilesFromFolderTransformer.transform(input.getValue(), config.getSchema())); - } } diff --git a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java index e79e4689..ada5b302 100644 --- 
a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java @@ -136,8 +136,8 @@ public Schema getSchema() { * @param collector the failure collector is provided * @return The ValidationResult */ - public ValidationResult validate(FailureCollector collector) { - ValidationResult validationResult = super.validate(collector); + public ValidationResult getValidationResult(FailureCollector collector) { + ValidationResult validationResult = super.getValidationResult(collector); validateFileTypesToPull(collector); diff --git a/src/main/java/io/cdap/plugin/google/sheets/sink/GoogleSheetsSinkConfig.java b/src/main/java/io/cdap/plugin/google/sheets/sink/GoogleSheetsSinkConfig.java index 1128d538..5b101d1b 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/sink/GoogleSheetsSinkConfig.java +++ b/src/main/java/io/cdap/plugin/google/sheets/sink/GoogleSheetsSinkConfig.java @@ -167,7 +167,8 @@ public class GoogleSheetsSinkConfig extends GoogleInputSchemaFieldsUsageConfig { * @param schema the schema to check compatibility */ public void validate(FailureCollector collector, Schema schema) { - super.validate(collector); + super.getValidationResult(collector); + collector.getOrThrowException(); // validate spreadsheet name field is in schema and has valid format validateSchemaField(collector, schema, SCHEMA_SPREAD_SHEET_NAME_FIELD_NAME, schemaSpreadsheetNameFieldName, diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java index 3752f07c..54b8fdd6 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java @@ -54,7 +54,7 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) { public void configurePipeline(PipelineConfigurer pipelineConfigurer) { 
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); - config.validate(failureCollector); + config.getValidationResult(failureCollector); failureCollector.getOrThrowException(); Schema configuredSchema = config.getSchema(failureCollector); @@ -64,7 +64,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { @Override public void prepareRun(BatchSourceContext context) { FailureCollector failureCollector = context.getFailureCollector(); - config.validate(failureCollector); + config.getValidationResult(failureCollector); failureCollector.getOrThrowException(); Schema configSchema = config.getSchema(failureCollector); diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java index 161835d9..1ccfbbac 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java @@ -318,8 +318,8 @@ private boolean shouldGetSchema() { * @param collector the failure collector is provided * @return The ValidationResult */ - public ValidationResult validate(FailureCollector collector) { - ValidationResult validationResult = super.validate(collector); + public ValidationResult getValidationResult(FailureCollector collector) { + ValidationResult validationResult = super.getValidationResult(collector); // reset current headers info dataSchemaInfo = new LinkedHashMap<>(); diff --git a/src/test/java/io/cdap/plugin/google/common/GoogleAuthBaseConfigTest.java b/src/test/java/io/cdap/plugin/google/common/GoogleAuthBaseConfigTest.java index 2c5454db..ed22afb2 100644 --- a/src/test/java/io/cdap/plugin/google/common/GoogleAuthBaseConfigTest.java +++ b/src/test/java/io/cdap/plugin/google/common/GoogleAuthBaseConfigTest.java @@ -60,7 
+60,7 @@ public void testValidationErrorFilePath() { config.setIdentifierType(IdentifierType.FILE_IDENTIFIER.name()); config.setFileIdentifier("fileId"); FailureCollector collector = new DefaultFailureCollector("stageConfig", Collections.EMPTY_MAP); - config.validate(collector); + config.getValidationResult(collector); Assert.assertEquals(1, collector.getValidationFailures().size()); Assert.assertEquals("Service Account File Path is not available.", collector.getValidationFailures().get(0).getMessage()); @@ -81,7 +81,7 @@ public void testValidationErrorJSON() { config.setFileIdentifier("fileId"); config.setIdentifierType(IdentifierType.FILE_IDENTIFIER.name()); FailureCollector collector = new DefaultFailureCollector("stageConfig", Collections.EMPTY_MAP); - config.validate(collector); + config.getValidationResult(collector); Assert.assertEquals(1, collector.getValidationFailures().size()); Assert.assertEquals("Service Account JSON can not be empty.", collector.getValidationFailures().get(0).getMessage()); @@ -101,7 +101,7 @@ public void testValidationOauthWithoutAccessToken() { config.setIdentifierType(IdentifierType.FILE_IDENTIFIER.name()); config.setFileIdentifier("fileId"); FailureCollector collector = new DefaultFailureCollector("stageConfig", Collections.EMPTY_MAP); - config.validate(collector); + config.getValidationResult(collector); Assert.assertEquals(1, collector.getValidationFailures().size()); Assert.assertEquals("'Access Token' property is empty or macro is not available.", collector.getValidationFailures().get(0).getMessage()); @@ -121,7 +121,7 @@ public void testValidationOauthWithoutRefreshToken() { config.setFileIdentifier("fileId"); config.setIdentifierType(IdentifierType.FILE_IDENTIFIER.name()); FailureCollector collector = new DefaultFailureCollector("stageConfig", Collections.EMPTY_MAP); - config.validate(collector); + config.getValidationResult(collector); Assert.assertEquals(3, collector.getValidationFailures().size()); 
Assert.assertEquals("'Client ID' property is empty or macro is not available.", collector.getValidationFailures().get(0).getMessage()); @@ -148,7 +148,7 @@ public void testWithDirectoryIdAsNull() { config.setAccessToken("access"); config.setoAuthMethod(OAuthMethod.ACCESS_TOKEN.name()); FailureCollector collector = new DefaultFailureCollector("stageConfig", Collections.EMPTY_MAP); - config.validate(collector); + config.getValidationResult(collector); Assert.assertEquals(2, collector.getValidationFailures().size()); Assert.assertEquals("Directory Identifier can not be null.", collector.getValidationFailures().get(0).getMessage()); @@ -171,7 +171,7 @@ public void testWithFileIdAsNull() { config.setIdentifierType(IdentifierType.FILE_IDENTIFIER.name()); config.setoAuthMethod(OAuthMethod.ACCESS_TOKEN.name()); FailureCollector collector = new DefaultFailureCollector("stageConfig", Collections.EMPTY_MAP); - config.validate(collector); + config.getValidationResult(collector); Assert.assertEquals(2, collector.getValidationFailures().size()); Assert.assertEquals("File Identifier can not be null.", collector.getValidationFailures().get(0).getMessage()); diff --git a/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java b/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java index 65d8b633..c6135594 100644 --- a/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java +++ b/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java @@ -104,7 +104,7 @@ public void testGetMetadataCoordinates() throws NoSuchFieldException, IllegalAcc } @Test - public void testValidateMetadataCellsOnlyHeader() throws NoSuchFieldException, IllegalAccessException, + public void testGetValidationResultMetadataCellsOnlyHeader() throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { Method validateMetadataCellsMethod = 
config.getClass().getDeclaredMethod("validateMetadataCells", FailureCollector.class); @@ -141,7 +141,7 @@ public void testValidateMetadataCellsOnlyHeader() throws NoSuchFieldException, I } @Test - public void testValidateMetadataCellsOnlyFooter() throws NoSuchFieldException, IllegalAccessException, + public void testGetValidationResultMetadataCellsOnlyFooter() throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { Method validateMetadataCellsMethod = config.getClass().getDeclaredMethod("validateMetadataCells", FailureCollector.class); @@ -180,7 +180,7 @@ public void testValidateMetadataCellsOnlyFooter() throws NoSuchFieldException, I } @Test - public void testValidateMetadataCellsHeaderAndFooter() throws NoSuchFieldException, IllegalAccessException, + public void testGetValidationResultMetadataCellsHeaderAndFooter() throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { Method validateMetadataCellsMethod = config.getClass().getDeclaredMethod("validateMetadataCells", FailureCollector.class); From 2ee4d806771ffafbcac61a1de5d5738f525d8805 Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 19 Aug 2025 08:34:19 +0530 Subject: [PATCH 2/4] Make fields nullable --- .../google/common/GoogleAuthBaseConfig.java | 5 ++-- .../drive/source/GoogleDriveSourceConfig.java | 27 ++++++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java b/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java index 4d5f71e6..d109dd25 100644 --- a/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java +++ b/src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java @@ -177,7 +177,7 @@ public ValidationResult getValidationResult(FailureCollector collector) { } if (propertiesAreValid) { try { - GoogleDriveClient client = new GoogleDriveClient(this); + GoogleDriveClient client = 
new GoogleDriveClient<>(this); // check directory or file access if (isDirectoryOrFileAccessible(collector, client)) { @@ -243,7 +243,8 @@ private boolean validateServiceAccount(FailureCollector collector) { return collector.getValidationFailures().isEmpty(); } - private boolean isDirectoryOrFileAccessible(FailureCollector collector, GoogleDriveClient driveClient) + private boolean isDirectoryOrFileAccessible(FailureCollector collector, + GoogleDriveClient driveClient) throws IOException { if (containsMacro(FILE_IDENTIFIER) || containsMacro(DIRECTORY_IDENTIFIER)) { return false; diff --git a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java index ada5b302..07959dfb 100644 --- a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java @@ -50,6 +50,13 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig { public static final String DRAWINGS_EXPORTING_FORMAT = "drawingsExportingFormat"; public static final String PRESENTATIONS_EXPORTING_FORMAT = "presentationsExportingFormat"; + public static final String DEFAULT_BODY_FORMAT = "bytes"; + public static final long DEFAULT_MAX_PARTITION_SIZE = 0; + public static final String DEFAULT_DOCS_EXPORTING_FORMAT = "text/plain"; + public static final String DEFAULT_SHEETS_EXPORTING_FORMAT = "text/csv"; + public static final String DEFAULT_DRAWINGS_EXPORTING_FORMAT = "image/svg+xml"; + public static final String DEFAULT_PRESENTATIONS_EXPORTING_FORMAT = "text/plain"; + public static final String FILE_METADATA_PROPERTIES_LABEL = "File properties"; public static final String FILE_TYPES_TO_PULL_LABEL = "File types to pull"; public static final String BODY_FORMAT_LABEL = "Body output format"; @@ -66,37 +73,44 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig { "The following 
values are supported: binary (all non-Google Drive formats), Google Documents, " + "Google Spreadsheets, Google Drawings, Google Presentations and Google Apps Scripts. \n" + "For Google Drive formats user should specify exporting format in **Exporting** section.") + @Nullable @Macro protected String fileTypesToPull; @Name(MAX_PARTITION_SIZE) @Description("Maximum body size for each structured record specified in bytes. \n" + "Default 0 value means unlimited. Is not applicable for files in Google formats.") + @Nullable @Macro protected String maxPartitionSize; @Name(BODY_FORMAT) @Description("Output format for body of file. \"Bytes\" and \"String\" values are available.") + @Nullable @Macro protected String bodyFormat; @Name(DOCS_EXPORTING_FORMAT) @Description("MIME type which is used for Google Documents when converted to structured records.") + @Nullable @Macro protected String docsExportingFormat; @Name(SHEETS_EXPORTING_FORMAT) @Description("MIME type which is used for Google Spreadsheets when converted to structured records.") + @Nullable @Macro protected String sheetsExportingFormat; @Name(DRAWINGS_EXPORTING_FORMAT) @Description("MIME type which is used for Google Drawings when converted to structured records.") + @Nullable @Macro protected String drawingsExportingFormat; @Name(PRESENTATIONS_EXPORTING_FORMAT) @Description("MIME type which is used for Google Presentations when converted to structured records.") + @Nullable @Macro protected String presentationsExportingFormat; private transient Schema schema = null; @@ -202,27 +216,28 @@ public List getFileTypesToPull() { } public BodyFormat getBodyFormat() { - return BodyFormat.fromValue(bodyFormat); + return bodyFormat == null ? BodyFormat.fromValue(DEFAULT_BODY_FORMAT) : BodyFormat.fromValue(bodyFormat); } public Long getMaxPartitionSize() { - return Long.parseLong(maxPartitionSize); + return Strings.isNullOrEmpty(maxPartitionSize) ? 
DEFAULT_MAX_PARTITION_SIZE : Long.parseLong(maxPartitionSize); } public String getDocsExportingFormat() { - return docsExportingFormat; + return Strings.isNullOrEmpty(docsExportingFormat) ? DEFAULT_DOCS_EXPORTING_FORMAT : docsExportingFormat; } public String getSheetsExportingFormat() { - return sheetsExportingFormat; + return Strings.isNullOrEmpty(sheetsExportingFormat) ? DEFAULT_SHEETS_EXPORTING_FORMAT : sheetsExportingFormat; } public String getDrawingsExportingFormat() { - return drawingsExportingFormat; + return Strings.isNullOrEmpty(drawingsExportingFormat) ? DEFAULT_DRAWINGS_EXPORTING_FORMAT : drawingsExportingFormat; } public String getPresentationsExportingFormat() { - return presentationsExportingFormat; + return Strings.isNullOrEmpty(presentationsExportingFormat) ? DEFAULT_PRESENTATIONS_EXPORTING_FORMAT + : presentationsExportingFormat; } public GoogleDriveSourceConfig(String referenceName) { From f8359cc2ca08a2bdce2202251c1630873734fd88 Mon Sep 17 00:00:00 2001 From: psainics Date: Wed, 24 Sep 2025 16:55:28 +0530 Subject: [PATCH 3/4] Add AbstractFileBatchSource GDrive UI --- docs/GoogleDrive-batchsource.md | 55 ++ pom.xml | 5 + .../drive/source/GoogleDriveSourceConfig.java | 211 ++++- widgets/GoogleDrive-batchsource.json | 783 +++++++++++++++--- 4 files changed, 946 insertions(+), 108 deletions(-) diff --git a/docs/GoogleDrive-batchsource.md b/docs/GoogleDrive-batchsource.md index d7929d96..d4bd86ea 100644 --- a/docs/GoogleDrive-batchsource.md +++ b/docs/GoogleDrive-batchsource.md @@ -33,6 +33,57 @@ Then the File Identifier would be `17W3vOhBwe0i24OdVNsbz8rAMClzUitKeAbumTqWFrkow They will be a part of output structured record. Descriptions for properties can be view at [Drive API file reference](https://developers.google.com/drive/api/v3/reference/files). +**Structured Schema Required :** Set this property to `true` if the output should be structured records. 
+Using structured records allows you to read files with formats like avro, csv, json, text, xls, etc. + +**Format:** Format of the data to read. +The format must be one of 'avro', 'blob', 'csv', 'delimited', 'json', 'parquet', 'text', 'tsv', 'xls', or the +name of any format plugin that you have deployed to your environment. +If the format is a macro, only the pre-packaged formats can be used. +If the format is 'blob', every input file will be read into a separate record. +The 'blob' format also requires a schema that contains a field named 'body' of type 'bytes'. +If the format is 'text', the schema must contain a field named 'body' of type 'string'. + +**Get Schema:** Auto-detects schema from file. Supported formats are: avro, parquet, csv, delimited, tsv, blob, xls +and text. + +Blob - is set by default as field named 'body' of type bytes. + +Text - is set by default as two fields: 'body' of type bytes and 'offset' of type 'long'. + +JSON - is not supported, user has to manually provide the output schema. + +Parquet - If the path is a directory, the plugin will look for files ending in '.parquet' to read the schema from. +If no such file can be found, an error will be returned. + +Avro - If the path is a directory, the plugin will look for files ending in '.avro' to read the schema from. +If no such file can be found, an error will be returned. + +**Sample Size:** The maximum number of rows that will get investigated for automatic data type detection. +The default value is 1000. This is used when the format is `xls`, `csv`, `tsv`, `delimited`. + +**Override:** A list of columns with the corresponding data types for which the automatic data type detection gets +skipped. This is used when the format is `xls`, `csv`, `tsv`, `delimited`. + +**Delimiter:** Delimiter to use when the format is 'delimited'. This will be ignored for other formats. + +**Enable Quoted Values:** Whether to treat content between quotes as a value.
This value will only be used if the format +is 'csv', 'tsv' or 'delimited'. For example, if this is set to true, a line that looks like `1, "a, b, c"` will output two fields. +The first field will have `1` as its value and the second will have `a, b, c` as its value. The quote characters will be trimmed. +The newline delimiter cannot be within quotes. + +It also assumes the quotes are well enclosed. The left quote will match the first following quote right before the delimiter. If there is an +unenclosed quote, an error will occur. + +**Use First Row as Header:** Whether to use the first line of each file as the column headers. Supported formats are 'text', 'csv', 'tsv', 'xls', 'delimited'. + +**Terminate Reading After Empty Row:** Specify whether to stop reading after encountering the first empty row. Defaults to false. When false, the reader will read all rows in the sheet. This is only used when the format is 'xls'. + +**Select Sheet Using:** Select the sheet by name or number. Default is 'Sheet Number'. This is only used when the format is 'xls'. + +**Sheet Value:** The name/number of the sheet to read from. If not specified, the first sheet will be read. +Sheet numbers are 0-based, i.e. the first sheet is 0. This is only used when the format is 'xls'. + ### Filtering **Filter:** Filter that can be applied to the files in the selected directory. @@ -107,6 +158,10 @@ Default 0 value means unlimited. Is not applicable for files in Google formats. **Body Output Format** Output format for body of file. "Bytes" and "String" values are available. +**File System Properties:** Additional properties to use with the InputFormat when reading the data. + +**File Encoding:** The character encoding for the file(s) to be read. The default encoding is UTF-8. + ### Exporting **Google Documents Export Format:** MIME type which is used for Google Documents when converted to structured records.
diff --git a/pom.xml b/pom.xml index 51b6c2ce..9a4bcca4 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,11 @@ hydrator-common ${cdap.plugin.version} + + io.cdap.plugin + format-common + ${cdap.plugin.version} + io.cdap.cdap cdap-data-pipeline2_2.11 diff --git a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java index 07959dfb..782e301a 100644 --- a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java @@ -23,7 +23,11 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.StageContext; +import io.cdap.plugin.format.FileFormat; +import io.cdap.plugin.format.plugin.FileSourceProperties; import io.cdap.plugin.google.common.GoogleFilteringSourceConfig; +import io.cdap.plugin.google.common.IdentifierType; import io.cdap.plugin.google.common.ValidationResult; import io.cdap.plugin.google.common.exceptions.InvalidPropertyTypeException; import io.cdap.plugin.google.common.utils.ExportedType; @@ -33,13 +37,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; /** * Configurations for Google Drive Batch Source plugin. 
*/ -public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig { +public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig implements FileSourceProperties { public static final String FILE_METADATA_PROPERTIES = "fileMetadataProperties"; public static final String CONFIGURATION_PARSE_PROPERTY_NAME = "properties"; public static final String FILE_TYPES_TO_PULL = "fileTypesToPull"; @@ -49,6 +54,9 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig { public static final String SHEETS_EXPORTING_FORMAT = "sheetsExportingFormat"; public static final String DRAWINGS_EXPORTING_FORMAT = "drawingsExportingFormat"; public static final String PRESENTATIONS_EXPORTING_FORMAT = "presentationsExportingFormat"; + public static final String IS_STRUCTURED_SCHEMA_REQUIRED = "structuredSchemaRequired"; + public static final String NAME_SCHEMA = "schema"; + public static final String NAME_FORMAT = "format"; public static final String DEFAULT_BODY_FORMAT = "bytes"; public static final long DEFAULT_MAX_PARTITION_SIZE = 0; @@ -61,6 +69,12 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig { public static final String FILE_TYPES_TO_PULL_LABEL = "File types to pull"; public static final String BODY_FORMAT_LABEL = "Body output format"; + public static final String GOOGLE_DRIVE_SCHEMA = "drive"; + public static final String GOOGLE_DRIVE_AUTHORITY = "drive.google.com"; + public static final String GOOGLE_DRIVE_FILE_PATH_PREFIX = "/drive/file/d"; + public static final String GOOGLE_DRIVE_FOLDER_PATH_PREFIX = "/drive/folders"; + public static final String GOOGLE_DRIVE_DEFAULT_FILENAME = "default.txt"; + @Nullable @Name(FILE_METADATA_PROPERTIES) @Description("Properties that represent metadata of files. 
\n" + @@ -113,7 +127,91 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig { @Nullable @Macro protected String presentationsExportingFormat; - private transient Schema schema = null; + + @Macro + @Nullable + @Description("Output schema for the source. Formats like 'avro' and 'parquet' require a schema in order to " + + "read the data.") + private String schema; + + @Name(IS_STRUCTURED_SCHEMA_REQUIRED) + @Description("Wheather to fetch schema or not") + @Nullable + protected Boolean isStructuredSchemaRequired; + + @Name(NAME_FORMAT) + @Macro + @Description("Format of the data to read. Supported formats are 'csv'....") + @Nullable + private String format; + + @Macro + @Nullable + @Description("Whether to recursively read directories within the input directory. The default is false.") + private Boolean recursive; + + @Macro + @Nullable + @Description("Whether to allow an input that does not exist. When false, the source will fail the run if the input " + + "does not exist. When true, the run will not fail and the source will not generate any output. " + + "The default value is false.") + private Boolean ignoreNonExistingFolders; + + @Macro + @Nullable + @Description("The maximum number of rows that will get investigated for automatic data type detection.") + private Long sampleSize; + + @Macro + @Nullable + @Description("A list of columns with the corresponding data types for whom the automatic data type detection gets " + + "skipped.") + private String override; + + @Macro + @Nullable + @Description("The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format " + + "is anything other than 'delimited'.") + private String delimiter; + + @Macro + @Nullable + @Description("Whether to use first row as header. Supported formats are 'text', 'csv', 'tsv', " + + "'delimited'. 
Default value is false.") + private Boolean skipHeader; + + @Macro + @Nullable + @Description("Whether to treat content between quotes as a value. This value will only be used if the format " + + "is 'csv', 'tsv' or 'delimited'. The default value is false.") + protected Boolean enableQuotedValues; + + @Macro + @Nullable + @Description("Any additional properties to use when reading from the filesystem. " + + "This is an advanced feature that requires knowledge of the properties supported by the underlying filesystem.") + private String fileSystemProperties; + + @Macro + @Nullable + @Description("File encoding for the source files. The default encoding is 'UTF-8'") + private String fileEncoding; + + @Macro + @Nullable + @Description("Select the sheet by name or number. Default is 'Sheet Number'.") + private String sheet; + + @Macro + @Nullable + @Description("The name/number of the sheet to read from. If not specified, the first sheet will be read. " + + "Sheet Numbers are 0 based, i.e. the first sheet is 0.") + private String sheetValue; + + @Macro + @Nullable + @Description("Specify whether to stop reading after encountering the first empty row.
Defaults to false.") + private String terminateIfEmptyRow; public GoogleDriveSourceConfig(String referenceName, @Nullable String fileMetadataProperties, String fileTypesToPull, String maxPartitionSize, String bodyFormat, String sheetsExportingFormat, @@ -134,15 +232,103 @@ public GoogleDriveSourceConfig(String referenceName, @Nullable String fileMetada this.endDate = endDate; } + @Override + public void validate(FailureCollector collector) { + getSchema(); + // Extra validation when structure schema is required + } + + @Override + public String getPath() { + IdentifierType idType = getIdentifierType(); + if (idType == IdentifierType.FILE_IDENTIFIER) { + return String.format("%s://%s%s/%s/%s", GOOGLE_DRIVE_SCHEMA, GOOGLE_DRIVE_AUTHORITY, + GOOGLE_DRIVE_FILE_PATH_PREFIX, getFileIdentifier(), GOOGLE_DRIVE_DEFAULT_FILENAME); + } else if (idType == IdentifierType.DIRECTORY_IDENTIFIER) { + return String.format("%s://%s%s/%s/", GOOGLE_DRIVE_SCHEMA, GOOGLE_DRIVE_AUTHORITY, + GOOGLE_DRIVE_FOLDER_PATH_PREFIX, getDirectoryIdentifier()); + } + throw new IllegalArgumentException(String.format("Invalid identifier type '%s'. Expected one of: %s or %s.", idType, + IdentifierType.FILE_IDENTIFIER, IdentifierType.DIRECTORY_IDENTIFIER)); + } + + @Override + public String getPath(StageContext context) { + return getPath(); + } + + @Override + public String getFormatName() { + // need to do this for backwards compatibility, where the pre-packaged format names were case insensitive. 
+ try { + FileFormat fileFormat = FileFormat.from(format, x -> true); + return fileFormat.name().toLowerCase(); + } catch (IllegalArgumentException e) { + // ignore + } + return format; + } + + @Nullable + @Override + public FileFormat getFormat() { + throw new UnsupportedOperationException("GDrive does not support: FileFormat getFormat() method"); + } + + @Nullable + @Override + public Pattern getFilePattern() { + return null; + } + + @Override + public long getMaxSplitSize() { + return Long.MAX_VALUE; + } + + @Override + public boolean shouldAllowEmptyInput() { + return ignoreNonExistingFolders != null && ignoreNonExistingFolders; + } + + @Override + public boolean shouldReadRecursively() { + return recursive != null && recursive; + } + + @Nullable + @Override + public String getPathField() { + return null; + } + + @Override + public boolean useFilenameAsPath() { + throw new UnsupportedOperationException("GDrive does not support: boolean useFilenameAsPath() method"); + } + + @Override + public boolean skipHeader() { + throw new UnsupportedOperationException("GDrive does not support: boolean skipHeader() method"); + } + /** + * Resolves the output schema; when no structured schema is required and none is configured, the default file schema is built. * Returns the instance of Schema.
* @return The instance of Schema */ public Schema getSchema() { - if (schema == null) { - schema = SchemaBuilder.buildSchema(getFileMetadataProperties(), getBodyFormat()); + if (!isStructuredSchemaRequired() && Strings.isNullOrEmpty(schema)) { + schema = SchemaBuilder.buildSchema(getFileMetadataProperties(), getBodyFormat()).toString(); + } + if (Strings.isNullOrEmpty(schema)) { + return null; + } + try { + return Schema.parseJson(schema); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e); } - return schema; } /** @@ -236,8 +422,15 @@ public String getDrawingsExportingFormat() { } public String getPresentationsExportingFormat() { - return Strings.isNullOrEmpty(presentationsExportingFormat) ? DEFAULT_PRESENTATIONS_EXPORTING_FORMAT - : presentationsExportingFormat; + return Strings.isNullOrEmpty(presentationsExportingFormat) ? + DEFAULT_PRESENTATIONS_EXPORTING_FORMAT : presentationsExportingFormat; + } + + public boolean isStructuredSchemaRequired() { + if (isStructuredSchemaRequired == null) { + return false; // for backward compatibility, default to false + } + return isStructuredSchemaRequired; } public GoogleDriveSourceConfig(String referenceName) { @@ -284,8 +477,8 @@ public void setFilter(String filter) { this.filter = filter; } - public void setSchema(String schema) throws IOException { - this.schema = Schema.parseJson(schema); + public void setSchema(String schema) { + this.schema = schema; } public void setModificationDateRange(String modificationDateRange) { diff --git a/widgets/GoogleDrive-batchsource.json b/widgets/GoogleDrive-batchsource.json index c657a281..713c3046 100644 --- a/widgets/GoogleDrive-batchsource.json +++ b/widgets/GoogleDrive-batchsource.json @@ -4,6 +4,104 @@ }, "display-name": "Google Drive Source", "configuration-groups": [ + { + "label": "Authentication", + "properties": [ + { + "widget-type": "radio-group", + "label": "Authentication Type", + "name": "authType", + 
"widget-attributes": { + "layout": "inline", + "default": "oAuth2", + "options": [ + { + "id": "oAuth2", + "label": "OAuth2" + }, + { + "id": "serviceAccount", + "label": "Service account" + } + ] + } + }, + { + "name": "oAuthMethod", + "label": "OAuth Method", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "REFRESH_TOKEN", + "options": [ + { + "id": "REFRESH_TOKEN", + "label": "Refresh Token" + }, + { + "id": "ACCESS_TOKEN", + "label": "Access Token" + } + ] + } + }, + { + "name": "accessToken", + "label": "Access Token", + "widget-type": "textbox", + "widget-attributes": { + "placeholder": "${oauthAccessToken(provider,credential)}" + } + }, + { + "widget-type": "textbox", + "label": "Client ID", + "name": "clientId" + }, + { + "widget-type": "password", + "label": "Client Secret", + "name": "clientSecret" + }, + { + "widget-type": "password", + "label": "Refresh Token", + "name": "refreshToken" + }, + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "JSON", + "label": "JSON" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Service Account File Path", + "name": "accountFilePath", + "widget-attributes": { + "default": "auto-detect" + } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" + } + ] + }, { "label": "Basic", "properties": [ @@ -226,6 +324,186 @@ } ] } + }, + { + "name": "structuredSchemaRequired", + "label": "Structured Schema Required", + "widget-type": "toggle", + "widget-attributes": { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "true" + } + }, + { + "widget-type": "select", + "label": "Format", + "name": "format", + "widget-attributes": { + "values": [ + "avro", + "blob", + 
"delimited", + "tsv", + "csv", + "json", + "text", + "xls" + ], + "default": "blob" + } + }, + { + "widget-type": "get-schema", + "widget-category": "plugin" + }, + { + "widget-type": "hidden", + "name": "recursive", + "label": "Read Files Recursively", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "True" + }, + { + "id": "false", + "label": "False" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Allow Empty Input", + "name": "ignoreNonExistingFolders", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "True" + }, + { + "id": "false", + "label": "False" + } + ] + } + }, + { + "widget-type": "number", + "label": "Sample Size", + "name": "sampleSize", + "widget-attributes": { + "default": "1000", + "minimum": "1" + } + }, + { + "widget-type": "keyvalue-dropdown", + "label": "Override", + "name": "override", + "widget-attributes": { + "key-placeholder": "Field Name", + "value-placeholder": "Data Type", + "dropdownOptions": [ + "boolean", + "bytes", + "double", + "float", + "int", + "long", + "string", + "timestamp" + ] + } + }, + { + "widget-type": "textbox", + "label": "Delimiter", + "name": "delimiter", + "widget-attributes": { + "placeholder": "Delimiter if the format is 'delimited'" + } + }, + { + "widget-type": "toggle", + "name": "enableQuotedValues", + "label": "Enable Quoted Values", + "widget-attributes": { + "default": "false", + "on": { + "value": "true", + "label": "True" + }, + "off": { + "value": "false", + "label": "False" + } + } + }, + { + "widget-type": "toggle", + "name": "skipHeader", + "label": "Use First Row as Header", + "widget-attributes": { + "default": "false", + "on": { + "value": "true", + "label": "True" + }, + "off": { + "value": "false", + "label": "False" + } + } + }, + { + "widget-type": "toggle", + "label": "Terminate Reading After Empty Row", + "name": "terminateIfEmptyRow", + 
"widget-attributes": { + "default": "false", + "on": { + "value": "true", + "label": "True" + }, + "off": { + "value": "false", + "label": "False" + } + } + }, + { + "widget-type": "select", + "label": "Select Sheet Using", + "name": "sheet", + "widget-attributes": { + "values": [ + "Sheet Name", + "Sheet Number" + ], + "default": "Sheet Number" + } + }, + { + "widget-type": "textbox", + "label": "Sheet Value", + "name": "sheetValue", + "widget-attributes": { + "default": "0" + } } ] }, @@ -313,104 +591,6 @@ } ] }, - { - "label": "Authentication", - "properties": [ - { - "widget-type": "radio-group", - "label": "Authentication Type", - "name": "authType", - "widget-attributes": { - "layout": "inline", - "default": "oAuth2", - "options": [ - { - "id": "oAuth2", - "label": "OAuth2" - }, - { - "id": "serviceAccount", - "label": "Service account" - } - ] - } - }, - { - "name": "oAuthMethod", - "label": "OAuth Method", - "widget-type": "radio-group", - "widget-attributes": { - "layout": "inline", - "default": "REFRESH_TOKEN", - "options": [ - { - "id": "REFRESH_TOKEN", - "label": "Refresh Token" - }, - { - "id": "ACCESS_TOKEN", - "label": "Access Token" - } - ] - } - }, - { - "name": "accessToken", - "label": "Access Token", - "widget-type": "textbox", - "widget-attributes": { - "placeholder": "${oauthAccessToken(provider,credential)}" - } - }, - { - "widget-type": "textbox", - "label": "Client ID", - "name": "clientId" - }, - { - "widget-type": "password", - "label": "Client Secret", - "name": "clientSecret" - }, - { - "widget-type": "password", - "label": "Refresh Token", - "name": "refreshToken" - }, - { - "name": "serviceAccountType", - "label": "Service Account Type", - "widget-type": "radio-group", - "widget-attributes": { - "layout": "inline", - "default": "filePath", - "options": [ - { - "id": "filePath", - "label": "File Path" - }, - { - "id": "JSON", - "label": "JSON" - } - ] - } - }, - { - "widget-type": "textbox", - "label": "Service Account File Path", - 
"name": "accountFilePath", - "widget-attributes": { - "default": "auto-detect" - } - }, - { - "widget-type": "textbox", - "label": "Service Account JSON", - "name": "serviceAccountJSON" - } - ] - }, { "label": "Advanced", "properties": [ @@ -441,6 +621,285 @@ } ] } + }, + { + "widget-type": "json-editor", + "label": "File System Properties", + "name": "fileSystemProperties" + }, + { + "widget-type": "select", + "label": "File encoding", + "name": "fileEncoding", + "widget-attributes": { + "values": [ + { + "label": "UTF-8", + "value": "UTF-8" + }, + { + "label": "UTF-32", + "value": "UTF-32" + }, + { + "label": "ISO-8859-1 (Latin-1 Western European)", + "value": "ISO-8859-1" + }, + { + "label": "ISO-8859-2 (Latin-2 Central European)", + "value": "ISO-8859-2" + }, + { + "label": "ISO-8859-3 (Latin-3 South European)", + "value": "ISO-8859-3" + }, + { + "label": "ISO-8859-4 (Latin-4 North European)", + "value": "ISO-8859-4" + }, + { + "label": "ISO-8859-5 (Latin/Cyrillic)", + "value": "ISO-8859-5" + }, + { + "label": "ISO-8859-6 (Latin/Arabic)", + "value": "ISO-8859-6" + }, + { + "label": "ISO-8859-7 (Latin/Greek)", + "value": "ISO-8859-7" + }, + { + "label": "ISO-8859-8 (Latin/Hebrew)", + "value": "ISO-8859-8" + }, + { + "label": "ISO-8859-9 (Latin-5 Turkish)", + "value": "ISO-8859-9" + }, + { + "label": "ISO-8859-11 (Latin/Thai)", + "value": "ISO-8859-11" + }, + { + "label": "ISO-8859-13 (Latin-7 Baltic Rim)", + "value": "ISO-8859-13" + }, + { + "label": "ISO-8859-15 (Latin-9)", + "value": "ISO-8859-15" + }, + { + "label": "Windows-1250", + "value": "Windows-1250" + }, + { + "label": "Windows-1251", + "value": "Windows-1251" + }, + { + "label": "Windows-1252", + "value": "Windows-1252" + }, + { + "label": "Windows-1253", + "value": "Windows-1253" + }, + { + "label": "Windows-1254", + "value": "Windows-1254" + }, + { + "label": "Windows-1255", + "value": "Windows-1255" + }, + { + "label": "Windows-1256", + "value": "Windows-1256" + }, + { + "label": "Windows-1257", + 
"value": "Windows-1257" + }, + { + "label": "Windows-1258", + "value": "Windows-1258" + }, + { + "label": "IBM00858", + "value": "IBM00858" + }, + { + "label": "IBM01140", + "value": "IBM01140" + }, + { + "label": "IBM01141", + "value": "IBM01141" + }, + { + "label": "IBM01142", + "value": "IBM01142" + }, + { + "label": "IBM01143", + "value": "IBM01143" + }, + { + "label": "IBM01144", + "value": "IBM01144" + }, + { + "label": "IBM01145", + "value": "IBM01145" + }, + { + "label": "IBM01146", + "value": "IBM01146" + }, + { + "label": "IBM01147", + "value": "IBM01147" + }, + { + "label": "IBM01148", + "value": "IBM01148" + }, + { + "label": "IBM01149", + "value": "IBM01149" + }, + { + "label": "IBM037", + "value": "IBM037" + }, + { + "label": "IBM1026", + "value": "IBM1026" + }, + { + "label": "IBM1047", + "value": "IBM1047" + }, + { + "label": "IBM273", + "value": "IBM273" + }, + { + "label": "IBM277", + "value": "IBM277" + }, + { + "label": "IBM278", + "value": "IBM278" + }, + { + "label": "IBM280", + "value": "IBM280" + }, + { + "label": "IBM284", + "value": "IBM284" + }, + { + "label": "IBM285", + "value": "IBM285" + }, + { + "label": "IBM290", + "value": "IBM290" + }, + { + "label": "IBM297", + "value": "IBM297" + }, + { + "label": "IBM420", + "value": "IBM420" + }, + { + "label": "IBM424", + "value": "IBM424" + }, + { + "label": "IBM437", + "value": "IBM437" + }, + { + "label": "IBM500", + "value": "IBM500" + }, + { + "label": "IBM775", + "value": "IBM775" + }, + { + "label": "IBM850", + "value": "IBM850" + }, + { + "label": "IBM852", + "value": "IBM852" + }, + { + "label": "IBM855", + "value": "IBM855" + }, + { + "label": "IBM857", + "value": "IBM857" + }, + { + "label": "IBM860", + "value": "IBM860" + }, + { + "label": "IBM861", + "value": "IBM861" + }, + { + "label": "IBM862", + "value": "IBM862" + }, + { + "label": "IBM863", + "value": "IBM863" + }, + { + "label": "IBM864", + "value": "IBM864" + }, + { + "label": "IBM865", + "value": "IBM865" + }, + { + 
"label": "IBM866", + "value": "IBM866" + }, + { + "label": "IBM868", + "value": "IBM868" + }, + { + "label": "IBM869", + "value": "IBM869" + }, + { + "label": "IBM870", + "value": "IBM870" + }, + { + "label": "IBM871", + "value": "IBM871" + }, + { + "label": "IBM918", + "value": "IBM918" + } + ], + "default": "UTF-8" + } } ] }, @@ -512,7 +971,28 @@ ] } ], - "outputs": [], + "outputs": [ + { + "name": "schema", + "widget-type": "schema", + "widget-attributes": { + "default-schema": { + "name": "fileRecord", + "type": "record", + "fields": [ + { + "name": "offset", + "type": "long" + }, + { + "name": "body", + "type": "string" + } + ] + } + } + } + ], "filters": [ { "name": "Select modification date range", @@ -657,6 +1137,111 @@ "type": "property" } ] + }, + { + "name": "Show property delimiter", + "condition": { + "expression": "format == 'delimited' && structuredSchemaRequired == true" + }, + "show": [ + { + "name": "delimiter" + } + ] + }, + { + "name": "Show property enableQuotedValues", + "condition": { + "expression": "(format == 'delimited' || format == 'csv' || format == 'tsv') && structuredSchemaRequired == true" + }, + "show": [ + { + "name": "enableQuotedValues" + } + ] + }, + { + "name": "Show property skipHeader", + "condition": { + "expression": "(format == 'delimited' || format == 'csv' || format == 'tsv' || format == 'xls') && structuredSchemaRequired == true" + }, + "show": [ + { + "name": "skipHeader" + } + ] + }, + { + "name": "Show property sheet, sheetValue, terminateIfEmptyRow", + "condition": { + "expression": "format == 'xls' && structuredSchemaRequired == true" + }, + "show": [ + { + "name": "sheet" + }, + { + "name": "sheetValue" + }, + { + "name": "terminateIfEmptyRow" + } + ] + }, + { + "name": "Show property format, get-schema, override, fileEncoding, sampleSize", + "condition": { + "expression": "structuredSchemaRequired == true" + }, + "show": [ + { + "name": "format" + }, + { + "widget-type": "get-schema" + }, + { + "name": "override" 
+ }, + { + "name": "fileEncoding" + }, + { + "name": "sampleSize" + } + ] + }, + { + "name": "Show old properties when structuredSchemaRequired is false or null", + "condition": { + "expression": "structuredSchemaRequired != true" + }, + "show": [ + { + "name": "docsExportingFormat" + }, + { + "name": "sheetsExportingFormat" + }, + { + "name": "drawingsExportingFormat" + }, + { + "name": "presentationsExportingFormat" + }, + { + "name": "maxPartitionSize" + }, + { + "name": "bodyFormat" + }, + { + "name": "fileMetadataProperties" + }, + { + "name": "fileTypesToPull" + } + ] } ] } From 4dc520294a55260870cd9240fa37a1fefb4aa508 Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 14 Oct 2025 14:15:36 +0530 Subject: [PATCH 4/4] Add GDriveFS --- .../google/common/GoogleDriveClient.java | 2 +- .../drive/source/GoogleDriveFileSource.java | 50 +++++ .../drive/source/GoogleDriveSource.java | 30 ++- .../drive/source/GoogleDriveSourceConfig.java | 46 ++++ .../source/fs/GoogleDriveFileSystem.java | 143 ++++++++++++ .../source/fs/GoogleDriveInputStream.java | 36 +++ .../fs/GoogleDriveInputStreamWrapper.java | 116 ++++++++++ .../drive/source/fs/GoogleDriveUtils.java | 210 ++++++++++++++++++ .../sheets/source/SheetTransformer.java | 6 +- 9 files changed, 623 insertions(+), 16 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveFileSource.java create mode 100644 src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveFileSystem.java create mode 100644 src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStream.java create mode 100644 src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStreamWrapper.java create mode 100644 src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveUtils.java diff --git a/src/main/java/io/cdap/plugin/google/common/GoogleDriveClient.java b/src/main/java/io/cdap/plugin/google/common/GoogleDriveClient.java index d9235be8..bc1362ab 100644 --- 
a/src/main/java/io/cdap/plugin/google/common/GoogleDriveClient.java +++ b/src/main/java/io/cdap/plugin/google/common/GoogleDriveClient.java @@ -68,7 +68,7 @@ public GoogleDriveClient(C config) throws IOException { * @return {@link Drive} client. * @throws IOException on issues with service account file reading. */ - protected Drive getDriveClient() throws IOException { + public Drive getDriveClient() throws IOException { Drive drive; AuthType authType = config.getAuthType(); switch (authType) { diff --git a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveFileSource.java b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveFileSource.java new file mode 100644 index 00000000..3085467b --- /dev/null +++ b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveFileSource.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.google.drive.source; + +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.plugin.format.plugin.AbstractFileSource; +import java.util.HashMap; +import java.util.Map; + +/** + * This class gives ability to read files from Google Drive with a structured schema. 
+ */ +public class GoogleDriveFileSource extends AbstractFileSource { + public static final String NAME = "GoogleDrive"; + private final GoogleDriveSourceConfig config; + + public GoogleDriveFileSource(GoogleDriveSourceConfig config) { + super(config); + this.config = config; + } + + @Override + protected Map getFileSystemProperties(BatchSourceContext context) { + FailureCollector collector = context == null ? null : context.getFailureCollector(); + return new HashMap<>(config.getFileSystemProperties(collector)); + } + + @Override + protected boolean shouldGetSchema() { + return !config.containsMacro(GoogleDriveSourceConfig.NAME_FORMAT) + && !config.containsMacro(GoogleDriveSourceConfig.NAME_DELIMITER) + && !config.containsMacro(GoogleDriveSourceConfig.NAME_FILE_SYSTEM_PROPERTIES) + && !config.containsMacro(GoogleDriveSourceConfig.NAME_FILE_ENCODING); + } +} diff --git a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSource.java b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSource.java index b142e895..bffd6a46 100644 --- a/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSource.java +++ b/src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSource.java @@ -23,14 +23,11 @@ import io.cdap.cdap.api.data.batch.Input; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; -import io.cdap.cdap.api.dataset.lib.KeyValue; -import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.plugin.common.LineageRecorder; -import io.cdap.plugin.google.drive.common.FileFromFolder; import org.apache.hadoop.io.NullWritable; import java.util.stream.Collectors; @@ -45,9 +42,11 @@ public class GoogleDriveSource extends BatchSource>() { }.getType(); + @Nullable @Name(FILE_METADATA_PROPERTIES) @Description("Properties 
that represent metadata of files. \n" + @@ -168,6 +184,7 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig impleme "skipped.") private String override; + @Name(NAME_DELIMITER) @Macro @Nullable @Description("The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format " @@ -186,12 +203,14 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig impleme "is 'csv', 'tsv' or 'delimited'. The default value is false.") protected Boolean enableQuotedValues; + @Name(NAME_FILE_SYSTEM_PROPERTIES) @Macro @Nullable @Description("Any additional properties to use when reading from the filesystem. " + "This is an advanced feature that requires knowledge of the properties supported by the underlying filesystem.") private String fileSystemProperties; + @Name(NAME_FILE_ENCODING) @Macro @Nullable @Description("File encoding for the source files. The default encoding is 'UTF-8'") @@ -588,4 +607,31 @@ public static GoogleDriveSourceConfig of(JsonObject properties) throws IOExcepti } return googleDriveSourceConfig; } + + public Map getFileSystemProperties(@Nullable FailureCollector collector) { + Map properties = new HashMap<>(); + properties.put(GOOGLE_DRIVE_FILESYSTEM_IMPL, GoogleDriveFileSystem.class.getName()); + properties.put(GoogleDriveInputFormatProvider.PROPERTY_CONFIG_JSON, GSON.toJson(getProperties())); + properties.put(GOOGLE_DRIVE_FILESYSTEM_DISABLE_CACHE_KEY, Boolean.TRUE.toString()); + + if (fileSystemProperties != null) { + try { + Map userProperties = GSON.fromJson(fileSystemProperties, MAP_STRING_STRING_TYPE); + for (Map.Entry entry : userProperties.entrySet()) { + if (entry.getKey() != null && !entry.getKey().isEmpty()) { + // let user override the default properties + properties.put(entry.getKey(), entry.getValue()); + } + } + } catch (Exception e) { + String errorMessage = String.format("Unable to parse filesystem properties, %s: %s", e.getClass().getName(), + e.getMessage()); + if 
(collector == null) { + throw new IllegalArgumentException(errorMessage, e); + } + collector.addFailure(errorMessage, "Ensure the properties are in a valid JSON format."); + } + } + return properties; + } } diff --git a/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveFileSystem.java b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveFileSystem.java new file mode 100644 index 00000000..ba65c7b1 --- /dev/null +++ b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveFileSystem.java @@ -0,0 +1,143 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.google.drive.source.fs; + +import com.google.api.services.drive.Drive; +import com.google.common.base.Strings; +import io.cdap.plugin.google.common.GoogleDriveClient; +import io.cdap.plugin.google.drive.source.GoogleDriveInputFormatProvider; +import io.cdap.plugin.google.drive.source.GoogleDriveSourceConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; + +/** + * A custom Hadoop FileSystem implementation for Google Drive. + * This class provides methods to interact with Google Drive files and directories. + */ +public class GoogleDriveFileSystem extends FileSystem { + private URI uri; + private Path workingDir; + private Drive driveService; + private String filter; + + @Override + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + setConf(conf); + this.uri = URI.create(name.getScheme() + "://" + name.getAuthority()); + this.workingDir = new Path("/"); + + GoogleDriveSourceConfig googleDriveSourceConfig = GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf); + GoogleDriveClient client = new GoogleDriveClient<>(googleDriveSourceConfig); + + // Initialize Google Drive service (using OAuth2 or service account) + this.driveService = client.getDriveClient(); + + // Initialize filter to be passed down to listStatus + if (!Strings.isNullOrEmpty(googleDriveSourceConfig.getFilter())) { + this.filter = googleDriveSourceConfig.getFilter(); + } + } + @Override + public URI getUri() { + return this.uri; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws 
IOException { + return new GoogleDriveInputStream(new GoogleDriveInputStreamWrapper(driveService, f)); + } + + @Override + public FSDataOutputStream create(Path f, + FsPermission permission, + boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + throw new UnsupportedOperationException( + "GDrive does not support: FSDataOutputStream create(Path, FsPermission, boolean, int, " + + "short, long, Progressable)"); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { + throw new UnsupportedOperationException( + "GDrive does not support: FSDataOutputStream append(Path, int, Progressable)"); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + throw new UnsupportedOperationException("GDrive does not support: boolean rename(Path, Path)"); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + throw new UnsupportedOperationException("GDrive does not support: boolean delete(Path, boolean)"); + } + + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + if (isPathDirectory(f)) { + return GoogleDriveUtils.listStatus(driveService, f, filter); + } + return new FileStatus[]{GoogleDriveUtils.listObjectStatus(driveService, f, false, GoogleDriveUtils.getFileId(f))}; + } + + @Override + public void setWorkingDirectory(Path newDir) { + throw new UnsupportedOperationException("GDrive does not support: void setWorkingDirectory(Path)"); + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + throw new UnsupportedOperationException( + "GDrive does not support: boolean mkdirs(Path, FsPermission)"); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + if (isPathDirectory(f)) { + return 
GoogleDriveUtils.listObjectStatus(driveService, f, true, f.getName()); + } + return GoogleDriveUtils.listObjectStatus(driveService, f, false, GoogleDriveUtils.getFileId(f)); + } + + private boolean isPathDirectory(Path path) { + return path != null && path.toString().startsWith(String.format("%s://%s%s/", + GoogleDriveSourceConfig.GOOGLE_DRIVE_SCHEMA, + GoogleDriveSourceConfig.GOOGLE_DRIVE_AUTHORITY, + GoogleDriveSourceConfig.GOOGLE_DRIVE_FOLDER_PATH_PREFIX)); + } + +} diff --git a/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStream.java b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStream.java new file mode 100644 index 00000000..60144715 --- /dev/null +++ b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStream.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.google.drive.source.fs; + +import com.google.api.services.drive.Drive; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; + +import java.io.IOException; +import java.io.InputStream; + +/** + * InputStream implementation for reading files from Google Drive. + * This class extends FSDataInputStream to provide Hadoop-compatible file reading capabilities. 
+ */
+public class GoogleDriveInputStream extends FSDataInputStream implements Seekable, PositionedReadable {
+  /**
+   * Creates the stream by handing {@code is} to the {@link FSDataInputStream} constructor.
+   *
+   * @param is the stream to read from
+   * @throws IOException propagated from the {@link FSDataInputStream} constructor, if it throws one
+   */
+  public GoogleDriveInputStream(InputStream is) throws IOException {
+    super(is);
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStreamWrapper.java b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStreamWrapper.java
new file mode 100644
index 00000000..e91260a8
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveInputStreamWrapper.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.google.drive.source.fs;
+
+import com.google.api.services.drive.Drive;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A wrapper for InputStream that implements Seekable and PositionedReadable interfaces.
+ * This is used to read files from Google Drive in a way that is compatible with Hadoop's file system APIs.
+ */
+public class GoogleDriveInputStreamWrapper extends InputStream implements Seekable, PositionedReadable {
+  private final Drive driveService;
+  private final Path filePath;
+  // Stream that backs sequential reads; swapped out whenever seek() moves to a different offset.
+  private InputStream googleDriveStream;
+  // Offset, in bytes from the start of the file, of the next byte a sequential read returns.
+  private long currentPos = 0;
+
+  /**
+   * Opens the Drive file referenced by {@code filePath} for reading, starting at offset 0.
+   *
+   * @param driveService the Drive client used for every download request
+   * @param filePath path whose parent component is the Drive file id (see {@link GoogleDriveUtils#getFileId})
+   * @throws IOException if no file id can be derived from the path or the download request fails
+   */
+  public GoogleDriveInputStreamWrapper(Drive driveService, Path filePath) throws IOException {
+    this.filePath = filePath;
+    this.driveService = driveService;
+    this.googleDriveStream = getDriveInputStream(filePath, 0);
+  }
+
+  /**
+   * Starts a ranged media download of the file, beginning at byte {@code pos}.
+   *
+   * @param filePath path identifying the Drive file
+   * @param pos offset of the first byte to request
+   * @return a new stream positioned at {@code pos}; the caller owns it and must close it
+   * @throws IOException if the file id cannot be resolved or the request fails
+   */
+  private InputStream getDriveInputStream(Path filePath, long pos) throws IOException {
+    String fileId = GoogleDriveUtils.getFileId(filePath);
+    if (fileId == null) {
+      throw new IOException("File not found in Google Drive: " + filePath);
+    }
+    Drive.Files.Get get = driveService.files().get(fileId);
+    // GoogleDriveUtils lists and stats objects with supportsAllDrives=true, so the download request
+    // has to opt in as well or files that were listed from shared drives cannot be fetched.
+    get.setSupportsAllDrives(true);
+    get.getMediaHttpDownloader().setDirectDownloadEnabled(true);
+    // Plain concatenation on purpose: String.format("%d") may emit locale-specific digits,
+    // which would produce an invalid Range header.
+    get.getRequestHeaders().setRange("bytes=" + pos + "-");
+    return get.executeMediaAsInputStream();
+  }
+
+  @Override
+  public int read() throws IOException {
+    int result = googleDriveStream.read();
+    if (result != -1) {
+      currentPos++;
+    }
+    return result;
+  }
+
+  /**
+   * Bulk read delegated to the underlying download stream. Without this override the
+   * {@link InputStream} default implementation would call {@link #read()} once per byte.
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    int bytesRead = googleDriveStream.read(buffer, offset, length);
+    if (bytesRead > 0) {
+      currentPos += bytesRead;
+    }
+    return bytesRead;
+  }
+
+  /**
+   * Repositions the sequential stream by issuing a new ranged download starting at {@code pos}.
+   *
+   * @param pos absolute offset of the next byte to read
+   * @throws IOException if {@code pos} is negative or the new download cannot be started
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos < 0) {
+      throw new java.io.EOFException("Cannot seek to a negative offset: " + pos);
+    }
+    if (pos == currentPos) {
+      // Already positioned here; avoid a needless HTTP round trip.
+      return;
+    }
+    // Open the replacement first so a failed request leaves the current stream usable.
+    InputStream replacement = getDriveInputStream(filePath, pos);
+    InputStream previous = googleDriveStream;
+    googleDriveStream = replacement;
+    currentPos = pos;
+    previous.close();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return currentPos;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  /**
+   * Positioned read: uses a separate, short-lived download so the sequential position is untouched.
+   * May return fewer than {@code length} bytes, as a single {@link InputStream#read(byte[], int, int)} can.
+   */
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    // try-with-resources so the temporary stream is released even when read() throws.
+    try (InputStream tempStream = getDriveInputStream(filePath, position)) {
+      return tempStream.read(buffer, offset, length);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    googleDriveStream.close();
+    super.close();
+  }
+
+  /**
+   * Reads exactly {@code length} bytes starting at {@code position} into {@code buffer}.
+   *
+   * @throws IndexOutOfBoundsException if {@code offset}/{@code length} do not fit inside {@code buffer}
+   * @throws IOException if the stream ends before {@code length} bytes were read, or the download fails
+   */
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    // "length > buffer.length - offset" instead of "offset + length > buffer.length": the sum can overflow.
+    if (offset < 0 || length < 0 || length > buffer.length - offset) {
+      throw new IndexOutOfBoundsException("Invalid offset or length for readFully.");
+    }
+    if (length == 0) {
+      return;
+    }
+    try (InputStream tempStream = getDriveInputStream(filePath, position)) {
+      int totalRead = 0;
+      // A single read() may legally return fewer bytes than requested, so keep reading until
+      // the request is satisfied or the stream really ends.
+      while (totalRead < length) {
+        int bytesRead = tempStream.read(buffer, offset + totalRead, length - totalRead);
+        if (bytesRead < 0) {
+          throw new java.io.EOFException("Could not read the requested number of bytes from the stream.");
+        }
+        totalRead += bytesRead;
+      }
+    }
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    if (buffer == null) {
+      throw new NullPointerException("Byte array cannot be null.");
+    }
+    readFully(position, buffer, 0, buffer.length);
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveUtils.java b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveUtils.java
new file mode 100644
index 00000000..eea70f95
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/google/drive/source/fs/GoogleDriveUtils.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.google.drive.source.fs;
+
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import com.google.common.base.Strings;
+import io.cdap.plugin.google.drive.source.GoogleDriveSourceConfig;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for Google Drive operations.
+ */
+public class GoogleDriveUtils {
+  /**
+   * A set of MIME types representing Google-native files in Google Drive that are not
+   * directly downloadable (e.g., Google Docs, Sheets, Slides, etc.).
+   *
+   * This set is used to filter out these types when listing files, ensuring only
+   * downloadable files such as CSV, JSON, etc., are returned.
+   *
+   * Common Google-native MIME types:
+   * <ul>
+   *   <li>Google Docs: {@code application/vnd.google-apps.document}</li>
+   *   <li>Google Sheets: {@code application/vnd.google-apps.spreadsheet}</li>
+   *   <li>Google Slides: {@code application/vnd.google-apps.presentation}</li>
+   *   <li>Google Forms: {@code application/vnd.google-apps.form}</li>
+   *   <li>Google Drawings: {@code application/vnd.google-apps.drawing}</li>
+   *   <li>Google Apps Scripts: {@code application/vnd.google-apps.script}</li>
+   *   <li>Google Maps: {@code application/vnd.google-apps.map}</li>
+   * </ul>
+   *
+   * @see <a href="https://developers.google.com/drive/api/guides/mime-types">Google Drive MIME types</a>
+   */
+  private static final Set<String> GOOGLE_NATIVE_MIME_TYPES = new HashSet<>(Arrays.asList(
+    "application/vnd.google-apps.document",
+    "application/vnd.google-apps.spreadsheet",
+    "application/vnd.google-apps.presentation",
+    "application/vnd.google-apps.form",
+    "application/vnd.google-apps.drawing",
+    "application/vnd.google-apps.script",
+    "application/vnd.google-apps.map"
+  ));
+  private static final String FIELDS_TO_RETURN = "id, name, mimeType, size, modifiedTime";
+  private static final String GOOGLE_DRIVE_FOLDER_MIME_TYPE = "application/vnd.google-apps.folder";
+
+  /**
+   * Extracts the Google Drive file ID from a file path. File paths built by
+   * {@link #listStatus} end in {@code .../<fileId>/<fileName>}, so the ID is the name of the
+   * parent path component. No Drive request is made.
+   *
+   * @param filePath The {@link Path} whose corresponding Google Drive file ID is to be retrieved.
+   * @return The file ID as a {@link String}, or {@code null} if the path is {@code null} or has no
+   *         non-empty parent component to take the ID from.
+   */
+  public static String getFileId(Path filePath) {
+    if (filePath == null) {
+      return null;
+    }
+    // name is file name and parent is the file id
+    Path parent = filePath.getParent();
+    if (parent == null) {
+      return null;
+    }
+    return Strings.emptyToNull(parent.getName());
+  }
+
+  /**
+   * Lists the files and directories under a specified Google Drive directory and returns
+   * an array of {@link FileStatus} objects representing their metadata.
+   *
+   * For files, the actual size is returned. For directories, the size is set to 0.
+   * All entries are marked with default permission, and placeholder owner and group values.
+   * Google-native documents (see {@link #GOOGLE_NATIVE_MIME_TYPES}) and trashed items are excluded.
+   * Every result page is fetched, not only the first one.
+   *
+   * @param driveService The authenticated {@link Drive} service instance used to interact with the Drive API.
+   * @param dirPath The Hadoop {@link Path} representing the target directory in Google Drive; its last
+   *                component is used as the Drive folder ID.
+   * @param filter Optional extra Drive query expression; when non-empty it is AND-ed (as a parenthesised
+   *               group) with the built-in conditions. May be {@code null}.
+   * @return An array of {@link FileStatus} objects for all files and subdirectories found in the given directory.
+   * @throws IOException If no folder ID can be taken from the path or the Drive API request fails.
+   */
+  public static FileStatus[] listStatus(Drive driveService, Path dirPath, String filter) throws IOException {
+    String dirId = dirPath.getName();
+    // Path#getName never returns null; an empty name is what a path without a folder component yields.
+    if (Strings.isNullOrEmpty(dirId)) {
+      throw new IOException("Directory not found: " + dirPath);
+    }
+
+    String query = buildListQuery(dirId, filter);
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    String pageToken = null;
+    // The Drive API returns results in pages; follow nextPageToken until the listing is exhausted.
+    do {
+      FileList result = driveService.files().list()
+        .setQ(query)
+        .setFields("nextPageToken, files(" + FIELDS_TO_RETURN + ")")
+        .setSupportsAllDrives(true)
+        .setSupportsTeamDrives(true)
+        .setIncludeItemsFromAllDrives(true)
+        .setPageToken(pageToken)
+        .execute();
+
+      List<File> files = result.getFiles();
+      if (files != null) {
+        for (File file : files) {
+          fileStatuses.add(toFileStatus(file));
+        }
+      }
+      pageToken = result.getNextPageToken();
+    } while (!Strings.isNullOrEmpty(pageToken));
+
+    return fileStatuses.toArray(new FileStatus[0]);
+  }
+
+  /**
+   * Builds the Drive search query for the children of {@code dirId}.
+   */
+  private static String buildListQuery(String dirId, String filter) {
+    // Backslash and single quote are the characters that must be escaped inside a quoted query value.
+    String escapedDirId = dirId.replace("\\", "\\\\").replace("'", "\\'");
+    StringBuilder query = new StringBuilder("'" + escapedDirId + "' in parents and trashed = false");
+    query.append(" and ").append(getGoogleNativeExclusionQuery());
+    if (!Strings.isNullOrEmpty(filter)) {
+      // Parenthesised so an "or" inside the filter cannot escape the parent/trashed constraints.
+      query.append(" and (").append(filter).append(")");
+    }
+    return query.toString();
+  }
+
+  /**
+   * Converts one Drive listing entry into a {@link FileStatus}. Folders get a
+   * {@code <folder prefix>/<id>} path, files a {@code <file prefix>/<id>/<name>} path.
+   */
+  private static FileStatus toFileStatus(File file) {
+    boolean isDirectory = GOOGLE_DRIVE_FOLDER_MIME_TYPE.equals(file.getMimeType());
+    // Drive omits "size" for entries without binary content (folders, shortcuts, ...); treat that as 0
+    // instead of unboxing null.
+    long size = file.getSize() == null ? 0L : file.getSize();
+    long modifiedTime = file.getModifiedTime() == null ? 0L : file.getModifiedTime().getValue();
+
+    Path filePathWithFilePrefix;
+    if (isDirectory) {
+      filePathWithFilePrefix = new Path(String.format("%s://%s%s/%s",
+        GoogleDriveSourceConfig.GOOGLE_DRIVE_SCHEMA,
+        GoogleDriveSourceConfig.GOOGLE_DRIVE_AUTHORITY,
+        GoogleDriveSourceConfig.GOOGLE_DRIVE_FOLDER_PATH_PREFIX,
+        file.getId()));
+    } else {
+      // NOTE(review): a file name containing '/' would shift the id out of the parent component that
+      // getFileId reads - confirm whether names need encoding here.
+      filePathWithFilePrefix = new Path(String.format("%s://%s%s/%s/%s",
+        GoogleDriveSourceConfig.GOOGLE_DRIVE_SCHEMA,
+        GoogleDriveSourceConfig.GOOGLE_DRIVE_AUTHORITY,
+        GoogleDriveSourceConfig.GOOGLE_DRIVE_FILE_PATH_PREFIX,
+        file.getId(), file.getName()));
+    }
+    return new FileStatus(
+      isDirectory ? 0L : size,
+      isDirectory,
+      1,
+      size,
+      modifiedTime,
+      0,
+      FsPermission.getDefault(),
+      "owner",
+      "group",
+      filePathWithFilePrefix
+    );
+  }
+
+  /**
+   * Retrieves the status of a specific file or directory in Google Drive.
+   *
+   * @param driveService The authenticated {@link Drive} service instance used to interact with the Drive API.
+   * @param path The Hadoop {@link Path} representing the target file or directory.
+   * @param isDir Indicates whether the path is a directory.
+   * @param objectId The Google Drive file ID corresponding to the path.
+   * @return The {@link FileStatus} of the object. When {@code path} ends in the default placeholder file name
+   *         and the object is a file with a different real name, the returned path carries the real name.
+   * @throws IOException If {@code objectId} is null or empty, or the Drive API request fails.
+   */
+  public static FileStatus listObjectStatus(Drive driveService, Path path, boolean isDir, String objectId)
+    throws IOException {
+    if (Strings.isNullOrEmpty(objectId)) {
+      throw new IOException("No Google Drive object id could be resolved for path: " + path);
+    }
+    File fileSummary = driveService.files()
+      .get(objectId)
+      .setFields(FIELDS_TO_RETURN)
+      .setSupportsAllDrives(true)
+      .execute();
+
+    Path pathWithFileName = path;
+    String actualName = fileSummary.getName();
+    // if file name not equal to default file name, then we need to change the path with correct file name
+    if (!isDir && actualName != null
+      && !actualName.equals(GoogleDriveSourceConfig.GOOGLE_DRIVE_DEFAULT_FILENAME)
+      && path.getName().equals(GoogleDriveSourceConfig.GOOGLE_DRIVE_DEFAULT_FILENAME)) {
+      pathWithFileName = new Path(path.getParent(), actualName);
+    }
+    long size = fileSummary.getSize() == null ? 0L : fileSummary.getSize();
+    long modifiedTime = fileSummary.getModifiedTime() == null ? 0L : fileSummary.getModifiedTime().getValue();
+    return new FileStatus(
+      size,
+      isDir,
+      1,
+      size,
+      modifiedTime,
+      0,
+      FsPermission.getDefault(),
+      "owner",
+      "group",
+      pathWithFileName
+    );
+  }
+
+  /**
+   * Builds a Google Drive API query string that excludes all Google-native file types
+   * defined in {@link #GOOGLE_NATIVE_MIME_TYPES}.
+   *
+   * The generated string will be used in the Drive `files().list().setQ(...)` query to
+   * filter out files such as Google Docs, Sheets, Slides, etc., which are not
+   * directly downloadable.
+   *
+   * Example output:
+   * <pre>
+   *   mimeType != 'application/vnd.google-apps.document' and
+   *   mimeType != 'application/vnd.google-apps.spreadsheet' and
+   *   ...
+   * </pre>
+   *
+   * @return A query string that excludes all Google-native MIME types using "and" conditions.
+   */
+  private static String getGoogleNativeExclusionQuery() {
+    return GOOGLE_NATIVE_MIME_TYPES.stream()
+      .map(mime -> "mimeType != '" + mime + "'")
+      .collect(Collectors.joining(" and "));
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java b/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java
index 33bdba90..55ed8f9a 100644
--- a/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java
+++ b/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java
@@ -69,8 +69,8 @@ public static StructuredRecord transform(RowRecord rowRecord, Schema schema, boo
         builder.set(metadataRecordName, rowRecord.getMetadata());
       } else {
         ComplexSingleValueColumn complexSingleValueColumn = rowRecord.getHeaderedCells().get(name);
-        if (complexSingleValueColumn == null || (complexSingleValueColumn.getData() == null
-          && complexSingleValueColumn.getSubColumns().isEmpty())) {
+        if (complexSingleValueColumn == null || complexSingleValueColumn.getData() == null
+          || complexSingleValueColumn.getSubColumns() == null || complexSingleValueColumn.getSubColumns().isEmpty()) {
           builder.set(name, null);
         } else {
           processCellData(builder, field, complexSingleValueColumn);
@@ -129,7 +129,7 @@ private static StructuredRecord processRecord(Schema fieldSchema, ComplexSingleV
     for (Schema.Field subField : fieldSchema.getFields()) {
       String subFieldName = subField.getName();
       ComplexSingleValueColumn complexSubColumn = complexSingleValueColumn.getSubColumns().get(subFieldName);
-      if (complexSubColumn.getData() == null) {
+      if (complexSubColumn == null || complexSubColumn.getData() == null) {
         builder.set(subFieldName, null);
       } else {
         processCellData(builder, subField, complexSubColumn);