|
30 | 30 | import java.util.Set; |
31 | 31 | import java.util.concurrent.ForkJoinPool; |
32 | 32 | import java.util.concurrent.ForkJoinTask; |
| 33 | +import java.util.function.Function; |
33 | 34 | import java.util.stream.Collectors; |
34 | 35 | import java.util.stream.Stream; |
35 | 36 |
|
@@ -113,24 +114,60 @@ public byte[] evaluateAll(Evaluator evaluator, byte[] dictBytes, String[] dropCo |
113 | 114 | public Map<String, ?> evaluateAll(Evaluator evaluator, Map<String, ?> argumentsDict, Set<String> dropColumns, int parallelism){ |
114 | 115 | Table argumentsTable = parseDict(argumentsDict); |
115 | 116 |
|
116 | | - EvaluatorFunction function = new EvaluatorFunction(evaluator); |
| 117 | + Function<Map<String, ?>, Object> function; |
117 | 118 |
|
118 | | - List<ResultField> resultFields = Stream.concat( |
119 | | - (evaluator.getTargetFields()).stream(), |
120 | | - (evaluator.getOutputFields()).stream() |
121 | | - ) |
122 | | - .filter(resultField -> { |
123 | | - String name = resultField.getName(); |
| 119 | + TableCollector tableCollector; |
124 | 120 |
|
125 | | - if(dropColumns != null && dropColumns.contains(name)){ |
126 | | - return false; |
| 121 | + if(evaluator != null){ |
| 122 | + function = new EvaluatorFunction(evaluator); |
| 123 | + |
| 124 | + List<ResultField> resultFields = Stream.concat( |
| 125 | + (evaluator.getTargetFields()).stream(), |
| 126 | + (evaluator.getOutputFields()).stream() |
| 127 | + ) |
| 128 | + .filter(resultField -> { |
| 129 | + String name = resultField.getName(); |
| 130 | + |
| 131 | + if(dropColumns != null && dropColumns.contains(name)){ |
| 132 | + return false; |
| 133 | + } |
| 134 | + |
| 135 | + return true; |
| 136 | + }) |
| 137 | + .collect(Collectors.toList()); |
| 138 | + |
| 139 | + tableCollector = new ResultTableCollector(resultFields, true); |
| 140 | + } else |
| 141 | + |
| 142 | + { |
| 143 | + function = (arguments) -> arguments; |
| 144 | + |
| 145 | + tableCollector = new TableCollector(){ |
| 146 | + |
| 147 | + @Override |
| 148 | + protected Table createFinisherTable(int initialSize){ |
| 149 | + return super.createFinisherTable(initialSize); |
127 | 150 | } |
128 | 151 |
|
129 | | - return true; |
130 | | - }) |
131 | | - .collect(Collectors.toList()); |
| 152 | + @Override |
| 153 | + protected Table.Row createFinisherRow(Table table){ |
| 154 | + Table.Row result = table.new Row(0, -1){ |
| 155 | + |
| 156 | + @Override |
| 157 | + public Object put(String key, Object value){ |
132 | 158 |
|
133 | | - TableCollector tableCollector = new ResultTableCollector(resultFields, true); |
| 159 | + if(dropColumns != null && dropColumns.contains(key)){ |
| 160 | + return null; |
| 161 | + } |
| 162 | + |
| 163 | + return super.put(key, value); |
| 164 | + } |
| 165 | + }; |
| 166 | + |
| 167 | + return result; |
| 168 | + } |
| 169 | + }; |
| 170 | + } |
134 | 171 |
|
135 | 172 | Table resultsTable; |
136 | 173 |
|
|
0 commit comments