Commit abab89c

Author: Björn Bamberg
Commit message: Merge branch 'main' into using-key-aggregate
2 parents dcc8cb3 + b83205f, commit abab89c

177 files changed: 3235 additions & 7700 deletions


.github/regression/micro.csv

Lines changed: 2 additions & 1 deletion
@@ -32,4 +32,5 @@ benchmark/micro/logger/logging_overhead/parquet_q1_with_default_logging.benchmark
 benchmark/micro/logger/logging_overhead/duckdb_persistent_q1_with_default_logging.benchmark
 benchmark/micro/logger/storage/file/log_message_size/huge_string.benchmark
 benchmark/micro/logger/storage/file/log_message_size/small_string.benchmark
-benchmark/micro/filter/choose_correct_filter_function.benchmark
+benchmark/micro/filter/choose_correct_filter_function.benchmark
+benchmark/micro/optimizer/topn_window_elimination.benchmark
benchmark/micro/optimizer/topn_window_elimination.benchmark

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+# name: benchmark/micro/optimizer/topn_window_elimination.benchmark
+# description: Benchmark of top n window elimination
+# group: [optimizer]
+
+name TopN Window Elimination
+group micro
+subgroup optimizer
+
+load
+CREATE TABLE metrics AS (
+    SELECT k, '2001-01-01 00:00:00'::TIMESTAMP + INTERVAL (v) MINUTE AS tm, v AS v1, v % 1000 AS v2, v % 100 as v3
+    FROM range(0,100000) vals(v), range(0,100) keys(k)
+);
+CREATE TABLE tags AS (
+    SELECT k, CAST(hash(k+1) AS VARCHAR) t1, CAST(hash(k+2) AS VARCHAR) t2, CAST(hash(k+3) AS VARCHAR) t3,
+    FROM range(0,100) keys(k)
+);
+
+run
+SELECT * FROM tags t INNER JOIN LATERAL (SELECT * FROM metrics m WHERE m.k = t.k ORDER BY tm DESC LIMIT 1) AS b ON true ORDER BY t.k, b.tm DESC;
+SELECT * FROM tags t INNER JOIN LATERAL (SELECT * FROM metrics m WHERE m.k = t.k ORDER BY tm DESC LIMIT 3) AS b ON true ORDER BY t.k, b.tm DESC;
+SELECT * FROM tags t, (SELECT *, row_number() OVER (PARTITION BY m.k ORDER BY m.tm DESC) rn FROM metrics m QUALIFY rn <= 1) m WHERE t.k = m.k;
+SELECT * FROM tags t, (SELECT *, row_number() OVER (PARTITION BY m.k ORDER BY m.tm DESC) rn FROM metrics m QUALIFY rn <= 3) m WHERE t.k = m.k;
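
The four queries in the run section come in equivalent pairs: each LATERAL top-1/top-3 query returns the same per-key rows as the corresponding row_number() OVER (PARTITION BY ... ORDER BY ... DESC) ... QUALIFY rn <= N query, so the benchmark measures both the hand-written nested-loop form and the window form that the TopN window elimination optimization is meant to rewrite.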

extension/parquet/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
@@ -21,9 +21,11 @@ set(PARQUET_EXTENSION_FILES
     parquet_multi_file_info.cpp
     parquet_metadata.cpp
     parquet_reader.cpp
+    parquet_field_id.cpp
     parquet_statistics.cpp
     parquet_timestamp.cpp
     parquet_writer.cpp
+    parquet_shredding.cpp
     serialize_parquet.cpp
     zstd_file_system.cpp
     geo_parquet.cpp)

extension/parquet/column_writer.cpp

Lines changed: 43 additions & 18 deletions
@@ -246,7 +246,8 @@ void ColumnWriter::HandleDefineLevels(ColumnWriterState &state, ColumnWriterStat
 
 ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas,
                                                     const LogicalType &type, const string &name,
-                                                    optional_ptr<const ChildFieldIDs> field_ids, idx_t max_repeat,
+                                                    optional_ptr<const ChildFieldIDs> field_ids,
+                                                    optional_ptr<const ShreddingType> shredding_types, idx_t max_repeat,
                                                     idx_t max_define, bool can_have_nulls) {
     auto null_type = can_have_nulls ? FieldRepetitionType::OPTIONAL : FieldRepetitionType::REQUIRED;
     if (!can_have_nulls) {
@@ -263,6 +264,10 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
             child_field_ids = &field_id->child_field_ids;
         }
     }
+    optional_ptr<const ShreddingType> shredding_type;
+    if (shredding_types) {
+        shredding_type = shredding_types->GetChild(name);
+    }
 
     if (type.id() == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") {
         // variant type
@@ -273,32 +278,53 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
         // [<typed_value>]
         // }
 
-        const bool is_shredded = false;
+        const bool is_shredded = shredding_type != nullptr;
+
+        child_list_t<LogicalType> child_types;
+        child_types.emplace_back("metadata", LogicalType::BLOB);
+        child_types.emplace_back("value", LogicalType::BLOB);
+        if (is_shredded) {
+            auto &typed_value_type = shredding_type->type;
+            if (typed_value_type.id() != LogicalTypeId::ANY) {
+                child_types.emplace_back("typed_value",
+                                         VariantColumnWriter::TransformTypedValueRecursive(typed_value_type));
+            }
+        }
 
         // variant group
         duckdb_parquet::SchemaElement top_element;
         top_element.repetition_type = null_type;
-        top_element.num_children = is_shredded ? 3 : 2;
+        top_element.num_children = child_types.size();
         top_element.logicalType.__isset.VARIANT = true;
         top_element.logicalType.VARIANT.__isset.specification_version = true;
         top_element.logicalType.VARIANT.specification_version = 1;
         top_element.__isset.logicalType = true;
         top_element.__isset.num_children = true;
         top_element.__isset.repetition_type = true;
+        top_element.name = name;
         schemas.push_back(std::move(top_element));
 
-        child_list_t<LogicalType> child_types;
-        child_types.emplace_back("metadata", LogicalType::BLOB);
-        child_types.emplace_back("value", LogicalType::BLOB);
-        if (is_shredded) {
-            throw NotImplementedException("Writing shredded VARIANT isn't supported for Parquet yet");
-        }
-
         ParquetColumnSchema variant_column(name, type, max_define, max_repeat, schema_idx, 0);
         variant_column.children.reserve(child_types.size());
         for (auto &child_type : child_types) {
+            auto &child_name = child_type.first;
+            bool is_optional;
+            if (child_name == "metadata") {
+                is_optional = false;
+            } else if (child_name == "value") {
+                if (is_shredded) {
+                    //! When shredding the variant, the 'value' becomes optional
+                    is_optional = true;
+                } else {
+                    is_optional = false;
+                }
+            } else {
+                D_ASSERT(child_name == "typed_value");
+                is_optional = true;
+            }
             variant_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first,
-                                                                   child_field_ids, max_repeat, max_define + 1, false));
+                                                                   child_field_ids, shredding_type, max_repeat,
+                                                                   max_define + 1, is_optional));
         }
         return variant_column;
     }
@@ -324,7 +350,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
         struct_column.children.reserve(child_types.size());
         for (auto &child_type : child_types) {
             struct_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first,
-                                                                  child_field_ids, max_repeat, max_define + 1));
+                                                                  child_field_ids, shredding_type, max_repeat,
+                                                                  max_define + 1, true));
         }
         return struct_column;
     }
@@ -360,8 +387,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
         schemas.push_back(std::move(repeated_element));
 
         ParquetColumnSchema list_column(name, type, max_define, max_repeat, schema_idx, 0);
-        list_column.children.push_back(
-            FillParquetSchema(schemas, child_type, "element", child_field_ids, max_repeat + 1, max_define + 2));
+        list_column.children.push_back(FillParquetSchema(schemas, child_type, "element", child_field_ids,
+                                                         shredding_type, max_repeat + 1, max_define + 2, true));
         return list_column;
     }
     if (type.id() == LogicalTypeId::MAP) {
@@ -408,8 +435,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
         for (idx_t i = 0; i < 2; i++) {
             // key needs to be marked as REQUIRED
             bool is_key = i == 0;
-            auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], child_field_ids, max_repeat + 1,
-                                                  max_define + 2, !is_key);
+            auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], child_field_ids, shredding_type,
+                                                  max_repeat + 1, max_define + 2, !is_key);
 
             map_column.children.push_back(std::move(child_schema));
         }
@@ -441,8 +468,6 @@ ColumnWriter::CreateWriterRecursive(ClientContext &context, ParquetWriter &write
     path_in_schema.push_back(schema.name);
 
     if (type.id() == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") {
-        D_ASSERT(schema.children.size() == 2); //! NOTE: shredded variants not supported yet
-
         vector<unique_ptr<ColumnWriter>> child_writers;
         child_writers.reserve(schema.children.size());
         for (idx_t i = 0; i < schema.children.size(); i++) {
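
The substance of this change is that FillParquetSchema now threads an optional shredding spec through its recursion: each level looks up the current column name via shredding_types->GetChild(name), and the VARIANT branch emits a typed_value child only when a spec was found (a null optional_ptr doubles as "not shredded", which is why the old hard-coded is_shredded = false drops out cleanly). A minimal, self-contained sketch of that lookup pattern, using mock stand-in types rather than DuckDB's actual classes:

// Mock sketch of the per-column shredding lookup (stand-in types only;
// not DuckDB's real classes or headers).
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct MockShreddingType {
    std::string type; // stands in for duckdb::LogicalType
};

struct MockShreddingSpec {
    std::map<std::string, MockShreddingType> children;

    // Mirrors ShreddingType::GetChild: a miss means "write unshredded".
    const MockShreddingType *GetChild(const std::string &name) const {
        auto it = children.find(name);
        return it == children.end() ? nullptr : &it->second;
    }
};

// Mirrors the VARIANT branch above: metadata and value are always present;
// typed_value is added only for shredded columns whose type is not ANY.
std::vector<std::string> VariantChildren(const MockShreddingType *shredding) {
    std::vector<std::string> names {"metadata", "value"};
    if (shredding && shredding->type != "ANY") {
        names.push_back("typed_value");
    }
    return names;
}

int main() {
    MockShreddingSpec spec;
    spec.children["v"] = MockShreddingType {"BIGINT"};
    for (const char *col : {"v", "plain"}) {
        std::cout << col << ":";
        for (const auto &name : VariantChildren(spec.GetChild(col))) {
            std::cout << ' ' << name;
        }
        std::cout << '\n'; // v: metadata value typed_value / plain: metadata value
    }
    return 0;
}

Note how the unshredded path needs no special casing: a missing entry in the spec simply yields the two-child layout the writer produced before.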

extension/parquet/include/column_writer.hpp

Lines changed: 5 additions & 4 deletions
@@ -18,6 +18,7 @@ class ParquetWriter;
 class ColumnWriterPageState;
 class PrimitiveColumnWriterState;
 struct ChildFieldIDs;
+struct ShreddingType;
 class ResizeableBuffer;
 class ParquetBloomFilter;
 
@@ -88,10 +89,10 @@ class ColumnWriter {
         return column_schema.max_repeat;
     }
 
-    static ParquetColumnSchema FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas,
-                                                 const LogicalType &type, const string &name,
-                                                 optional_ptr<const ChildFieldIDs> field_ids, idx_t max_repeat = 0,
-                                                 idx_t max_define = 1, bool can_have_nulls = true);
+    static ParquetColumnSchema
+    FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas, const LogicalType &type, const string &name,
+                      optional_ptr<const ChildFieldIDs> field_ids, optional_ptr<const ShreddingType> shredding_types,
+                      idx_t max_repeat = 0, idx_t max_define = 1, bool can_have_nulls = true);
     //! Create the column writer for a specific type recursively
     static unique_ptr<ColumnWriter> CreateWriterRecursive(ClientContext &context, ParquetWriter &writer,
                                                           const vector<duckdb_parquet::SchemaElement> &parquet_schemas,
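
Since the trailing parameters keep their defaults, existing call sites only have to supply the new shredding argument. A hypothetical call site (variable names assumed, not taken from this commit):

auto root = ColumnWriter::FillParquetSchema(schemas, sql_type, column_name, field_ids, shredding_types);

Passing a null optional_ptr for shredding_types preserves the previous, unshredded behaviour.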

extension/parquet/include/parquet.json

Lines changed: 42 additions & 2 deletions
@@ -116,7 +116,7 @@
     {
         "class": "FieldID",
         "includes": [
-            "parquet_writer.hpp"
+            "parquet_field_id.hpp"
         ],
         "members": [
             {
@@ -140,7 +140,7 @@
     {
         "class": "ChildFieldIDs",
         "includes": [
-            "parquet_writer.hpp"
+            "parquet_field_id.hpp"
         ],
         "members": [
             {
@@ -152,5 +152,45 @@
             }
         ],
         "pointer_type": "none"
+    },
+    {
+        "class": "ShreddingType",
+        "includes": [
+            "parquet_shredding.hpp"
+        ],
+        "members": [
+            {
+                "id": 100,
+                "name": "set",
+                "type": "bool"
+            },
+            {
+                "id": 101,
+                "name": "type",
+                "type": "LogicalType"
+            },
+            {
+                "id": 102,
+                "name": "children",
+                "type": "ChildShreddingTypes"
+            }
+        ],
+        "pointer_type": "none"
+    },
+    {
+        "class": "ChildShreddingTypes",
+        "includes": [
+            "parquet_shredding.hpp"
+        ],
+        "members": [
+            {
+                "id": 100,
+                "name": "types",
+                "type": "case_insensitive_map_t<ShreddingType>",
+                "serialize_property": "types.operator*()",
+                "deserialize_property": "types.operator*()"
+            }
+        ],
+        "pointer_type": "none"
     }
 ]
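
parquet.json feeds DuckDB's serialization code generator: each member's id becomes the stable on-disk field id, and the serialize_property/deserialize_property overrides on ChildShreddingTypes make the generated code read and write through the dereferenced unique_ptr that holds the map. As a rough, hand-written approximation of what the generator emits into serialize_parquet.cpp for ShreddingType (a sketch only; the real output may use different helpers and defaults):

void ShreddingType::Serialize(Serializer &serializer) const {
    serializer.WriteProperty(100, "set", set);
    serializer.WriteProperty(101, "type", type);
    serializer.WriteProperty(102, "children", children);
}

ShreddingType ShreddingType::Deserialize(Deserializer &deserializer) {
    ShreddingType result;
    result.set = deserializer.ReadProperty<bool>(100, "set");
    result.type = deserializer.ReadProperty<LogicalType>(101, "type");
    result.children = deserializer.ReadProperty<ChildShreddingTypes>(102, "children");
    return result;
}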
extension/parquet/include/parquet_field_id.hpp

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+#pragma once
+
+#include "duckdb/common/serializer/buffered_file_writer.hpp"
+#include "duckdb/common/case_insensitive_map.hpp"
+
+namespace duckdb {
+
+struct FieldID;
+struct ChildFieldIDs {
+    ChildFieldIDs();
+    ChildFieldIDs Copy() const;
+    unique_ptr<case_insensitive_map_t<FieldID>> ids;
+
+    void Serialize(Serializer &serializer) const;
+    static ChildFieldIDs Deserialize(Deserializer &source);
+};
+
+struct FieldID {
+public:
+    static constexpr const auto DUCKDB_FIELD_ID = "__duckdb_field_id";
+    FieldID();
+    explicit FieldID(int32_t field_id);
+    FieldID Copy() const;
+    bool set;
+    int32_t field_id;
+    ChildFieldIDs child_field_ids;
+
+    void Serialize(Serializer &serializer) const;
+    static FieldID Deserialize(Deserializer &source);
+
+public:
+    static void GenerateFieldIDs(ChildFieldIDs &field_ids, idx_t &field_id, const vector<string> &names,
+                                 const vector<LogicalType> &sql_types);
+    static void GetFieldIDs(const Value &field_ids_value, ChildFieldIDs &field_ids,
+                            unordered_set<uint32_t> &unique_field_ids,
+                            const case_insensitive_map_t<LogicalType> &name_to_type_map);
+};
+
+} // namespace duckdb
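
This header appears to carve the existing FieldID/ChildFieldIDs declarations out of parquet_writer.hpp into their own file, which matches the parquet.json includes changing from parquet_writer.hpp to parquet_field_id.hpp and the new parquet_field_id.cpp entry in the CMakeLists.txt above.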
extension/parquet/include/parquet_shredding.hpp

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+#pragma once
+
+#include "duckdb/common/serializer/buffered_file_writer.hpp"
+#include "duckdb/common/case_insensitive_map.hpp"
+#include "duckdb/common/types/variant.hpp"
+
+namespace duckdb {
+
+struct ShreddingType;
+
+struct ChildShreddingTypes {
+public:
+    ChildShreddingTypes();
+
+public:
+    ChildShreddingTypes Copy() const;
+
+public:
+    void Serialize(Serializer &serializer) const;
+    static ChildShreddingTypes Deserialize(Deserializer &source);
+
+public:
+    unique_ptr<case_insensitive_map_t<ShreddingType>> types;
+};
+
+struct ShreddingType {
+public:
+    ShreddingType();
+    explicit ShreddingType(const LogicalType &type);
+
+public:
+    ShreddingType Copy() const;
+
+public:
+    void Serialize(Serializer &serializer) const;
+    static ShreddingType Deserialize(Deserializer &source);
+
+public:
+    static ShreddingType GetShreddingTypes(const Value &val);
+    void AddChild(const string &name, ShreddingType &&child);
+    optional_ptr<const ShreddingType> GetChild(const string &name) const;
+
+public:
+    bool set = false;
+    LogicalType type;
+    ChildShreddingTypes children;
+};
+
+} // namespace duckdb
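
A small usage sketch based only on the declarations above; the Value layout accepted by GetShreddingTypes is not shown in this commit, so it is left out, and IsShredded/BuildExampleSpec are hypothetical helpers, not part of the change:

#include "parquet_shredding.hpp"

namespace duckdb {

// Hypothetical helper: would a given top-level column be shredded?
static bool IsShredded(const ShreddingType &root, const string &column) {
    // GetChild performs a case-insensitive lookup; a miss yields a null
    // optional_ptr, which FillParquetSchema treats as "not shredded".
    return root.GetChild(column) != nullptr;
}

// Hypothetical helper: shred column "measurement" as BIGINT.
static ShreddingType BuildExampleSpec() {
    ShreddingType root;
    root.AddChild("measurement", ShreddingType(LogicalType::BIGINT));
    return root;
}

} // namespace duckdb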
