Conversation
| */ | ||
| public Schema getSchema() { | ||
| if (schema == null) { | ||
| throw new IllegalArgumentException("Schema cannot be null."); |
There was a problem hiding this comment.
this error type should not be thrown during validation phase. Should be InvalidConfigPropertyException. Also IAE usually thrown on illegal method param, in cases like this it looks more like IllegalStateException
There was a problem hiding this comment.
Changed to:
/**
* @return the schema of the dataset
*/
@Nullable
public Schema getSchema() {
try {
return null == schema ? null : Schema.parseJson(schema);
} catch (IOException e) {
throw new InvalidConfigPropertyException("Invalid schema", e, MongoDBConstants.SCHEMA);
}
}
| @Override | ||
| public void configurePipeline(PipelineConfigurer pipelineConfigurer) { | ||
| super.configurePipeline(pipelineConfigurer); | ||
| Schema schema = config.getSchema(); |
There was a problem hiding this comment.
Schema may be null on this stage
There was a problem hiding this comment.
Changed to validate the config first:
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
config.validate();
Schema schema = config.getSchema();
...
| return builder.build(); | ||
| } | ||
|
|
||
| public static void validateSchema(Schema schema) { |
There was a problem hiding this comment.
this class should not be responsible for validation
| case BOOLEAN: | ||
| case STRING: | ||
| return object; | ||
| case NULL: |
There was a problem hiding this comment.
I don't think this case is possible
| @Description("User to use to connect to the specified database. Required for databases that " + | ||
| "need authentication. Optional for databases that do not require authentication.") | ||
| @Nullable | ||
| public String user; |
There was a problem hiding this comment.
why no fields support macro?
| @Override | ||
| public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, BSONWritable>> emitter) | ||
| throws Exception { | ||
| BasicDBObjectBuilder bsonBuilder = BasicDBObjectBuilder.start(); |
There was a problem hiding this comment.
this transformation should also be excluded to separate class, usually with Transformer suffix
There was a problem hiding this comment.
RecordToBSONWritableTransformer class created.
| lineageRecorder.createExternalDataset(context.getInputSchema()); | ||
| List<Schema.Field> fields = context.getInputSchema().getFields(); | ||
| if (fields != null && !fields.isEmpty()) { | ||
| lineageRecorder.recordWrite("Write", "Wrote to MongoDB collection.", |
There was a problem hiding this comment.
we could add collection name here
There was a problem hiding this comment.
Collection name added.
| throw new InvalidConfigPropertyException("Host must be specified", MongoDBConstants.HOST); | ||
| } | ||
| if (!containsMacro(MongoDBConstants.PORT)) { | ||
| if (null == port) { |
There was a problem hiding this comment.
null should be on the right side
| @Override | ||
| public void configurePipeline(PipelineConfigurer pipelineConfigurer) { | ||
| super.configurePipeline(pipelineConfigurer); | ||
| config.validate(); |
There was a problem hiding this comment.
should we have input schema validation?
There was a problem hiding this comment.
MongoDBBatchSink#validateInputSchema method created to check for supported types:
private void validateInputSchema(Schema inputSchema) {
List<Schema.Field> fields = inputSchema.getFields();
if (fields == null || fields.isEmpty()) {
throw new InvalidStageException("Input schema should contain fields");
}
for (Schema.Field field : fields) {
Schema.Type fieldType = field.getSchema().isNullable() ?
field.getSchema().getNonNullable().getType() : field.getSchema().getType();
if (!SUPPORTED_FIELD_TYPES.contains(fieldType)) {
String supportedTypes = SUPPORTED_FIELD_TYPES.stream()
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.joining(", "));
String errorMessage = String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s.",
field.getName(), fieldType, supportedTypes);
throw new InvalidStageException(errorMessage);
}
}
}
| super.validate(); | ||
| if (!containsMacro(MongoDBConstants.SCHEMA)) { | ||
| Schema parsedSchema = getSchema(); | ||
| if (null == parsedSchema) { |
There was a problem hiding this comment.
null should be on the right side
| private final Schema schema; | ||
|
|
||
| public BSONConverter(Schema schema) throws IOException { | ||
| public BSONObjectToRecordTransformer(Schema schema) { |
There was a problem hiding this comment.
please add unit tests for both transformers
There was a problem hiding this comment.
| @@ -18,9 +18,12 @@ | |||
|
|
|||
| import com.google.common.base.Strings; | |||
There was a problem hiding this comment.
please add unit tests for both configs
There was a problem hiding this comment.
JIRA: https://issues.cask.co/browse/CDAP-15266
WIKI: https://wiki.cask.co/display/CE/MongoDB+database+plugin
In the scope of this PR:
Plugins tested with current MongoDB release 4.0.10.