forked from Snowflake-Labs/pg_lake
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathavro.patch
More file actions
227 lines (217 loc) · 6.99 KB
/
avro.patch
File metadata and controls
227 lines (217 loc) · 6.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
diff --git a/lang/c/src/avro/io.h b/lang/c/src/avro/io.h
index ffbb68dc5..5bb2ee699 100644
--- a/lang/c/src/avro/io.h
+++ b/lang/c/src/avro/io.h
@@ -109,12 +109,17 @@ int avro_file_writer_create_with_codec(const char *path,
int avro_file_writer_create_with_codec_fp(FILE *fp, const char *path, int should_close,
avro_schema_t schema, avro_file_writer_t * writer,
const char *codec, size_t block_size);
+int avro_file_writer_create_with_codec_metadata_fp(FILE *fp, const char *path, int should_close,
+ avro_schema_t schema, avro_file_writer_t * writer,
+ const char *codec, size_t block_size, avro_value_t *meta);
int avro_file_writer_open(const char *path, avro_file_writer_t * writer);
int avro_file_writer_open_bs(const char *path, avro_file_writer_t * writer, size_t block_size);
int avro_file_reader(const char *path, avro_file_reader_t * reader);
int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
avro_file_reader_t * reader);
+int avro_file_reader_json_schema(const char *file_path, const char **json_schema);
+
avro_schema_t
avro_file_reader_get_writer_schema(avro_file_reader_t reader);
diff --git a/lang/c/src/datafile.c b/lang/c/src/datafile.c
index c9d4dfeb6..a2c9d54b1 100644
--- a/lang/c/src/datafile.c
+++ b/lang/c/src/datafile.c
@@ -73,7 +73,7 @@ static int write_sync(avro_file_writer_t w)
return avro_write(w->writer, w->sync, sizeof(w->sync));
}
-static int write_header(avro_file_writer_t w)
+static int write_header(avro_file_writer_t w, avro_value_t *extra_meta)
{
int rval;
uint8_t version = 1;
@@ -82,6 +82,15 @@ static int write_header(avro_file_writer_t w)
const avro_encoding_t *enc = &avro_binary_encoding;
int64_t schema_len;
+ const char *json_schema = NULL;
+ size_t json_schema_len = 0;
+ if (extra_meta != NULL)
+ {
+ avro_value_t meta_schema = {0};
+ check(rval, avro_value_get_by_name(extra_meta, "avroschema", &meta_schema, NULL));
+ check(rval, avro_value_get_string(&meta_schema, &json_schema, &json_schema_len));
+ }
+
/* Generate random sync */
generate_sync(w);
@@ -94,15 +103,24 @@ static int write_header(avro_file_writer_t w)
check(rval, enc->write_string(w->writer, "avro.schema"));
schema_writer =
avro_writer_memory(&w->schema_buf[0], sizeof(w->schema_buf));
- rval = avro_schema_to_json(w->writers_schema, schema_writer);
+
+ if (json_schema) {
+ rval = avro_write(schema_writer, (char *)json_schema, strlen(json_schema));
+ }
+ else {
+ rval = avro_schema_to_json(w->writers_schema, schema_writer);
+ }
+
if (rval) {
avro_writer_free(schema_writer);
return rval;
}
+
schema_len = avro_writer_tell(schema_writer);
avro_writer_free(schema_writer);
check(rval,
enc->write_bytes(w->writer, w->schema_buf, schema_len));
+
check(rval, enc->write_long(w->writer, 0));
return write_sync(w);
}
@@ -140,7 +158,7 @@ file_writer_init_fp(FILE *fp, const char *path, int should_close, const char *mo
#endif
static int
-file_writer_create(FILE *fp, const char *path, int should_close, avro_schema_t schema, avro_file_writer_t w, size_t block_size)
+file_writer_create(FILE *fp, const char *path, int should_close, avro_schema_t schema, avro_file_writer_t w, size_t block_size, avro_value_t *extra_meta)
{
int rval;
@@ -169,7 +187,7 @@ file_writer_create(FILE *fp, const char *path, int should_close, avro_schema_t s
}
w->writers_schema = avro_schema_incref(schema);
- return write_header(w);
+ return write_header(w, extra_meta);
}
int
@@ -196,6 +214,15 @@ int avro_file_writer_create_with_codec(const char *path,
int avro_file_writer_create_with_codec_fp(FILE *fp, const char *path, int should_close,
avro_schema_t schema, avro_file_writer_t * writer,
const char *codec, size_t block_size)
+{
+ return avro_file_writer_create_with_codec_metadata_fp(fp, path, should_close, schema, writer, codec,
+ block_size, NULL);
+}
+
+
+int avro_file_writer_create_with_codec_metadata_fp(FILE *fp, const char *path, int should_close,
+ avro_schema_t schema, avro_file_writer_t * writer,
+ const char *codec, size_t block_size, avro_value_t *extra_meta)
{
avro_file_writer_t w;
int rval;
@@ -226,7 +253,7 @@ int avro_file_writer_create_with_codec_fp(FILE *fp, const char *path, int should
avro_freet(struct avro_file_writer_t_, w);
return rval;
}
- rval = file_writer_create(fp, path, should_close, schema, w, block_size);
+ rval = file_writer_create(fp, path, should_close, schema, w, block_size, extra_meta);
if (rval) {
avro_codec_reset(w->codec);
avro_freet(struct avro_codec_t_, w->codec);
@@ -541,6 +568,93 @@ int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
return 0;
}
+int
+avro_file_reader_json_schema(const char *file_path, const char **json_schema)
+{
+ FILE *file = fopen(file_path, "rb");
+ if (!file) {
+ avro_set_error("Error opening file: %s",
+ strerror(errno));
+ return errno;
+ }
+
+ avro_reader_t reader = avro_reader_file(file);
+
+ int rval;
+
+ char magic[4] = {0};
+
+ /* read magic footer */
+ check(rval, avro_read(reader, magic, sizeof(magic)));
+
+ if (magic[0] != 'O' || magic[1] != 'b' || magic[2] != 'j'
+ || magic[3] != 1)
+ {
+ avro_reader_free(reader);
+ avro_set_error("Incorrect Avro container file magic number");
+ return 1;
+ }
+
+ /* each value is bytes */
+ avro_schema_t meta_values_schema = avro_schema_bytes();
+
+ /* metadata is map */
+ avro_schema_t meta_schema = avro_schema_map(meta_values_schema);
+
+ /* prepare avro interface for the schema */
+ avro_value_iface_t *meta_iface = avro_generic_class_from_schema(meta_schema);
+
+ if (meta_iface == NULL)
+ {
+ avro_reader_free(reader);
+ avro_set_error("Cannot create metadata interface");
+ return 1;
+ }
+
+ /* read avro metadata */
+ avro_value_t meta;
+
+ if (avro_generic_value_new(meta_iface, &meta) != 0)
+ {
+ avro_reader_free(reader);
+ avro_set_error("Cannot create metadata value");
+ return 1;
+ }
+
+ if (avro_value_read(reader, &meta) != 0)
+ {
+ avro_reader_free(reader);
+ avro_set_error("Cannot read file header");
+ return 1;
+ }
+
+ /* read "avro.schema" from the metadata */
+ avro_value_t schema_bytes;
+
+ if (avro_value_get_by_name(&meta, "avro.schema", &schema_bytes, NULL) != 0)
+ {
+ avro_reader_free(reader);
+ avro_set_error("File header doesn't contain a schema");
+ return 1;
+ }
+
+ const void *p = NULL;
+ size_t len = 0;
+
+ avro_value_get_bytes(&schema_bytes, &p, &len);
+
+ char *schema = avro_malloc(len + 1);
+
+ memcpy((void *) schema, p, len);
+ schema[len] = '\0';
+
+ *json_schema = schema;
+
+ avro_reader_free(reader);
+
+ return 0;
+}
+
int avro_file_reader(const char *path, avro_file_reader_t * reader)
{
FILE *fp;
diff --git a/lang/c/src/schema.c b/lang/c/src/schema.c
index a4d8e9f89..cddea00cc 100644
--- a/lang/c/src/schema.c
+++ b/lang/c/src/schema.c
@@ -56,7 +56,7 @@ static int is_avro_id(const char *name)
}
for (i = 0; i < len; i++) {
if (!(isalpha(name[i])
- || name[i] == '_' || (i && isdigit(name[i])))) {
+ || name[i] == '_' || name[i] == '.' || (i && isdigit(name[i])))) {
return 0;
}
}