Skip to content
This repository was archived by the owner on Jan 23, 2021. It is now read-only.

Commit 5554b32

Browse files
author
Krzysztof Ploch
authored
CAF-3290 Ability to provide base workflow overlay / extensions (#14)
* CAF-3290 Ability to provide base workflow overlay / extensions * CAF-3290 Ability to provide base workflow overlay / extensions - code comments * CAF-3290 Ability to provide base workflow overlay / extensions - test issue
1 parent b9919db commit 5554b32

File tree

10 files changed

+1078
-22
lines changed

10 files changed

+1078
-22
lines changed

utils/data-processing-initialization/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,10 @@
5353
<groupId>commons-io</groupId>
5454
<artifactId>commons-io</artifactId>
5555
</dependency>
56+
<dependency>
57+
<groupId>junit</groupId>
58+
<artifactId>junit</artifactId>
59+
<scope>test</scope>
60+
</dependency>
5661
</dependencies>
5762
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright 2015-2017 Hewlett Packard Enterprise Development LP.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.cafdataprocessing.utilities.initialization;
17+
18+
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.ActionJson;
19+
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.MergeMode;
20+
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.ProcessingRuleJson;
21+
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.WorkflowJson;
22+
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.conditions.ConditionAdditionalJson;
23+
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.conditions.ConditionJson;
24+
import com.google.common.base.Strings;
25+
import org.apache.commons.lang.NotImplementedException;
26+
27+
import java.util.LinkedHashMap;
28+
import java.util.Map;
29+
import java.util.Objects;
30+
import java.util.Optional;
31+
32+
/**
33+
* Class responsible for merging of two workflow objects - a base one and other one with overrides and / or extensions.
34+
*/
35+
public class WorkflowCombiner
36+
{
37+
/**
38+
* Combines (merges) two workflow objects.
39+
* Merging is based on the name attributes of subtypes. For example, if a {@link ProcessingRuleJson} has
40+
* a name "Metadata Processing" and the {@code overlayWorkflow} also has a {@link ProcessingRuleJson} with the same
41+
* name ("Metadata Processing"), properties of the processing rule in the overlay will be merged into the original
42+
* workflow. Merging will either extend the original workflow or will override values in the original object.
43+
* If a particular value is not set in the original workflow, it will be added. If it exists - it will be overridden.
44+
* There is an option to force complete replacement behaviour. {@link ProcessingRuleJson} and {@link ActionJson} have
45+
* {@link MergeMode} property. If this property is set to {@code REPLACE}, values in matched (by name) in
46+
* Processing Rule or Action will replace values in the {@code targetOriginalWorkflow}.
47+
* {@link MergeMode} value for both objects defaults to {@code MERGE}.
48+
*
49+
* @param targetOriginalWorkflow a base workflow to which apply the overrides and / or extensions
50+
* @param overlayWorkflow a workflow object with overrides and / or extensions
51+
*/
52+
public static void combineWorkflows(WorkflowJson targetOriginalWorkflow, WorkflowJson overlayWorkflow)
53+
{
54+
Objects.requireNonNull(targetOriginalWorkflow);
55+
if (overlayWorkflow == null) {
56+
return;
57+
}
58+
for (ProcessingRuleJson overlayProcessingRule : overlayWorkflow.processingRules) {
59+
Optional<ProcessingRuleJson> baseWorkflowProcessingRule = targetOriginalWorkflow.processingRules.stream().filter(rule -> rule.name.equals(overlayProcessingRule.name)).findFirst();
60+
if (baseWorkflowProcessingRule.isPresent()) {
61+
combineProcessingRule(baseWorkflowProcessingRule.get(), overlayProcessingRule);
62+
}
63+
else {
64+
targetOriginalWorkflow.processingRules.add(overlayProcessingRule);
65+
}
66+
}
67+
}
68+
69+
private static void combineProcessingRule(ProcessingRuleJson processingRule, ProcessingRuleJson overlayProcessingRule)
70+
{
71+
if (overlayProcessingRule.mergeMode == MergeMode.REPLACE) {
72+
processingRule.enabled = overlayProcessingRule.enabled;
73+
processingRule.priority = overlayProcessingRule.priority;
74+
processingRule.description = overlayProcessingRule.description;
75+
processingRule.actions = overlayProcessingRule.actions;
76+
processingRule.ruleConditions = overlayProcessingRule.ruleConditions;
77+
}
78+
if (overlayProcessingRule.enabled != null) {
79+
processingRule.enabled = overlayProcessingRule.enabled;
80+
}
81+
if (overlayProcessingRule.priority != null) {
82+
processingRule.priority = overlayProcessingRule.priority;
83+
}
84+
if (!Strings.isNullOrEmpty(overlayProcessingRule.description)) {
85+
processingRule.description = overlayProcessingRule.description;
86+
}
87+
if (overlayProcessingRule.actions != null) {
88+
for (ActionJson overlayAction : overlayProcessingRule.actions) {
89+
Optional<ActionJson> actionJson = processingRule.actions.stream().filter(action -> action.name.equals(overlayAction.name)).findFirst();
90+
if (actionJson.isPresent()) {
91+
combineAction(actionJson.get(), overlayAction);
92+
}
93+
else {
94+
processingRule.actions.add(overlayAction);
95+
}
96+
}
97+
}
98+
if (overlayProcessingRule.ruleConditions != null) {
99+
if (processingRule.ruleConditions == null) {
100+
processingRule.ruleConditions = overlayProcessingRule.ruleConditions;
101+
}
102+
else {
103+
for (ConditionJson overlayActionCondition : overlayProcessingRule.ruleConditions) {
104+
Optional<ConditionJson> originalCondition = processingRule.ruleConditions.stream().filter(condition -> condition.name.equals(overlayActionCondition.name)).findFirst();
105+
if (originalCondition.isPresent()) {
106+
combineCondition(originalCondition.get(), overlayActionCondition);
107+
}
108+
else {
109+
processingRule.ruleConditions.add(overlayActionCondition);
110+
}
111+
}
112+
}
113+
}
114+
}
115+
116+
private static void combineAction(ActionJson action, ActionJson overlayAction)
117+
{
118+
if (overlayAction.mergeMode == MergeMode.REPLACE) {
119+
action.order = overlayAction.order;
120+
action.typeId = overlayAction.typeId;
121+
action.typeName = overlayAction.typeName;
122+
action.description = overlayAction.description;
123+
action.settings = overlayAction.settings;
124+
action.actionConditions = overlayAction.actionConditions;
125+
return;
126+
}
127+
if (!Strings.isNullOrEmpty(overlayAction.name)) {
128+
action.name = overlayAction.name;
129+
}
130+
if (overlayAction.order != null) {
131+
action.order = overlayAction.order;
132+
}
133+
if (overlayAction.typeId != null) {
134+
action.typeId = overlayAction.typeId;
135+
}
136+
if (!Strings.isNullOrEmpty(overlayAction.typeName)) {
137+
action.typeName = overlayAction.typeName;
138+
}
139+
if (!Strings.isNullOrEmpty(overlayAction.description)) {
140+
action.description = overlayAction.description;
141+
}
142+
for (Map.Entry<String, Object> overlaySettingsEntry : overlayAction.settings.entrySet()) {
143+
action.settings.put(overlaySettingsEntry.getKey(), overlaySettingsEntry.getValue());
144+
}
145+
146+
if (overlayAction.actionConditions != null) {
147+
if (action.actionConditions == null) {
148+
action.actionConditions = overlayAction.actionConditions;
149+
}
150+
else {
151+
for (ConditionJson overlayActionCondition : overlayAction.actionConditions) {
152+
Optional<ConditionJson> originalCondition = action.actionConditions.stream().filter(condition -> condition.name.equals(overlayActionCondition.name)).findFirst();
153+
if (originalCondition.isPresent()) {
154+
combineCondition(originalCondition.get(), overlayActionCondition);
155+
}
156+
else {
157+
action.actionConditions.add(overlayActionCondition);
158+
}
159+
}
160+
}
161+
}
162+
}
163+
164+
165+
166+
private static void combineCondition(ConditionJson condition, ConditionJson overlayCondition)
167+
{
168+
ConditionAdditionalJson overlayAdditional = overlayCondition.additional;
169+
if (condition.additional == null) {
170+
condition.additional = overlayAdditional;
171+
return;
172+
}
173+
if (overlayAdditional != null) {
174+
if (!Strings.isNullOrEmpty(overlayAdditional.notes)) {
175+
condition.additional.notes = overlayAdditional.notes;
176+
}
177+
if (!Strings.isNullOrEmpty(overlayAdditional.type)) {
178+
condition.additional.type = overlayAdditional.type;
179+
}
180+
if (overlayAdditional.order != null) {
181+
condition.additional.order = overlayAdditional.order;
182+
}
183+
}
184+
}
185+
}

utils/data-processing-initialization/src/main/java/com/github/cafdataprocessing/utilities/initialization/WorkflowInitializer.java

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.ProcessingRuleJson;
2929
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.WorkflowJson;
3030
import com.github.cafdataprocessing.utilities.initialization.jsonobjects.conditions.ConditionJson;
31+
import com.google.common.base.Strings;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
3334

3435
import java.io.File;
3536
import java.io.IOException;
37+
import java.io.InputStream;
3638
import java.util.ArrayList;
3739
import java.util.LinkedHashMap;
3840
import java.util.List;
@@ -117,16 +119,70 @@ private void retrieveActionTypeInformation(String projectId) throws ApiException
117119
* @throws IOException If unable to read the workflow input file.
118120
* @throws ApiException Thrown if there is an error communicating with the API or if error response received from API.
119121
*/
120-
public long initializeWorkflowBaseData(String inputFileName, String projectId) throws IOException, ApiException {
122+
public long initializeWorkflowBaseData(String inputFileName, String projectId) throws IOException, ApiException
123+
{
124+
return initializeWorkflowBaseData(inputFileName, null, projectId);
125+
}
126+
127+
/**
128+
* Creates a processing workflow derived from the information in the input file provided.
129+
* @param inputFileName File containing workflow in JSON format that should be created.
130+
* @param overlayFile Optional file containing extensions / overrides to merge with the base workflow specified in
131+
* inputFileName. Pass null if you do not want any overrides to be applied.
132+
* @param projectId ProjectId to create workflow under.
133+
* @return ID of created workflow.
134+
* @throws IOException If unable to read the workflow input file.
135+
* @throws ApiException Thrown if there is an error communicating with the API or if error response received from API.
136+
*/
137+
public long initializeWorkflowBaseData(String inputFileName, String overlayFile, String projectId) throws IOException, ApiException {
121138
//retrieve action types that this project ID has available
122-
retrieveActionTypeInformation(projectId);
139+
123140
//read in the json representation of the workflow
124-
WorkflowJson workflowToCreate = readInputFile(inputFileName);
141+
final WorkflowJson workflowToCreate = readInputFile(inputFileName);
142+
final WorkflowJson overlayWorkflow = Strings.isNullOrEmpty(overlayFile) ? null : readInputFile(overlayFile);
143+
125144
//create the workflow and return ID of created workflow
126-
return createWorkflow(workflowToCreate, projectId);
145+
return createWorkflow(workflowToCreate, overlayWorkflow, projectId);
127146
}
128147

129-
private long createWorkflow(WorkflowJson workflow, String projectId) throws ApiException {
148+
/**
149+
* Creates a processing workflow derived from the information in the input file provided.
150+
* @param workflowBaseDataStream Input stream containing workflow in JSON format that should be created.
151+
* @param workflowOverlayStream Optional input stream containing extensions / overrides to merge with the base
152+
* workflow specified in inputFileName. Pass null if you do not want any overrides
153+
* to be applied.
154+
* @param projectId ProjectId to create workflow under.
155+
* @return ID of created workflow.
156+
* @throws IOException If unable to read the workflow input file.
157+
* @throws ApiException Thrown if there is an error communicating with the API or if error response received from API.
158+
*/
159+
public long initializeWorkflowBaseData(InputStream workflowBaseDataStream, InputStream workflowOverlayStream, String projectId) throws IOException, ApiException {
160+
161+
//read in the json representation of the workflow
162+
final WorkflowJson workflowToCreate = readInputFile(workflowBaseDataStream);
163+
final WorkflowJson overlayWorkflow = workflowOverlayStream == null ? null : readInputFile(workflowOverlayStream);
164+
165+
//create the workflow and return ID of created workflow
166+
return createWorkflow(workflowToCreate, overlayWorkflow, projectId);
167+
}
168+
169+
/**
170+
* Creates a processing workflow derived from the information in the {@code workflow} object. Optional extensions or
171+
* overrides can be provided in the {@code workflowOverlay} parameter.
172+
* See {@link WorkflowCombiner} for more information on merging of two {@link WorkflowJson} objects.
173+
* @param workflow An object describing workflow to be created.
174+
* @param workflowOverlay An optional object describing extensions or overrides to the {@code workflow} parameter.
175+
* @param projectId A project id
176+
* @return Newly created workflow id.
177+
* @throws ApiException in case of a failure to create a workflow.
178+
*/
179+
public long createWorkflow(WorkflowJson workflow, WorkflowJson workflowOverlay, String projectId) throws ApiException {
180+
181+
//retrieve action types that this project ID has available
182+
retrieveActionTypeInformation(projectId);
183+
184+
WorkflowCombiner.combineWorkflows(workflow, workflowOverlay);
185+
130186
ExistingWorkflow createdWorkflow = workflowsApi.createWorkflow(projectId, workflow.toApiBaseWorkflow());
131187
long createdWorkflowId = createdWorkflow.getId();
132188
for(ProcessingRuleJson ruleToCreate: workflow.processingRules) {
@@ -182,6 +238,15 @@ private WorkflowJson readInputFile(String inputFile) throws IOException {
182238
}
183239
}
184240

241+
private WorkflowJson readInputFile(InputStream inputStream) throws IOException
242+
{
243+
try {
244+
return mapper.readValue(inputStream, WorkflowJson.class);
245+
} catch (IOException e) {
246+
throw new IOException("Failure trying to deserialize the workflow base data input file. Please check the format of the file contents.", e);
247+
}
248+
}
249+
185250
private void replaceBoilerplateNamesIfRequired(Action action){
186251
//determine if this is a boilerplate action
187252
Long actionTypeId = action.getTypeId();

utils/data-processing-initialization/src/main/java/com/github/cafdataprocessing/utilities/initialization/jsonobjects/ActionJson.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@
2929
* JSON representation of a data processing action for use with task submitter application
3030
*/
3131
public class ActionJson {
32-
public final String name;
33-
public final String description;
34-
public final Integer order;
35-
public final LinkedHashMap<String, Object> settings;
36-
public final List<ConditionJson> actionConditions;
32+
public String name;
33+
public String description;
34+
public Integer order;
35+
public LinkedHashMap<String, Object> settings;
36+
public List<ConditionJson> actionConditions;
37+
public MergeMode mergeMode;
3738

3839
//The name of the type will be used to retrieve the actual action type ID if set
3940
public String typeName;
@@ -46,18 +47,17 @@ public ActionJson(@JsonProperty(value= "name", required = true)String name,
4647
@JsonProperty(value= "settings")LinkedHashMap<String, Object> settings,
4748
@JsonProperty(value= "actionConditions")List<ConditionJson> actionConditions,
4849
@JsonProperty(value= "typeName")String typeName,
49-
@JsonProperty(value= "typeId")Long typeId){
50+
@JsonProperty(value= "typeId")Long typeId,
51+
@JsonProperty(value = "mergeMode") MergeMode mergeMode){
5052
this.name = name;
5153
this.description = description;
5254
this.order = order;
5355
this.settings = settings == null ? new LinkedHashMap<>() : settings;
5456
this.actionConditions = actionConditions;
55-
56-
if(typeId==null && Strings.isNullOrEmpty(typeName)){
57-
throw new RuntimeException(new JsonMappingException("'typeId' or 'typeName' property must be set on action. Neither currently set on action with name: "+name));
58-
}
5957
this.typeId = typeId;
6058
this.typeName = typeName;
59+
60+
this.mergeMode = mergeMode == null ? MergeMode.MERGE : mergeMode;
6161
}
6262

6363
public Action toApiAction(Map<String, Long> typeNamesToIds){
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2015-2017 Hewlett Packard Enterprise Development LP.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.cafdataprocessing.utilities.initialization.jsonobjects;
17+
18+
/**
19+
* Specifies behaviour when merging workflow processing rules and actions.
20+
*/
21+
public enum MergeMode
22+
{
23+
MERGE,
24+
REPLACE
25+
}

0 commit comments

Comments
 (0)