diff --git a/.gitignore b/.gitignore index 6d5206b..93f41c8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +build + # Prerequisites *.d diff --git a/README.md b/README.md new file mode 100644 index 0000000..87ff5e9 --- /dev/null +++ b/README.md @@ -0,0 +1,126 @@ +# Rill + +A one-off specialized database for a 2 column schema that focuses on compressed +storage and read-only mmap files. + +## Building + +```shell +$ mkdir build && cd build +$ PREFIX=.. ../compile.sh +``` + +Currently only used in the `rill-rs` project which means that no install target +is currently provided. Build artifacts are the following: +- `src/rill.h` +- `build/rill.a` + +## Design Space + +- A pair is composed of a `u64` key and a `u64` value +- Key cardinality is in the order of 1 million +- Value cardinality is in the order of 100 million +- Infrequent batch query of keys over entire dataset +- Batch queries must finish within 5 minutes +- Pair ingestion must happen in real-time (order of 100k/sec) +- Pairs duplicates are very common +- Expire entire month of data older then 15 months +- Expect around 50 billion unique pairs in a single month +- Servers have around 250Gb of RAM and 2TB of SSD disk space + + +## Architecture + +Rill is split into the following major components: + +- `acc`: real-time data ingestion +- `store`: file storage format +- `rotation`: progressive merging and expiration of store files + +### Ingestion + + +### Storage + +Basic design philosophy: + +- Immutable +- Mutation through merge operation +- All pairs are sorted +- Memory mapped and queried directly. + + +#### Compression + +The main goal for storage is to fit the entire dataset on the disks of a single +server: + + 50B pairs * 15 months * 16bytes per pair = 12TB of disk space + +Given our 2TB of available disk space, we need to do some compression to store +everything. A general sketch of the compression is as follows: + +- Don't repeat keys +- Uniformize the namespace of the values +- Block encode (LEB128) the uniformized values + +Implemention basically begins by extracting all the unique values in the dataset +sorting them and storing them in a table. Using this table, we can then encode +indexes into our table instead of the values themselves. This means our +compression is dependent on the cardinality of the value set and not so much the +values themselves. + +Encoding the pairs is a simple scheme of writting the key in full, followed by a +list of all the value associated with that key. The list of values is a +block-encoding (LEB128) of the indexes into the value table. In other words, the +smaller the cardinality of the set the less byte we'll use on average to write a +value. + +Empirically, we were are able compress a single month of data down to less then +100GB which means that our dataset now sits comfortably on our 2TB disks. + + +#### Index + +We must also be able to quickly query a single key and extract all the +associated values for that key. Our compression requirements puts a bound on the +size of our index. A general sketch of the index is as follows: + +- Don't repeat keys +- Store the keys along with the offset of their value location in a table +- Search the table via tweaked binary search + +Implementation starts by building a table of all the keys and filling in their +offset as we encode the pairs. We also no longer store the keys with the pairs +as we can simply recover the key for a given list of value via it's implicit +index in the file. The index table is stored as is at the end of the file. + +Searching is done via a tweaked binary search over the index table. Empirically +this has proven to be fast enough to meet our 5 minutes batch query +requirements. Further optimizations are possible. We've also experimented with a +single pass interpolation search followed by a vectorized linear scan but +changes in the input data meant that the keys were no longer well distributed +which made the approach unusable. + + +#### Stamp + +Safe persistence is accomplished via a pseudo-2-phase commit scheme that uses a +stamp to mark the file as complete. Steps are as follow: + +- Write the entire file +- Flush to disk +- Write a magic stamp value in the header +- Flush to disk + +This guarantees that if the stamp is found at the beginning of the file then the +file has been completely written and persisted to disk. Note that rill relies on +the underlying file system to detect file corruption as no checksums are +computed or maintained. + +Note that after rill files are frequently deleted after being merged so the +stamping mechanism is critical to avoid deleting files that were not properly +merged. + + +### Rotation diff --git a/compile.sh b/compile.sh new file mode 100755 index 0000000..c5b3ed0 --- /dev/null +++ b/compile.sh @@ -0,0 +1,64 @@ +#! /usr/bin/env bash + +set -o errexit -o nounset -o pipefail -o xtrace + +: ${PREFIX:="."} + +declare -a SRC +SRC=(htable rng utils rows store acc rotate query) + +declare -a BIN +BIN=(load dump query rotate ingest merge count) + +declare -a TEST +TEST=(index coder store) + +CC=${OTHERC:-gcc} +LEAKCHECK_ENABLED=${LEAKCHECK_ENABLED:-} + +CFLAGS="-ggdb -O3 -march=native -pipe -std=gnu11 -D_GNU_SOURCE" +CFLAGS="$CFLAGS -I${PREFIX}/src" + +CFLAGS="$CFLAGS -Werror -Wall -Wextra" +CFLAGS="$CFLAGS -Wundef" +CFLAGS="$CFLAGS -Wcast-align" +CFLAGS="$CFLAGS -Wwrite-strings" +CFLAGS="$CFLAGS -Wunreachable-code" +CFLAGS="$CFLAGS -Wformat=2" +CFLAGS="$CFLAGS -Wswitch-enum" +CFLAGS="$CFLAGS -Wswitch-default" +CFLAGS="$CFLAGS -Winit-self" +CFLAGS="$CFLAGS -Wno-strict-aliasing" +CFLAGS="$CFLAGS -fno-strict-aliasing" +CFLAGS="$CFLAGS -Wno-implicit-fallthrough" + +OBJ="" +for src in "${SRC[@]}"; do + $CC -c -o "$src.o" "${PREFIX}/src/$src.c" $CFLAGS + OBJ="$OBJ $src.o" +done +ar rcs librill.a $OBJ + +for bin in "${BIN[@]}"; do + $CC -o "rill_$bin" "${PREFIX}/src/rill_$bin.c" librill.a $CFLAGS +done + +for test in "${TEST[@]}"; do + $CC -o "test_$test" "${PREFIX}/test/${test}_test.c" librill.a $CFLAGS + "./test_$test" +done + +# this one takes a while so it's usually run manually +$CC -o "test_rotate" "${PREFIX}/test/rotate_test.c" librill.a $CFLAGS + + +if [ -n "$LEAKCHECK_ENABLED" ]; then + for test in "{TEST[@]}"; do + valgrind \ + --leak-check=full \ + --track-origins=yes \ + --trace-children=yes \ + --error-exitcode=1 \ + "./test_$test" + done +fi diff --git a/src/acc.c b/src/acc.c new file mode 100644 index 0000000..eff7527 --- /dev/null +++ b/src/acc.c @@ -0,0 +1,233 @@ +/* acc.c + Rémi Attab (remi.attab@gmail.com), 16 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// acc +// ----------------------------------------------------------------------------- + +static const uint32_t version = 1; +static const uint32_t magic = 0x43434152; + +struct rill_packed header +{ + uint32_t magic; + uint32_t version; + + uint64_t len; + + atomic_size_t read; + atomic_size_t write; +}; + +struct rill_packed row +{ + uint64_t a, b; +}; + +struct rill_acc +{ + int fd; + const char *dir; + + void *vma; + size_t vma_len; + + struct header *head; + struct row *data; +}; + +enum { min_cap = 32 }; + +struct rill_acc *rill_acc_open(const char *dir, size_t cap) +{ + if (cap != rill_acc_read_only && cap < min_cap) cap = min_cap; + + // Add enough leeway to avoid contention between the reader and the writer. + // some might say this is an excessive amount of leeway but I don't care. + cap *= 2; + + struct rill_acc *acc = calloc(1, sizeof(*acc)); + if (!acc) { + rill_fail("unable to allocate memory for '%s'", dir); + goto fail_alloc_struct; + } + + acc->dir = strndup(dir, PATH_MAX); + if (!acc->dir) { + rill_fail("unable to allocate memory for '%s'", dir); + goto fail_alloc_dir; + } + + if (mkdir(dir, 0775) == -1 && errno != EEXIST) { + rill_fail_errno("unable to open create dir '%s'", dir); + goto fail_mkdir; + } + + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s/acc", dir); + + bool create = false; + struct stat stat_ret = {0}; + if (stat(file, &stat_ret) == -1) { + if (errno != ENOENT) { + rill_fail_errno("unable to stat '%s'", file); + goto fail_stat; + } + + if (cap == rill_acc_read_only) goto fail_read_only; + + create = true; + acc->fd = open(file, O_RDWR | O_CREAT | O_EXCL | O_NOATIME, 0644); + } + else acc->fd = open(file, O_RDWR); + + if (acc->fd == -1) { + rill_fail_errno("unable to create '%s'", file); + goto fail_open; + } + + if (create) { + acc->vma_len = to_vma_len(sizeof(*acc->head) + cap * sizeof(*acc->data)); + if (ftruncate(acc->fd, acc->vma_len) == -1) { + rill_fail_errno("unable to ftruncate '%s' to len '%lu'", file, acc->vma_len); + goto fail_truncate; + } + } + else { + size_t len = stat_ret.st_size; + if (len < sizeof(struct header)) { + rill_fail("invalid size for '%s'", file); + goto fail_size; + } + + acc->vma_len = to_vma_len(len); + } + + int prot = PROT_READ | PROT_WRITE; + int flags = MAP_SHARED | MAP_POPULATE; + acc->vma = mmap(NULL, acc->vma_len, prot, flags, acc->fd, 0); + if (acc->vma == MAP_FAILED) { + rill_fail_errno("unable to mmap '%s' of len '%lu'", file, acc->vma_len); + goto fail_mmap; + } + + acc->head = acc->vma; + acc->data = (void *) (acc->head + 1); + + if (create) { + acc->head->magic = magic; + acc->head->version = version; + acc->head->len = cap; + } + else { + if (acc->head->magic != magic) { + rill_fail("invalid magic '0x%x' for '%s'", acc->head->magic, file); + goto fail_magic; + } + + if (acc->head->version != version) { + rill_fail("unknown version '%du' for '%s'", acc->head->version, file); + goto fail_version; + } + } + + return acc; + + fail_version: + fail_magic: + munmap(acc->vma, acc->vma_len); + fail_mmap: + fail_size: + fail_truncate: + close(acc->fd); + fail_open: + fail_read_only: + fail_stat: + fail_mkdir: + free((char *) acc->dir); + fail_alloc_dir: + free(acc); + fail_alloc_struct: + return NULL; +} + +void rill_acc_close(struct rill_acc *acc) +{ + munmap(acc->vma, acc->vma_len); + close(acc->fd); + free((char *) acc->dir); + free(acc); +} + +void rill_acc_ingest(struct rill_acc *acc, rill_val_t a, rill_val_t b) +{ + assert(a && b); + + size_t write = atomic_load_explicit(&acc->head->write, memory_order_relaxed); + size_t index = write % acc->head->len; + struct row *row = &acc->data[index]; + + row->a = a; + row->b = b; + + atomic_store_explicit(&acc->head->write, write + 1, memory_order_release); +} + +bool rill_acc_write(struct rill_acc *acc, const char *file, rill_ts_t now) +{ + size_t start = atomic_load_explicit(&acc->head->read, memory_order_acquire); + size_t end = atomic_load_explicit(&acc->head->write, memory_order_acquire); + if (start == end) return true; + assert(start < end); + + if (end - start > acc->head->len) { + printf("acc lost '%lu' events: read=%lu, write=%lu, cap=%lu\n", + (end - start) - acc->head->len, start, end, acc->head->len); + start = end - acc->head->len; + } + + struct rill_rows rows = {0}; + if (!rill_rows_reserve(&rows, end - start)) goto fail_rows_reserve; + + for (size_t i = start; i < end; ++i) { + size_t index = i % acc->head->len; + struct row *row = &acc->data[index]; + + if (!rill_rows_push(&rows, row->a, row->b)) goto fail_rows_push; + } + + if (!rill_store_write(file, now, 0, &rows)) { + rill_fail("unable to write acc file '%s'", file); + goto fail_write; + } + + atomic_store_explicit(&acc->head->read, end, memory_order_release); + + rill_rows_free(&rows); + return true; + + fail_write: + fail_rows_push: + rill_rows_free(&rows); + fail_rows_reserve: + return false; +} diff --git a/src/coder.c b/src/coder.c new file mode 100644 index 0000000..d6176ae --- /dev/null +++ b/src/coder.c @@ -0,0 +1,221 @@ +/* coder.c + Rémi Attab (remi.attab@gmail.com), 10 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +// ----------------------------------------------------------------------------- +// leb128 +// ----------------------------------------------------------------------------- + +static inline uint8_t *leb128_encode(uint8_t *it, uint64_t val) +{ + static const size_t shift = 7; + static const uint64_t more_mask = 1UL << shift; + static const uint64_t body_mask = (1UL << shift) - 1; + + do { + *it = val & body_mask; + *it |= (val >>= shift) ? more_mask : 0; + it++; + } while (val); + + return it; +} + +static inline bool leb128_decode(uint8_t **it, uint8_t *end, uint64_t *val) +{ + static const size_t shift = 7; + static const uint64_t more_mask = 1UL << shift; + static const uint64_t body_mask = (1UL << shift) - 1; + + if (*it == end) return it; + + uint8_t data; + size_t pos = 0; + *val = 0; + + do { + data = **it; (*it)++; + *val |= (data & body_mask) << pos; + pos += shift; + } while ((data & more_mask) && *it != end); + + return !(data & more_mask); +} + + + +// ----------------------------------------------------------------------------- +// encode +// ----------------------------------------------------------------------------- + +static const size_t coder_max_val_len = sizeof(rill_val_t) + 2 + 1; + +struct encoder +{ + uint8_t *it, *start, *end; + + size_t keys; + rill_val_t key; + + vals_rev_t rev; + struct index *index; + + size_t rows; +}; + +static size_t coder_cap(size_t vals, size_t rows) +{ + size_t bytes = 1; + while (vals >= 1UL << (bytes * 7)) bytes++; + + return (bytes + 1) // + 1 -> end-of-values terminator + * (rows + 1); // + 1 -> end-of-rows terminator +} + +static uint64_t coder_off(struct encoder *coder) +{ + return coder->it - coder->start; +} + + +static inline bool coder_write_sep(struct encoder *coder) +{ + if (rill_unlikely(coder->it + 1 > coder->end)) { + rill_fail("not enough space to write sep: %p + 1 > %p\n", + (void *) coder->it, (void *) coder->end); + return false; + } + + *coder->it = 0; + coder->it++; + + return true; +} + +// \todo might want to just write directly to region since out-of-bounds are the +// rare case. +static inline bool coder_write_val(struct encoder *coder, rill_val_t val) +{ + val = vals_vtoi(&coder->rev, val); + + uint8_t buffer[coder_max_val_len]; + size_t len = leb128_encode(buffer, val) - buffer; + + if (rill_unlikely(coder->it + len > coder->end)) { + rill_fail("not enough space to write val: %p + %lu > %p\n", + (void *) coder->it, len, (void *) coder->end); + return false; + } + + memcpy(coder->it, buffer, len); + coder->it += len; + + return true; +} + +static bool coder_encode(struct encoder *coder, const struct rill_row *row) +{ + if (coder->key != row->a) { + if (rill_likely(coder->key)) { + if (!coder_write_sep(coder)) return false; + } + + index_put(coder->index, row->a, coder_off(coder)); + coder->key = row->a; + coder->keys++; + } + + if (!coder_write_val(coder, row->b)) return false; + + coder->rows++; + return true; +} + +static bool coder_finish(struct encoder *coder) +{ + if (!coder_write_sep(coder)) return false; + if (!coder_write_sep(coder)) return false; + return true; +} + +static void coder_close(struct encoder *coder) +{ + htable_reset(&coder->rev); +} + +static struct encoder make_encoder( + uint8_t *start, + uint8_t *end, + struct vals *vals, + struct index *index) +{ + struct encoder coder = { + .it = start, .start = start, .end = end, + .index = index, + }; + + vals_rev_make(vals, &coder.rev); + return coder; +} + + +// ----------------------------------------------------------------------------- +// decoder +// ----------------------------------------------------------------------------- + +struct decoder +{ + uint8_t *it, *end; + + size_t keys; + rill_val_t key; + + struct index *lookup; + struct index *index; + + struct vals *vals; +}; + +static inline bool coder_read_val(struct decoder *coder, rill_val_t *val) +{ + if (!leb128_decode(&coder->it, coder->end, val)) { + rill_fail("unable to decode value at '%p-%p'\n", + (void *) coder->it, (void *) coder->end); + return false; + } + + if (*val) *val = coder->lookup->data[*val - 1].key; + return true; +} + +static bool coder_decode(struct decoder *coder, struct rill_row *row) +{ + if (rill_likely(coder->key)) { + row->a = coder->key; + if (!coder_read_val(coder, &row->b)) return false; + if (row->b) return true; + } + + coder->key = index_get(coder->index, coder->keys); + coder->keys++; + + row->a = coder->key; + if (!row->a) return true; // eof + + return coder_read_val(coder, &row->b); +} + +static struct decoder make_decoder_at( + uint8_t *it, uint8_t *end, + struct index *lookup, + struct index *index, + size_t key_idx) +{ + return (struct decoder) { + .it = it, .end = end, + .keys = key_idx, + .lookup = lookup, + .index = index, + }; +} diff --git a/src/htable.c b/src/htable.c new file mode 100644 index 0000000..961167b --- /dev/null +++ b/src/htable.c @@ -0,0 +1,142 @@ +/* htable.c + Rémi Attab (remi.attab@gmail.com), 10 Mar 2016 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "htable.h" + +#include +#include +#include + +// ----------------------------------------------------------------------------- +// config +// ----------------------------------------------------------------------------- + +enum { probe_window = 8 }; + + +// ----------------------------------------------------------------------------- +// hash +// ----------------------------------------------------------------------------- + +// FNV-1a hash implementation: http://isthe.com/chongo/tech/comp/fnv/ +inline uint64_t hash_key(uint64_t key) +{ + const uint8_t *data = (uint8_t *) &key; + + uint64_t hash = 0xcbf29ce484222325; + for (size_t i = 0; i < sizeof(key); ++i) + hash = (hash ^ data[i]) * 0x100000001b3; + + assert(hash); // \todo Can't be 0 + return hash; +} + + +// ----------------------------------------------------------------------------- +// htable +// ----------------------------------------------------------------------------- + +void htable_reset(struct htable *ht) +{ + free(ht->table); + *ht = (struct htable) {0}; +} + +static bool table_put( + struct htable_bucket *table, size_t cap, + uint64_t key, uint64_t value) +{ + assert(key); + uint64_t hash = hash_key(key); + + for (size_t i = 0; i < probe_window; ++i) { + struct htable_bucket *bucket = &table[(hash + i) % cap]; + if (bucket->key) continue; + + bucket->key = key; + bucket->value = value; + return true; + } + + return false; +} + +static void htable_resize(struct htable *ht, size_t cap) +{ + if (cap <= ht->cap) return; + + size_t new_cap = ht->cap ? ht->cap : 1; + while (new_cap < cap) new_cap *= 2; + + struct htable_bucket *new_table = calloc(new_cap, sizeof(*new_table)); + for (size_t i = 0; i < ht->cap; ++i) { + struct htable_bucket *bucket = &ht->table[i]; + if (!bucket->key) continue; + + if (!table_put(new_table, new_cap, bucket->key, bucket->value)) { + free(new_table); + htable_resize(ht, new_cap * 2); + return; + } + } + + free(ht->table); + ht->cap = new_cap; + ht->table = new_table; +} + +void htable_reserve(struct htable *ht, size_t items) +{ + htable_resize(ht, items * 4); +} + + +// ----------------------------------------------------------------------------- +// ops +// ----------------------------------------------------------------------------- + +struct htable_ret htable_get(struct htable *ht, uint64_t key) +{ + assert(key); + + uint64_t hash = hash_key(key); + htable_resize(ht, probe_window); + + for (size_t i = 0; i < probe_window; ++i) { + struct htable_bucket *bucket = &ht->table[(hash + i) % ht->cap]; + + if (!bucket->key) continue; + if (bucket->key != key) continue; + + return (struct htable_ret) { .ok = true, .value = bucket->value }; + } + + return (struct htable_ret) { .ok = false }; +} + +struct htable_ret htable_put(struct htable *ht, uint64_t key, uint64_t value) +{ + assert(key); + + uint64_t hash = hash_key(key); + htable_resize(ht, probe_window); + + for (size_t i = 0; i < probe_window; ++i) { + struct htable_bucket *bucket = &ht->table[(hash + i) % ht->cap]; + + if (bucket->key) { + if (bucket->key != key) continue; + return (struct htable_ret) { .ok = false, .value = bucket->value }; + } + + ht->len++; + bucket->key = key; + bucket->value = value; + return (struct htable_ret) { .ok = true }; + } + + htable_resize(ht, ht->cap * 2); + return htable_put(ht, key, value); +} diff --git a/src/htable.h b/src/htable.h new file mode 100644 index 0000000..02ecadb --- /dev/null +++ b/src/htable.h @@ -0,0 +1,40 @@ +/* htable.h + Rémi Attab (remi.attab@gmail.com), 10 Mar 2016 + FreeBSD-style copyright and disclaimer apply +*/ + +#pragma once + +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// struct +// ----------------------------------------------------------------------------- + +struct htable_bucket +{ + uint64_t key; + uint64_t value; +}; + +struct htable +{ + size_t len; + size_t cap; + struct htable_bucket *table; +}; + +struct htable_ret +{ + bool ok; + uint64_t value; +}; + + +void htable_reset(struct htable *); +void htable_reserve(struct htable *, size_t items); +struct htable_ret htable_get(struct htable *, uint64_t key); +struct htable_ret htable_put(struct htable *, uint64_t key, uint64_t value); diff --git a/src/index.c b/src/index.c new file mode 100644 index 0000000..02348ec --- /dev/null +++ b/src/index.c @@ -0,0 +1,59 @@ +/* index.c + Rémi Attab (remi.attab@gmail.com), 24 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + + +// ----------------------------------------------------------------------------- +// config +// ----------------------------------------------------------------------------- + +struct rill_packed index_kv +{ + uint64_t key; + uint64_t off; +}; + +struct rill_packed index +{ + uint64_t len; + uint64_t __unused; // kept for backwards compatibility + struct index_kv data[]; +}; + +static size_t index_cap(size_t len) +{ + return sizeof(struct index) + len * sizeof(struct index_kv); +} + +static void index_put(struct index *index, rill_val_t key, uint64_t off) +{ + index->data[index->len] = (struct index_kv) { .key = key, .off = off }; + index->len++; +} + +// RIP fancy pants interpolation search :( +static bool index_find( + struct index *index, rill_val_t key, size_t *key_idx, uint64_t *off) +{ + size_t idx = 0; + size_t len = index->len; + struct index_kv *low = index->data; + + while (len > 1) { + size_t mid = len / 2; + if (key < low[mid].key) len = mid; + else { low += mid; len -= mid; idx += mid;} + } + + struct index_kv *row = &index->data[idx]; + if (row->key != key) return false; + *key_idx = idx; + *off = row->off; + return true; +} + +static rill_val_t index_get(struct index *index, size_t i) +{ + return i < index->len ? index->data[i].key : 0; +} diff --git a/src/query.c b/src/query.c new file mode 100644 index 0000000..7470c1d --- /dev/null +++ b/src/query.c @@ -0,0 +1,99 @@ +/* rill.c + Rémi Attab (remi.attab@gmail.com), 03 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include + +#include +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// rill +// ----------------------------------------------------------------------------- + +struct rill_query +{ + const char *dir; + + size_t len; + struct rill_store *list[1024]; +}; + +struct rill_query * rill_query_open(const char *dir) +{ + struct rill_query *query = calloc(1, sizeof(*query)); + if (!query) { + rill_fail("unable to allocate memory for '%s'", dir); + goto fail_alloc_struct; + } + + query->dir = strndup(dir, PATH_MAX); + if (!query->dir) { + rill_fail("unable to allocate memory for '%s'", dir); + goto fail_alloc_dir; + } + + size_t cap = sizeof(query->list) / sizeof(query->list[0]); + query->len = rill_scan_dir(query->dir, query->list, cap); + + return query; + + free((char *) query->dir); + fail_alloc_dir: + free(query); + fail_alloc_struct: + return NULL; +} + +void rill_query_close(struct rill_query *query) +{ + for (size_t i = 0; i < query->len; ++i) + rill_store_close(query->list[i]); + + free((char *) query->dir); + free(query); +} + +bool rill_query_key( + const struct rill_query *query, + enum rill_col col, + rill_val_t key, + struct rill_rows *out) +{ + if (!key) return false; + + for (size_t i = 0; i < query->len; ++i) { + if (!rill_store_query(query->list[i], col, key, out)) + return false; + } + + rill_rows_compact(out); + return true; +} + +bool rill_query_keys( + const struct rill_query *query, + enum rill_col col, + const rill_val_t *keys, size_t len, + struct rill_rows *out) +{ + if (!len) return true; + + for (size_t i = 0; i < query->len; ++i) { + for (size_t j = 0; i < len; ++j) { + if (!rill_store_query(query->list[i], col, keys[j], out)) + return false; + } + } + + rill_rows_compact(out); + return true; +} diff --git a/src/rill.h b/src/rill.h new file mode 100644 index 0000000..8e2e107 --- /dev/null +++ b/src/rill.h @@ -0,0 +1,209 @@ +/* rill.h + Rémi Attab (remi.attab@gmail.com), 03 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#pragma once + +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// error +// ----------------------------------------------------------------------------- + +enum { rill_err_msg_cap = 1024 }; + +struct rill_error +{ + const char *file; + int line; + + int errno_; // errno can be a macro hence the underscore. + char msg[rill_err_msg_cap]; +}; + +extern __thread struct rill_error rill_errno; + +void rill_perror(struct rill_error *err); +size_t rill_strerror(struct rill_error *err, char *dest, size_t len); + + +// ----------------------------------------------------------------------------- +// types +// ----------------------------------------------------------------------------- + +typedef uint64_t rill_ts_t; +typedef uint64_t rill_val_t; + + +// ----------------------------------------------------------------------------- +// col +// ----------------------------------------------------------------------------- + +enum { rill_cols = 2 }; +enum rill_col { rill_col_a = 0, rill_col_b = 1 }; + +inline enum rill_col rill_col_flip(enum rill_col col) +{ + return 1 - col; +} + + +// ----------------------------------------------------------------------------- +// row +// ----------------------------------------------------------------------------- + +struct rill_row +{ + rill_val_t a, b; +}; + +inline bool rill_row_nil(const struct rill_row *row) +{ + return !row->a && !row->b; +} + +inline int rill_row_cmp(const struct rill_row *lhs, const struct rill_row *rhs) +{ + if (lhs->a < rhs->a) return -1; + if (lhs->a > rhs->a) return +1; + + if (lhs->b < rhs->b) return -1; + if (lhs->b > rhs->b) return +1; + + return 0; +} + +inline rill_val_t rill_row_get(const struct rill_row *row, enum rill_col col) +{ + // Avoids branches but could be dangerous if col happens to be giberrish. + return ((rill_val_t *) row)[col]; +} + + +// ----------------------------------------------------------------------------- +// rows +// ----------------------------------------------------------------------------- + +struct rill_rows +{ + size_t len, cap; + struct rill_row *data; +}; + +void rill_rows_free(struct rill_rows *); + +bool rill_rows_push(struct rill_rows *, rill_val_t a, rill_val_t b); +bool rill_rows_reserve(struct rill_rows *, size_t cap); +void rill_rows_clear(struct rill_rows *); + +void rill_rows_invert(struct rill_rows *); +void rill_rows_compact(struct rill_rows *); +bool rill_rows_append(struct rill_rows *, struct rill_rows *other); + +bool rill_rows_copy(const struct rill_rows *, struct rill_rows *out); +void rill_rows_print(const struct rill_rows *); + + +// ----------------------------------------------------------------------------- +// store +// ----------------------------------------------------------------------------- + +struct rill_store; +struct rill_store_it; + +struct rill_store *rill_store_open(const char *file); +void rill_store_close(struct rill_store *store); + +bool rill_store_write( + const char *file, + rill_ts_t ts, + size_t quant, + struct rill_rows *rows); + +bool rill_store_merge( + const char *file, + rill_ts_t ts, size_t quant, + struct rill_store **list, size_t len); + +bool rill_store_rm(struct rill_store *); + +const char * rill_store_file(const struct rill_store *); +unsigned rill_store_version(const struct rill_store *); +rill_ts_t rill_store_ts(const struct rill_store *); +size_t rill_store_quant(const struct rill_store *); +size_t rill_store_rows(const struct rill_store *); + +size_t rill_store_vals( + const struct rill_store *, enum rill_col, rill_val_t *out, size_t len); +size_t rill_store_vals_count(const struct rill_store *, enum rill_col); + +bool rill_store_query( + const struct rill_store *, enum rill_col, rill_val_t, struct rill_rows *out); + +struct rill_store_it *rill_store_begin(const struct rill_store *, enum rill_col); +void rill_store_it_free(struct rill_store_it *); +bool rill_store_it_next(struct rill_store_it *, struct rill_row *out); + +struct rill_store_stats +{ + size_t header_bytes; + size_t index_bytes[2]; + size_t rows_bytes[2]; +}; + +void rill_store_stats(const struct rill_store *, struct rill_store_stats *); + + +// ----------------------------------------------------------------------------- +// acc +// ----------------------------------------------------------------------------- + +struct rill_acc; + +enum { rill_acc_read_only = 0 }; + +struct rill_acc *rill_acc_open(const char *dir, size_t cap); +void rill_acc_close(struct rill_acc *acc); + +void rill_acc_ingest(struct rill_acc *acc, rill_val_t a, rill_val_t b); +bool rill_acc_write(struct rill_acc *acc, const char *file, rill_ts_t now); + + +// ----------------------------------------------------------------------------- +// rotate +// ----------------------------------------------------------------------------- + +bool rill_rotate(const char *dir, rill_ts_t now); + + +// ----------------------------------------------------------------------------- +// query +// ----------------------------------------------------------------------------- + +struct rill_query; + +struct rill_query * rill_query_open(const char *dir); +void rill_query_close(struct rill_query *db); + +bool rill_query_key( + const struct rill_query *query, + enum rill_col col, + rill_val_t key, + struct rill_rows *out); + +bool rill_query_keys( + const struct rill_query *query, + enum rill_col col, + const rill_val_t *keys, size_t len, + struct rill_rows *out); + + +// ----------------------------------------------------------------------------- +// misc +// ----------------------------------------------------------------------------- + +size_t rill_scan_dir(const char *dir, struct rill_store **list, size_t cap); diff --git a/src/rill_count.c b/src/rill_count.c new file mode 100644 index 0000000..0a388a3 --- /dev/null +++ b/src/rill_count.c @@ -0,0 +1,77 @@ +/* rill_count.c + Rémi Attab (remi.attab@gmail.com), 07 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// count +// ----------------------------------------------------------------------------- + +static void count(struct rill_store *store, enum rill_col col) +{ + struct rill_row row; + struct rill_store_it *it = rill_store_begin(store, col); + + rill_val_t key = 0; + size_t count = 0; + while (rill_store_it_next(it, &row)) { + if (rill_row_nil(&row)) break; + + if (row.a == key) count++; + else { + if (key) printf("%lu %p\n", count, (void *) key); + count = 1; + key = row.a; + } + } + + rill_store_it_free(it); +} + + +// ----------------------------------------------------------------------------- +// main +// ----------------------------------------------------------------------------- + +static void usage() +{ + fprintf(stderr, "rill_count - \n"); + exit(1); +} + +int main(int argc, char **argv) +{ + if (argc != 3) usage(); + + int opt = 0; + bool col_a = false, col_b = false; + + while ((opt = getopt(argc, argv, "+ab")) != -1) { + switch(opt) { + case 'a': col_a = true; break; + case 'b': col_b = true; break; + default: usage(); + } + } + + if (optind >= argc) usage(); + + enum rill_col col; + if (!rill_args_col(col_a, col_b, &col)) usage(); + + struct rill_store *store = rill_store_open(argv[optind]); + if (!store) rill_exit(1); + + count(store, col); + + rill_store_close(store); + return 0; +} diff --git a/src/rill_dump.c b/src/rill_dump.c new file mode 100644 index 0000000..686a89c --- /dev/null +++ b/src/rill_dump.c @@ -0,0 +1,124 @@ +/* dump.c + Rémi Attab (remi.attab@gmail.com), 07 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// dump +// ----------------------------------------------------------------------------- + +static void dump_headers(struct rill_store *store) +{ + printf("version: %u\n", rill_store_version(store)); + printf("ts: %lu\n", rill_store_ts(store)); + printf("quant: %lu\n", rill_store_quant(store)); + printf("rows: %lu\n", rill_store_rows(store)); + printf("vals[a]: %zu\n", rill_store_vals_count(store, rill_col_a)); + printf("vals[b]: %zu\n", rill_store_vals_count(store, rill_col_b)); +} + +static void dump_stats(struct rill_store *store) +{ + struct rill_store_stats stats = {0}; + rill_store_stats(store, &stats); + + printf("header: %zu\n", stats.header_bytes); + printf("index[a]: %zu\n", stats.index_bytes[rill_col_a]); + printf("index[b]: %zu\n", stats.index_bytes[rill_col_b]); + printf("rows[a]: %zu\n", stats.rows_bytes[rill_col_a]); + printf("rows[b]: %zu\n", stats.rows_bytes[rill_col_b]); +} + +static void dump_vals(struct rill_store *store, enum rill_col col) +{ + const size_t vals_len = rill_store_vals_count(store, col); + rill_val_t *vals = calloc(vals_len, sizeof(*vals)); + + (void) rill_store_vals(store, col, vals, vals_len); + + for (size_t i = 0; i < vals_len; ++i) + printf("0x%lx\n", vals[i]); + + free(vals); +} + +static void dump_rows(struct rill_store *store, enum rill_col col) +{ + struct rill_store_it *it = rill_store_begin(store, col); + struct rill_row row = {0}; + + while (rill_store_it_next(it, &row)) { + if (rill_row_nil(&row)) break; + printf("0x%lx 0x%lx\n", row.a, row.b); + } + + rill_store_it_free(it); +} + + +// ----------------------------------------------------------------------------- +// main +// ----------------------------------------------------------------------------- + +static void usage() +{ + fprintf(stderr, "rill_dump - \n"); + fprintf(stderr, "rill_dump - - \n"); + exit(1); +} + +int main(int argc, char **argv) +{ + bool headers = false; + bool stats = false; + bool vals = false; + bool rows = false; + bool col_a = false; + bool col_b = false; + + int opt = 0; + while ((opt = getopt(argc, argv, "+hsvrab")) != -1) { + switch (opt) { + case 'h': headers = true; break; + case 's': stats = true; break; + case 'v': vals = true; break; + case 'r': rows = true; break; + case 'a': col_a = true; break; + case 'b': col_b = true; break; + default: + fprintf(stderr, "unknown argument: %c\n", opt); + usage(); + } + } + + if (optind >= argc) usage(); + + struct rill_store *store = rill_store_open(argv[optind]); + if (!store) rill_exit(1); + + if (!headers && !stats && !vals && !rows) usage(); + + if (headers || stats) { + printf("file: %s\n", rill_store_file(store)); + if (headers) dump_headers(store); + if (stats) dump_stats(store); + return 0; + } + + enum rill_col col; + if (!rill_args_col(col_a, col_b, &col)) usage(); + + if (vals) dump_vals(store, col); + if (rows) dump_rows(store, col); + + rill_store_close(store); + return 0; +} diff --git a/src/rill_ingest.c b/src/rill_ingest.c new file mode 100644 index 0000000..f3d5d2c --- /dev/null +++ b/src/rill_ingest.c @@ -0,0 +1,138 @@ +/* rill_ingest.c + Rémi Attab (remi.attab@gmail.com), 21 Nov 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +static inline uint64_t endian_btol(uint64_t x) +{ + return __builtin_bswap64(x); +} + +struct rill_store *load_file(const char *file, rill_ts_t ts, rill_ts_t quant) +{ + printf("loading: %s\n", file); + + struct stat st = {0}; + if (stat(file, &st) == -1) { + rill_fail_errno("unable to stat '%s'", file); + rill_exit(1); + } + + int fd = open(file, O_RDONLY); + if (fd == -1) { + rill_fail_errno("unable to open '%s'", file); + rill_exit(1); + } + + const int prot = PROT_READ | PROT_WRITE; + size_t len = to_vma_len(st.st_size); + + void *ptr = mmap(0, len + page_len, prot, MAP_ANONYMOUS | MAP_PRIVATE, 0, 0); + if (ptr == MAP_FAILED) { + rill_fail_errno("unable to mmap anon '%p'", (void *) (len + page_len)); + rill_exit(1); + } + + const int flags = MAP_PRIVATE | MAP_FIXED | MAP_POPULATE; + void *data = mmap((uint8_t *)ptr + page_len, len, prot, flags, fd, 0); + if (data == MAP_FAILED) { + rill_fail_errno("unable to mmap fixed '%d'", fd); + rill_exit(1); + } + + struct rill_row *it = data; + struct rill_row *end = it + (st.st_size / sizeof(*it)); + for (; it < end; ++it) { + rill_val_t a = endian_btol(it->a); + rill_val_t b = endian_btol(it->b); + *it = (struct rill_row) { .a = a, .b = b }; + } + + struct rill_rows *rows = ((struct rill_rows *)data) - 1; + if (!rows) rill_exit(1); + + rows->cap = rows->len = st.st_size / sizeof(rows->data[0]); + rill_rows_compact(rows); + + char file_rill[PATH_MAX]; + snprintf(file_rill, sizeof(file_rill), "%s.rill", file); + + if (!rill_store_write(file_rill, ts, quant, rows)) rill_exit(1); + munmap(ptr, page_len); + munmap(data, len); + + struct rill_store *store = rill_store_open(file_rill); + if (!store) rill_exit(1); + return store; +} + +void usage() +{ + fprintf(stderr, "rill_ingest -t -q -o \n"); + exit(1); +} + +int main(int argc, char **argv) +{ + rill_ts_t ts = 0; + rill_ts_t quant = 0; + char *output = NULL; + + int opt = 0; + while ((opt = getopt(argc, argv, "+t:q:o:")) != -1) { + switch (opt) { + case 't': ts = atol(optarg); break; + case 'q': quant = atol(optarg); break; + case 'o': output = optarg; break; + default: usage(); + } + } + + if (!ts || !quant || !output) usage(); + if (optind >= argc) usage(); + + struct rill_store *merge[64] = {0}; + + for (; optind < argc; optind++) { + struct rill_store *store = load_file(argv[optind], ts, quant); + for (size_t i = 0; i < 64; ++i) { + if (!merge[i]) { merge[i] = store; break; } + + printf("merging: %lu\n", i); + + char out[PATH_MAX]; + snprintf(out, sizeof(out), "%s.rill.%lu", argv[optind], i); + + struct rill_store *list[2] = { store, merge[i] }; + if (!rill_store_merge(out, ts, quant, list, 2)) rill_exit(1); + + store = rill_store_open(out); + if (!store) rill_exit(1); + + merge[i] = NULL; + rill_store_rm(list[0]); + rill_store_rm(list[1]); + + } + } + + if (!rill_store_merge(output, ts, quant, merge, 64)) rill_exit(1); + for (size_t i = 0; i < 64; ++i) { + if (!merge[i]) continue; + rill_store_rm(merge[i]); + } + + return 0; +} diff --git a/src/rill_load.c b/src/rill_load.c new file mode 100644 index 0000000..4f28734 --- /dev/null +++ b/src/rill_load.c @@ -0,0 +1,87 @@ +/* bench.c + Rémi Attab (remi.attab@gmail.com), 04 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "rng.h" +#include "utils.h" + +#include +#include +#include + +#include +#include +#include + +void rm(const char *path) +{ + DIR *dir = opendir(path); + if (!dir) return; + + struct dirent *entry = NULL; + while ((entry = readdir(dir))) { + if (entry->d_type != DT_REG) continue; + + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s/%s", path, entry->d_name); + unlink(file); + } + + closedir(dir); + rmdir(path); +} + +void acc_dump(struct rill_acc *acc, const char *dir, rill_ts_t ts) +{ + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s/%010lu.rill", dir, ts); + + if (!rill_acc_write(acc, file, ts)) rill_abort(); +} + +int main(int argc, char **argv) +{ + (void) argc, (void) argv; + rm("db"); + + enum { + keys_per_sec = 200, + seconds = 1 * month_secs, + rotation_rate = 10 * min_secs, + + keys_range = 20 * 1000 * 1000, + vals_range = 100 * 1000, + vals_per_key = 4, + + acc_cap = keys_per_sec * vals_per_key * rotation_rate, + }; + + struct rill_acc *acc = rill_acc_open("db", acc_cap); + if (!acc) rill_abort(); + + struct rng rng = rng_make(0); + for (size_t ts = 0; ts < seconds; ++ts) { + for (size_t i = 0; i < keys_per_sec; ++i) { + uint64_t key = rng_gen_range(&rng, 0, keys_range) + 1; + + for (size_t j = 0; j < vals_per_key; ++j) { + uint64_t val = rng_gen_range(&rng, 0, vals_range) + 1; + rill_acc_ingest(acc, key, val); + } + } + + if (ts % rotation_rate == 0) { + acc_dump(acc, "db", ts); + if (!rill_rotate("db", ts)) rill_abort(); + } + } + + rill_ts_t ts = seconds + 60 * 60; + acc_dump(acc, "db", ts); + if (!rill_rotate("db", ts)) rill_abort(); + + rill_acc_close(acc); + return 0; +} diff --git a/src/rill_merge.c b/src/rill_merge.c new file mode 100644 index 0000000..3d38366 --- /dev/null +++ b/src/rill_merge.c @@ -0,0 +1,53 @@ +/* rill_merge.c + Rémi Attab (remi.attab@gmail.com), 23 Nov 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include + + +void usage() +{ + fprintf(stderr, "rill_ingest -t -q -o \n"); + exit(1); +} + +int main(int argc, char **argv) +{ + rill_ts_t ts = 0; + rill_ts_t quant = 0; + char *output = NULL; + + int opt = 0; + while ((opt = getopt(argc, argv, "+t:q:o:")) != -1) { + switch (opt) { + case 't': ts = atol(optarg); break; + case 'q': quant = atol(optarg); break; + case 'o': output = optarg; break; + default: usage(); + } + } + + if (!ts || !quant || !output) usage(); + if (optind >= argc) usage(); + + size_t len = argc - optind; + struct rill_store *stores[len]; + for (size_t i = 0; i < len; i++, optind++) { + stores[i] = rill_store_open(argv[optind]); + if (!stores[i]) rill_exit(1); + } + + if (!rill_store_merge(output, ts, quant, stores, len)) + rill_exit(1); + + for (size_t i = 0; i < len; ++i) + rill_store_rm(stores[i]); + + return 0; +} diff --git a/src/rill_query.c b/src/rill_query.c new file mode 100644 index 0000000..32bd86c --- /dev/null +++ b/src/rill_query.c @@ -0,0 +1,123 @@ +/* rill_query.c + Rémi Attab (remi.attab@gmail.com), 01 Oct 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include + +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// query +// ----------------------------------------------------------------------------- + +static bool is_file(const char *path) +{ + struct stat st = {0}; + stat(path, &st); + return S_ISREG(st.st_mode); +} + +static void query(const char *db, enum rill_col col, rill_val_t val) +{ + struct rill_rows rows = {0}; + + if (is_file(db)) { + struct rill_store *store = rill_store_open(db); + if (!store) rill_exit(1); + + if (!rill_store_query(store, col, val, &rows)) rill_exit(1); + + rill_store_close(store); + } + else { + struct rill_query *query = rill_query_open(db); + if (!query) rill_exit(1); + + if (!rill_query_key(query, col, val, &rows)) rill_exit(1); + + rill_query_close(query); + } + + for (size_t i = 0; i < rows.len; ++i) + printf("0x%lx 0x%lx\n", rows.data[i].a, rows.data[i].b); + + rill_rows_free(&rows); +} + + +// ----------------------------------------------------------------------------- +// main +// ----------------------------------------------------------------------------- + +static void usage() +{ + fprintf(stderr, "rill_query - \n"); + exit(1); +} + +static uint64_t read_u64(char *arg) +{ + size_t n = strnlen(arg, 128); + + bool is_hex = false; + if (n > 2 && arg[0] == '0' && arg[1] == 'x') { + if (n > 2 + 16) { + rill_fail("value too big '%s'\n", arg); + rill_exit(1); + } + + is_hex = true; + } + + uint64_t value = 0; + + for (size_t i = 2; i < n; ++i) { + char c = arg[i]; + value *= is_hex ? 16 : 10; + + if (c >= '0' && c <= '9') value += c - '0'; + else if (is_hex && c >= 'a' && c <= 'f') value += c - 'a' + 10; + else if (is_hex && c >= 'A' && c <= 'F') value += c - 'A' + 10; + else { + rill_fail("invalid character '%c' in '%s'\n", c, arg); + rill_exit(1); + } + } + + return value; +} + +int main(int argc, char *argv[]) +{ + bool col_a = false; + bool col_b = false; + + int opt = 0; + while ((opt = getopt(argc, argv, "+ab")) != -1) { + switch (opt) { + case 'a': col_a = true; break; + case 'b': col_b = true; break; + default: usage(); exit(1); + } + } + + if (optind + 1 >= argc) usage(); + + enum rill_col col; + if (!rill_args_col(col_a, col_b, &col)) usage(); + + rill_val_t val = read_u64(argv[optind]); + const char *db = argv[optind + 1]; + + query(db, col, val); + + return 0; +} diff --git a/src/rill_rotate.c b/src/rill_rotate.c new file mode 100644 index 0000000..bd2f5db --- /dev/null +++ b/src/rill_rotate.c @@ -0,0 +1,27 @@ +/* rill_rotate.c + Rémi Attab (remi.attab@gmail.com), 20 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include + +int main(int argc, const char **argv) +{ + if (argc != 2) { + fprintf(stderr, "./rill_rotate \n"); + return 1; + } + + struct timespec ts; + (void) clock_gettime(CLOCK_REALTIME, &ts); + + printf("rotating '%s' at '%lu'\n", argv[1], ts.tv_sec); + if (!rill_rotate(argv[1], ts.tv_sec)) rill_exit(1); + + return 0; +} + diff --git a/src/rng.c b/src/rng.c new file mode 100644 index 0000000..03ede5d --- /dev/null +++ b/src/rng.c @@ -0,0 +1,56 @@ +/* rng.c + Rémi Attab (remi.attab@gmail.com), 25 Feb 2016 + FreeBSD-style copyright and disclaimer apply + + Xorshift random number generator for testing and statsd sampling + + See George Marsaglia (2003). Xorshift RNGs. DOI: 10.18637/jss.v008.i14 + http://www.jstatsoft.org/article/view/v008i14 + (section 4, function xor128) + + Current implementation is the xorshift64* variant which has better + statistical properties. +*/ + +#include "rng.h" + +#include + + +// ----------------------------------------------------------------------------- +// init +// ----------------------------------------------------------------------------- + +struct rng rng_make(uint64_t seed) +{ + // We xor the seed with a randomly chosen number to avoid ending up with a 0 + // state which would be bad. + struct rng rng = { .x = seed ^ UINT64_C(0xedef335f00e170b3) }; + assert(rng.x); + return rng; +} + + + +// ----------------------------------------------------------------------------- +// gen +// ----------------------------------------------------------------------------- + +uint64_t rng_gen(struct rng *rng) +{ + rng->x ^= rng->x >> 12; + rng->x ^= rng->x << 25; + rng->x ^= rng->x >> 27; + return rng->x * UINT64_C(2685821657736338717); +} + +uint64_t rng_gen_range(struct rng *rng, uint64_t min, uint64_t max) +{ + assert(max - min != 0); + return rng_gen(rng) % (max - min) + min; +} + +bool rng_gen_prob(struct rng *rng, double prob) +{ + return rng_gen(rng) <= (uint64_t) (prob * rng_max()); +} diff --git a/src/rng.h b/src/rng.h new file mode 100644 index 0000000..a2364ac --- /dev/null +++ b/src/rng.h @@ -0,0 +1,28 @@ +/* rng.h + Rémi Attab (remi.attab@gmail.com), 25 Feb 2016 + FreeBSD-style copyright and disclaimer apply +*/ + +#pragma once + +#include +#include + +// ----------------------------------------------------------------------------- +// rng +// ----------------------------------------------------------------------------- + +struct rng { uint64_t x; }; +struct rng rng_make(uint64_t seed); + +inline uint64_t rng_max() { return (uint64_t) -1UL; } + + +// ----------------------------------------------------------------------------- +// gen +// ----------------------------------------------------------------------------- + +uint64_t rng_gen(struct rng *rng); +uint64_t rng_gen_range(struct rng *rng, uint64_t min, uint64_t max); + +bool rng_gen_prob(struct rng *rng, double prob); diff --git a/src/rotate.c b/src/rotate.c new file mode 100644 index 0000000..a590f16 --- /dev/null +++ b/src/rotate.c @@ -0,0 +1,235 @@ +/* rotate.c + Rémi Attab (remi.attab@gmail.com), 16 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include + +#include +#include +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// rotate +// ----------------------------------------------------------------------------- + +static ssize_t expire(rill_ts_t now, struct rill_store **list, ssize_t len) +{ + if (len < 0) return len; + if (now < expire_secs) return len; // mostly for tests. + + size_t i = 0; + for (; i < (size_t) len; ++i) { + if (rill_store_ts(list[i]) < (now - expire_secs)) break; + } + + size_t end = i; + for (; i < (size_t) len; ++i) { + rill_store_rm(list[i]); + list[i] = NULL; + } + + return end; +} + +static int file_exists(const char *file) +{ + struct stat s; + if (!stat(file, &s)) return 1; + if (errno == ENOENT) return 0; + + rill_fail_errno("unable to stat '%s'", file); + return -1; +} + + +static bool file_name( + const char *dir, rill_ts_t ts, rill_ts_t quant, char *out, size_t len) +{ + rill_ts_t month = ts / month_secs; + rill_ts_t week = (ts / week_secs) % weeks_in_month; + rill_ts_t day = (ts / day_secs) % days_in_week; + rill_ts_t hour = (ts / hour_secs) % hours_in_day; + + char base[NAME_MAX]; + if (quant == hour_secs) + snprintf(base, sizeof(base), "%s/%05lu-%02lu-%02lu-%02lu.rill", + dir, month, week, day, hour); + else if (quant == day_secs) + snprintf(base, sizeof(base), "%s/%05lu-%02lu-%02lu.rill", dir, month, week, day); + else if (quant == week_secs) + snprintf(base, sizeof(base), "%s/%05lu-%02lu.rill", dir, month, week); + else if (quant == month_secs) + snprintf(base, sizeof(base), "%s/%05lu.rill", dir, month); + else assert(false); + + strncpy(out, base, len < sizeof(base) ? len : sizeof(base)); + + int ret; + size_t i = 0; + while ((ret = file_exists(out)) == 1) + snprintf(out, len, "%s.%lu", base, i++); + + if (ret == -1) return false; + return true; +} + +static struct rill_store *merge( + const char *dir, + rill_ts_t ts, rill_ts_t quant, + struct rill_store **list, size_t len) +{ + assert(len > 0); + if (len == 1) { + struct rill_store *result = list[0]; + list[0] = NULL; + return result; + } + + char file[PATH_MAX]; + if (!file_name(dir, ts, quant, file, sizeof(file))) return NULL; + if (!rill_store_merge(file, ts, quant, list, len)) return NULL; + + for (size_t i = 0; i < len; ++i) { + rill_store_rm(list[i]); + list[i] = NULL; + } + + return rill_store_open(file); +} + +static ssize_t merge_quant( + const char *dir, + rill_ts_t now, rill_ts_t quant, + struct rill_store **list, ssize_t len) +{ + if (len <= 1) return len; + + size_t out_len = 0; + struct rill_store *out[(size_t) len]; + + size_t start = 0; + rill_ts_t current_quant = rill_store_ts(list[0]) / quant; + + for (size_t i = 0; i < (size_t) len; i++) { + size_t end = i + 1; + assert(i >= start); + assert(end > start); + + size_t next_ts = i + 1 != (size_t) len ? rill_store_ts(list[i + 1]) : -1UL; + + // Useful for debugging + /* printf("[%lu,%lu] file=%s, now=%lu(%lu), ts=%lu(%lu), next=%lu(%lu), earliest=%lu(%lu)\n", */ + /* quant, i, rill_store_file(list[i]), now, now / quant, */ + /* rill_store_ts(list[i]), rill_store_ts(list[i]) / quant, */ + /* i + 1 != (size_t) len ? rill_store_ts(list[i + 1]) : 0, */ + /* i + 1 != (size_t) len ? rill_store_ts(list[i + 1]) / quant : 0, */ + /* rill_store_ts(list[start]), rill_store_ts(list[start]) / quant); */ + + if (next_ts / quant == current_quant) continue; + + rill_ts_t earliest_ts = rill_store_ts(list[start]); + if (earliest_ts / quant != now / quant) { + struct rill_store *store = merge(dir, earliest_ts, quant, list + start, end - start); + if (!store) goto fail; + out[out_len++] = store; + } + + // if a file is in the quant represented by now then we don't want to + // merge it as we're still filling in this quant. Additionally, if it's + // in our current quant then it will also be in all bigger quants so we + // can just forget these files for the rest of the rotation. + else { + for (size_t j = start; j < end; ++j) { + rill_store_close(list[j]); + list[j] = NULL; + } + } + + current_quant = next_ts / quant; + start = i + 1; + } + + for (size_t i = 0; i < (size_t) len; ++i) assert(!list[i]); + memcpy(list, out, out_len * sizeof(out[0])); + return out_len; + + fail: + for (size_t i = 0; i < out_len; ++i) + rill_store_close(out[i]); + + return -1; +} + +static int store_cmp(const void *l, const void *r) +{ + const struct rill_store *const *lhs = l; + const struct rill_store *const *rhs = r; + + // earliest (biggest) to oldest (smallest) + if (rill_store_ts(*lhs) < rill_store_ts(*rhs)) return +1; + if (rill_store_ts(*lhs) > rill_store_ts(*rhs)) return -1; + return 0; +} + +// Note that an flock is released on process termination on linux. This means +// that we don't have to worry about cleaning up in case of segfaults or signal +// termination. +static int lock(const char *dir) +{ + int fd = open(dir, O_DIRECTORY | O_RDONLY); + if (fd == -1) { + rill_fail_errno("unable to open: %s\n", dir); + return -1; + } + + if (flock(fd, LOCK_EX | LOCK_NB) == -1) { + if (errno == EWOULDBLOCK) return 0; + + rill_fail_errno("unable acquire flock on '%s'\n", dir); + close(fd); + return -1; + } + + return fd; +} + +static void unlock(int fd) +{ + flock(fd, LOCK_UN); + close(fd); +} + +bool rill_rotate(const char *dir, rill_ts_t now) +{ + int fd = lock(dir); + if (!fd) return true; + if (fd == -1) return false; + + enum { cap = 1024 }; + struct rill_store *list[cap]; + size_t list_len = rill_scan_dir(dir, list, cap); + qsort(list, list_len, sizeof(list[0]), store_cmp); + + ssize_t len = list_len; + len = expire(now, list, len); + len = merge_quant(dir, now, hour_secs, list, len); + len = merge_quant(dir, now, day_secs, list, len); + len = merge_quant(dir, now, week_secs, list, len); + len = merge_quant(dir, now, month_secs, list, len); + + for (size_t i = 0; i < list_len; ++i) { + if (list[i]) rill_store_close(list[i]); + } + + unlock(fd); + return len >= 0; +} diff --git a/src/rows.c b/src/rows.c new file mode 100644 index 0000000..b666091 --- /dev/null +++ b/src/rows.c @@ -0,0 +1,136 @@ +/* rows.c + Rémi Attab (remi.attab@gmail.com), 02 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// row +// ----------------------------------------------------------------------------- + +extern inline bool rill_row_nil(const struct rill_row *); +extern inline int rill_row_cmp(const struct rill_row *, const struct rill_row *); + + +// ----------------------------------------------------------------------------- +// rows +// ----------------------------------------------------------------------------- + +void rill_rows_free(struct rill_rows *rows) +{ + free(rows->data); +} + +void rill_rows_clear(struct rill_rows *rows) +{ + rows->len = 0; +} + +bool rill_rows_reserve(struct rill_rows *rows, size_t cap) +{ + if (rill_likely(cap <= rows->cap)) return true; + + size_t new_cap = rows->cap ? rows->cap : 1; + while (new_cap < cap) new_cap *= 2; + + rows->data = realloc(rows->data, new_cap * sizeof(rows->data[0])); + if (!rows->data) { + rill_fail("unable to realloc rows: cap=%lu", new_cap); + return false; + } + + rows->cap = new_cap; + return true; +} + +bool rill_rows_push(struct rill_rows *rows, rill_val_t a, rill_val_t b) +{ + assert(a && b); + if (!rill_rows_reserve(rows, rows->len + 1)) return false; + + rows->data[rows->len] = (struct rill_row) { .a = a, .b = b }; + rows->len++; + + return rows; +} + +static int row_cmp(const void *lhs, const void *rhs) +{ + return rill_row_cmp(lhs, rhs); +} + +void rill_rows_compact(struct rill_rows *rows) +{ + if (rows->len <= 1) return; + qsort(rows->data, rows->len, sizeof(*rows->data), &row_cmp); + + size_t j = 0; + for (size_t i = 1; i < rows->len; ++i) { + if (!rill_row_cmp(&rows->data[i], &rows->data[j])) continue; + ++j; + if (j != i) rows->data[j] = rows->data[i]; + } + + assert(j + 1 <= rows->len); + rows->len = j + 1; +} + +void rill_rows_invert(struct rill_rows* rows) +{ + for (size_t i = 0; i < rows->len; ++i) { + rows->data[i] = (struct rill_row) { + .a = rows->data[i].b, + .b = rows->data[i].a, + }; + } + + qsort(rows->data, rows->len, sizeof(*rows->data), &row_cmp); +} + +bool rill_rows_copy(const struct rill_rows *rows, struct rill_rows *out) +{ + if (!rill_rows_reserve(out, rows->len)) return false; + + memcpy(out->data, rows->data, rows->len * sizeof(rows->data[0])); + out->len = rows->len; + + return true; +} + +bool rill_rows_append(struct rill_rows *rows, struct rill_rows *other) +{ + if (!rill_rows_reserve(rows, rows->len + other->len)) return false; + memcpy(rows->data + rows->len, other->data, other->len * sizeof(other->data[0])); + rows->len += other->len; + + return true; +} + + +void rill_rows_print(const struct rill_rows *rows) +{ + const rill_val_t nil = -1ULL; + rill_val_t key = nil; + + printf("rows(%lu, %lu):\n", rows->len, rows->cap); + + for (size_t i = 0; i < rows->len; ++i) { + const struct rill_row *row = &rows->data[i]; + + if (row->a == key) printf(", %p", (void *) row->b); + else { + if (key != nil) printf("]\n"); + printf(" %p: [ %p", (void *) row->a, (void *) row->b); + key = row->a; + } + } + + if (rows->len) printf(" ]\n"); +} diff --git a/src/store.c b/src/store.c new file mode 100644 index 0000000..2007fe7 --- /dev/null +++ b/src/store.c @@ -0,0 +1,652 @@ +/* store.c + Rémi Attab (remi.attab@gmail.com), 02 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" +#include "htable.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// impl +// ----------------------------------------------------------------------------- + +#include "index.c" +#include "vals.c" +#include "coder.c" + +// ----------------------------------------------------------------------------- +// store +// ----------------------------------------------------------------------------- + +/* version 6 introduces reverse lookup, and massive db format changes */ +static const uint32_t version = 6; + +static const uint32_t magic = 0x4C4C4952; +static const uint64_t stamp = 0xFFFFFFFFFFFFFFFFUL; +/* version 6 can not support older dbs -- they'll need to be updated */ +static const uint32_t supported_versions[] = { 6 }; + +struct rill_packed header +{ + uint32_t magic; + uint32_t version; + + uint64_t ts; + uint64_t quant; + + uint64_t rows; + + uint64_t data_off[rill_cols]; + uint64_t index_off[rill_cols]; + + uint64_t __unused[2]; + + uint64_t stamp; +}; + +struct rill_store +{ + int fd; + const char *file; + + void *vma; + size_t vma_len; + + struct header *head; + struct vals *vals; + + uint8_t *data[rill_cols]; + struct index *index[rill_cols]; + uint8_t *end; +}; + + + +// ----------------------------------------------------------------------------- +// coder +// ----------------------------------------------------------------------------- + +static inline void *store_ptr(struct rill_store *store, uint64_t off) +{ + return (void *) ((uintptr_t) store->vma + off); +} + +static struct encoder store_encoder( + struct rill_store *store, + enum rill_col col, + struct vals *vals[rill_cols]) +{ + enum rill_col other_col = rill_col_flip(col); + + size_t start = store->head->data_off[col]; + size_t end = store->vma_len; + + return make_encoder( + store->vma + start, + store->vma + end, + vals[other_col], + store->index[col]); +} + +static struct decoder store_decoder_at( + const struct rill_store *store, + enum rill_col col, + size_t key_idx, + uint64_t off) +{ + enum rill_col other_col = rill_col_flip(col); + + struct index *index = store->index[col]; + struct index *lookup = store->index[other_col]; + + size_t start = store->head->data_off[col]; + size_t end = col == rill_col_a ? + store->head->data_off[other_col] : store->vma_len; + + return make_decoder_at( + store->vma + start + off, + store->vma + end, + lookup, index, + key_idx); +} + +static struct decoder store_decoder( + const struct rill_store *store, enum rill_col col) +{ + return store_decoder_at(store, col, 0, 0); +} + + +// ----------------------------------------------------------------------------- +// open +// ----------------------------------------------------------------------------- + +static bool is_supported_version(uint32_t version) +{ + for (size_t i = 0; i < array_len(supported_versions); ++i) + if (version == supported_versions[i]) return true; + return false; +} + +struct rill_store *rill_store_open(const char *file) +{ + struct rill_store *store = calloc(1, sizeof(*store)); + if (!store) { + rill_fail("unable to allocate memory for '%s'", file); + goto fail_alloc_struct; + } + + store->file = strndup(file, PATH_MAX); + if (!store->file) { + rill_fail("unable to allocate memory for '%s'", file); + goto fail_alloc_file; + } + + struct stat stat_ret = {0}; + if (stat(file, &stat_ret) == -1) { + rill_fail_errno("unable to stat '%s'", file); + goto fail_stat; + } + + size_t len = stat_ret.st_size; + if (len < sizeof(struct header)) { + rill_fail("invalid size '%lu' for '%s'", len, file); + goto fail_size; + } + + store->vma_len = to_vma_len(len); + + store->fd = open(file, O_RDONLY); + if (store->fd == -1) { + rill_fail_errno("unable to open '%s'", file); + goto fail_open; + } + + store->vma = mmap(NULL, store->vma_len, PROT_READ, MAP_SHARED, store->fd, 0); + if (store->vma == MAP_FAILED) { + rill_fail_errno("unable to mmap '%s' of len '%lu'", file, store->vma_len); + goto fail_mmap; + } + + store->head = store->vma; + for (size_t col = 0; col < rill_cols; ++col) { + store->index[col] = store_ptr(store, store->head->index_off[col]); + store->data[col] = store_ptr(store, store->head->data_off[col]); + } + store->end = store_ptr(store, store->vma_len); + + if (store->head->magic != magic) { + rill_fail("invalid magic '0x%x' for '%s'", store->head->magic, file); + goto fail_magic; + } + + if (!is_supported_version(store->head->version)) { + rill_fail("invalid version '%u' for '%s'", store->head->version, file); + goto fail_version; + } + + if (store->head->stamp != stamp) { + rill_fail("invalid stamp '%lx' for '%s'", store->head->stamp, file); + goto fail_stamp; + } + + return store; + + fail_version: + fail_magic: + fail_stamp: + munmap(store->vma, store->vma_len); + fail_mmap: + close(store->fd); + fail_open: + fail_size: + fail_stat: + free((char *) store->file); + fail_alloc_file: + free(store); + fail_alloc_struct: + return NULL; +} + +void rill_store_close(struct rill_store *store) +{ + munmap(store->vma, store->vma_len); + close(store->fd); + free((char *) store->file); + free(store); +} + +bool rill_store_rm(struct rill_store *store) +{ + if (unlink(store->file) == -1) { + rill_fail_errno("unable to unlink '%s'", store->file); + return false; + } + + rill_store_close(store); + return true; +} + + +// ----------------------------------------------------------------------------- +// writer +// ----------------------------------------------------------------------------- + +static bool writer_open( + struct rill_store *store, + const char *file, + struct vals *vals[rill_cols], + size_t rows, + rill_ts_t ts, + size_t quant) +{ + store->file = file; + + store->fd = open(file, O_RDWR | O_CREAT | O_EXCL, 0644); + if (store->fd == -1) { + rill_fail_errno("unable to open '%s'", file); + goto fail_open; + } + + size_t len = sizeof(struct header); + for (size_t col = 0; col < rill_cols; ++col) { + len += index_cap(vals[col]->len); + len += coder_cap(vals[col]->len, rows); + } + + if (ftruncate(store->fd, len) == -1) { + rill_fail_errno("unable to resize '%s'", file); + goto fail_truncate; + } + + store->vma_len = to_vma_len(len); + store->vma = mmap(NULL, store->vma_len, PROT_WRITE | PROT_READ, MAP_SHARED, store->fd, 0); + if (store->vma == MAP_FAILED) { + rill_fail_errno("unable to mmap '%s'", file); + goto fail_mmap; + } + + store->head = store->vma; + store->end = store_ptr(store, store->vma_len); + + *store->head = (struct header) { + .magic = magic, + .version = version, + .ts = ts, + .quant = quant, + }; + + return true; + + munmap(store->vma, store->vma_len); + fail_mmap: + fail_truncate: + close(store->fd); + fail_open: + return false; +} + +static void writer_close( + struct rill_store *store, size_t len) +{ + if (len) { + assert(len <= store->vma_len); + if (ftruncate(store->fd, len) == -1) + rill_fail_errno("unable to resize '%s'", store->file); + + if (fdatasync(store->fd) == -1) + rill_fail_errno("unable to fdatasync data '%s'", store->file); + + // Indicate that the file has been fully written and is ready for + // use. An additional sync is required for the stamp to ensure that the + // data is... + // - ... properly persisted before we delete it (durability) + // - ... only persisted after all the data has been persisted (ordering) + store->head->stamp = stamp; + if (fdatasync(store->fd) == -1) + rill_fail_errno("unable to fdatasync stamp '%s'", store->file); + } + else if (unlink(store->file) == -1) + rill_fail_errno("unable to unlink '%s'", store->file); + + munmap(store->vma, store->vma_len); + close(store->fd); +} + +static void writer_offsets_init( + struct rill_store *store, struct vals *vals[rill_cols]) +{ + uint64_t off = sizeof(struct header); + + store->head->index_off[rill_col_a] = off; + store->index[rill_col_a] = store_ptr(store, off); + + off += index_cap(vals[rill_col_a]->len); + + store->head->index_off[rill_col_b] = off; + store->index[rill_col_b] = store_ptr(store, off); + + off += index_cap(vals[rill_col_b]->len); + + store->head->data_off[rill_col_a] = off; + store->data[rill_col_a] = store_ptr(store, off); +} + +static void writer_offsets_finish(struct rill_store *store, size_t off) +{ + store->head->data_off[rill_col_b] = store->head->data_off[rill_col_a] + off; + store->data[rill_col_b] = store_ptr(store, off); +} + +bool rill_store_write( + const char *file, + rill_ts_t ts, size_t quant, + struct rill_rows *rows) +{ + rill_rows_compact(rows); + if (!rows->len) return true; + + struct vals *vals[rill_cols] = {0}; + for (size_t col = 0; col < rill_cols; ++col) { + vals[col] = vals_for_col(rows, col); + if (!vals[col]) goto fail_vals; + } + + struct rill_store store = {0}; + if (!writer_open(&store, file, vals, rows->len, ts, quant)) goto fail_open; + + writer_offsets_init(&store, vals); + + struct encoder coder_a = store_encoder(&store, rill_col_a, vals); + for (size_t i = 0; i < rows->len; ++i) { + if (!coder_encode(&coder_a, &rows->data[i])) goto fail_encode_a; + } + if (!coder_finish(&coder_a)) goto fail_encode_a; + + writer_offsets_finish(&store, coder_off(&coder_a)); + rill_rows_invert(rows); + + struct encoder coder_b = store_encoder(&store, rill_col_b, vals); + for (size_t i = 0; i < rows->len; ++i) { + if (!coder_encode(&coder_b, &rows->data[i])) goto fail_encode_b; + } + if (!coder_finish(&coder_b)) goto fail_encode_b; + + store.head->rows = rows->len; + writer_close(&store, store.head->data_off[rill_col_b] + coder_off(&coder_b)); + + coder_close(&coder_a); + coder_close(&coder_b); + + for (size_t col = 0; col < rill_cols; ++col) + free(vals[col]); + + return true; + + fail_encode_b: + coder_close(&coder_b); + fail_encode_a: + coder_close(&coder_a); + writer_close(&store, 0); + fail_open: + for (size_t col = 0; col < rill_cols; ++col) free(vals[col]); + fail_vals: + return false; +} + + +static bool store_merge_col( + struct rill_store** list, + size_t list_len, + enum rill_col col, + struct encoder* coder) +{ + struct rill_row rows[list_len]; + struct decoder decoders[list_len]; + + size_t it_len = 0; + for (size_t i = 0; i < list_len; ++i) { + if (!list[i]) continue; + decoders[it_len] = store_decoder(list[i], col); + it_len++; + } + assert(it_len); + + for (size_t i = 0; i < it_len; ++i) { + if (!(coder_decode(&decoders[i], &rows[i]))) goto fail_decoder; + } + + struct rill_row prev = {0}; + while (it_len > 0) { + size_t target = 0; + + for (size_t i = 1; i < it_len; ++i) { + if (rill_row_cmp(&rows[i], &rows[target]) < 0) + target = i; + } + + struct rill_row *row = &rows[target]; + struct decoder *decoder = &decoders[target]; + + if (rill_likely(rill_row_nil(&prev) || rill_row_cmp(&prev, row) < 0)) { + if (!coder_encode(coder, row)) goto fail_decoder; + prev = *row; + } + + if (!coder_decode(decoder, row)) goto fail_decoder; + if (rill_unlikely(rill_row_nil(row))) { + memmove(rows + target, + rows + target + 1, + (it_len - target - 1) * sizeof(rows[0])); + memmove(decoders + target, + decoders + target + 1, + (it_len - target - 1) * sizeof(decoders[0])); + it_len--; + } + } + + return true; + + fail_decoder: + return false; +} + +bool rill_store_merge( + const char *file, + rill_ts_t ts, size_t quant, + struct rill_store **list, size_t list_len) +{ + assert(list_len > 1); + + size_t rows = 0; + struct vals *vals[rill_cols] = {0}; + + for (size_t i = 0; i < list_len; ++i) { + if (!list[i]) continue; + + for (size_t col = 0; col < rill_cols; ++col) { + struct vals *ret = vals_add_index(vals[col], list[i]->index[col]); + if (!ret) goto fail_vals; + vals[col] = ret; + } + + rows += list[i]->head->rows; + } + + struct rill_store store = {0}; + if (!writer_open(&store, file, vals, rows, ts, quant)) goto fail_open; + + writer_offsets_init(&store, vals); + + struct encoder encoder_a = store_encoder(&store, rill_col_a, vals); + if (!store_merge_col(list, list_len, rill_col_a, &encoder_a)) goto fail_coder_a; + if (!coder_finish(&encoder_a)) goto fail_coder_a; + + writer_offsets_finish(&store, coder_off(&encoder_a)); + + struct encoder encoder_b = store_encoder(&store, rill_col_b, vals); + if (!store_merge_col(list, list_len, rill_col_b, &encoder_b)) goto fail_coder_b; + if (!coder_finish(&encoder_b)) goto fail_coder_b; + + store.head->rows = encoder_a.rows; + writer_close(&store, store.head->data_off[rill_col_b] + coder_off(&encoder_b)); + + coder_close(&encoder_a); + coder_close(&encoder_b); + + for (size_t col = 0; col < rill_cols; ++col) free(vals[col]); + return true; + + coder_close(&encoder_b); + fail_coder_b: + coder_close(&encoder_a); + fail_coder_a: + writer_close(&store, 0); + fail_open: + fail_vals: + for (size_t col = 0; col < rill_cols; ++col) free(vals[col]); + return false; +} + + +// ----------------------------------------------------------------------------- +// header info +// ----------------------------------------------------------------------------- + +const char * rill_store_file(const struct rill_store *store) +{ + return store->file; +} + +unsigned rill_store_version(const struct rill_store *store) +{ + return store->head->version; +} + +rill_ts_t rill_store_ts(const struct rill_store *store) +{ + return store->head->ts; +} + +size_t rill_store_quant(const struct rill_store *store) +{ + return store->head->quant; +} + +size_t rill_store_rows(const struct rill_store *store) +{ + return store->head->rows; +} + + +// ----------------------------------------------------------------------------- +// query +// ----------------------------------------------------------------------------- + +size_t rill_store_vals_count(const struct rill_store *store, enum rill_col col) +{ + return store->index[col]->len; +} + +size_t rill_store_vals( + const struct rill_store *store, + enum rill_col col, + rill_val_t *out, + size_t cap) +{ + const struct index* index = store->index[col]; + size_t len = cap < index->len ? cap : index->len; + + for (size_t i = 0; i < len; ++i) + out[i] = index->data[i].key; + + return len; +} + + +bool rill_store_query( + const struct rill_store *store, + enum rill_col col, + rill_val_t key, + struct rill_rows *out) +{ + uint64_t off = 0; + size_t key_idx = 0; + if (!index_find(store->index[col], key, &key_idx, &off)) return true; + + struct rill_row row = {0}; + struct decoder coder = store_decoder_at(store, col, key_idx, off); + + while (true) { + if (!coder_decode(&coder, &row)) return false; + if (rill_row_nil(&row) || row.a != key) break; + + if (!rill_rows_push(out, row.a, row.b)) return false; + } + + return true; +} + + +// ----------------------------------------------------------------------------- +// iterators +// ----------------------------------------------------------------------------- + +struct rill_store_it { struct decoder decoder; }; + +struct rill_store_it *rill_store_begin( + const struct rill_store *store, enum rill_col col) +{ + struct rill_store_it *it = calloc(1, sizeof(*it)); + if (!it) return NULL; + + it->decoder = store_decoder(store, col); + return it; +} + +void rill_store_it_free(struct rill_store_it *it) +{ + free(it); +} + +bool rill_store_it_next(struct rill_store_it *it, struct rill_row *row) +{ + return coder_decode(&it->decoder, row); +} + + +// ----------------------------------------------------------------------------- +// stats +// ----------------------------------------------------------------------------- + +void rill_store_stats( + const struct rill_store *store, struct rill_store_stats *out) +{ + *out = (struct rill_store_stats) { + .header_bytes = sizeof(*store->head), + + .index_bytes[rill_col_a] = store->head->index_off[rill_col_b] - + store->head->index_off[rill_col_a], + .index_bytes[rill_col_b] = store->head->data_off[rill_col_a] - + store->head->index_off[rill_col_b], + + .rows_bytes[rill_col_a] = store->head->data_off[rill_col_b] - + store->head->data_off[rill_col_a], + .rows_bytes[rill_col_b] = store->vma_len - + store->head->data_off[rill_col_b], + }; +} diff --git a/src/utils.c b/src/utils.c new file mode 100644 index 0000000..945b559 --- /dev/null +++ b/src/utils.c @@ -0,0 +1,122 @@ +/* utils.c + Rémi Attab (remi.attab@gmail.com), 17 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "rill.h" +#include "utils.h" + +#include +#include +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// error +// ----------------------------------------------------------------------------- + +__thread struct rill_error rill_errno = { 0 }; + +void rill_abort() +{ + rill_perror(&rill_errno); + abort(); +} + +void rill_exit(int code) +{ + rill_perror(&rill_errno); + exit(code); +} + +size_t rill_strerror(struct rill_error *err, char *dest, size_t len) +{ + if (!err->errno_) { + return snprintf(dest, len, "%s:%d: %s\n", + err->file, err->line, err->msg); + } + else { + return snprintf(dest, len, "%s:%d: %s - %s(%d)\n", + err->file, err->line, err->msg, + strerror(err->errno_), err->errno_); + } +} + +void rill_perror(struct rill_error *err) +{ + char buf[128 + rill_err_msg_cap]; + size_t len = rill_strerror(err, buf, sizeof(buf)); + + if (write(2, buf, len) == -1) + fprintf(stderr, "rill_perror failed: %s", strerror(errno)); +} + + +void rill_vfail(const char *file, int line, const char *fmt, ...) +{ + rill_errno = (struct rill_error) { .errno_ = 0, .file = file, .line = line }; + + va_list args; + va_start(args, fmt); + (void) vsnprintf(rill_errno.msg, rill_err_msg_cap, fmt, args); + va_end(args); +} + +void rill_vfail_errno(const char *file, int line, const char *fmt, ...) +{ + rill_errno = (struct rill_error) { .errno_ = errno, .file = file, .line = line }; + + va_list args; + va_start(args, fmt); + (void) vsnprintf(rill_errno.msg, rill_err_msg_cap, fmt, args); + va_end(args); +} + + +// ----------------------------------------------------------------------------- +// scan_dir +// ----------------------------------------------------------------------------- + +static bool is_rill_file(const char *name) +{ + static const char ext[] = ".rill"; + + size_t len = strnlen(name, NAME_MAX); + if (len < sizeof(ext)) return false; + + return strstr(name, ext); +} + +size_t rill_scan_dir(const char *dir, struct rill_store **list, size_t cap) +{ + DIR *dir_handle = opendir(dir); + if (!dir_handle) { + if (errno == ENOENT) return 0; + rill_fail_errno("unable to open dir '%s'", dir); + return 0; + } + + size_t len = 0; + struct dirent *entry = NULL; + while ((entry = readdir(dir_handle))) { + // I found the one filesystem that doesn't support dirent->d_type... + if (!is_rill_file(entry->d_name)) continue; + + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s/%s", dir, entry->d_name); + + list[len] = rill_store_open(file); + if (!list[len]) continue; + + len++; + if (len == cap) { + rill_fail("rotate: too many files to rotate in '%s'", dir); + break; + } + } + + closedir(dir_handle); + return len; +} diff --git a/src/utils.h b/src/utils.h new file mode 100644 index 0000000..93b5eb0 --- /dev/null +++ b/src/utils.h @@ -0,0 +1,98 @@ +/* utils.h + Rémi Attab (remi.attab@gmail.com), 04 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#pragma once + + +#include +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// compiler +// ----------------------------------------------------------------------------- + +#define rill_packed __attribute__((__packed__)) +#define rill_noreturn __attribute__((noreturn)) +#define rill_printf(x,y) __attribute__((format(printf, x, y))) +#define rill_likely(x) __builtin_expect(x, 1) +#define rill_unlikely(x) __builtin_expect(x, 0) + + +// ----------------------------------------------------------------------------- +// misc +// ----------------------------------------------------------------------------- + +#define array_len(arr) (sizeof((arr)) / sizeof((arr)[0])) + + +// ----------------------------------------------------------------------------- +// err +// ----------------------------------------------------------------------------- + +void rill_abort() rill_noreturn; +void rill_exit(int code) rill_noreturn; + +void rill_vfail(const char *file, int line, const char *fmt, ...) + rill_printf(3, 4); + +void rill_vfail_errno(const char *file, int line, const char *fmt, ...) + rill_printf(3, 4); + +#define rill_fail(...) \ + rill_vfail(__FILE__, __LINE__, __VA_ARGS__) + +#define rill_fail_errno(...) \ + rill_vfail_errno(__FILE__, __LINE__, __VA_ARGS__) + + +// ----------------------------------------------------------------------------- +// time +// ----------------------------------------------------------------------------- + +enum +{ + mins_in_hour = 60, + hours_in_day = 24, + days_in_week = 8, // more closely approximates a month + weeks_in_month = 4, + months_in_expire = 16, + + sec_secs = 1, + min_secs = 60 * sec_secs, + hour_secs = mins_in_hour * min_secs, + day_secs = hours_in_day * hour_secs, + week_secs = days_in_week * day_secs, + month_secs = weeks_in_month * week_secs, + expire_secs = months_in_expire * month_secs, +}; + + +// ----------------------------------------------------------------------------- +// vma +// ----------------------------------------------------------------------------- + +enum { page_len_s = 4096 }; +static const size_t page_len = page_len_s; + +static inline size_t to_vma_len(size_t len) +{ + if (!(len % page_len)) return len; + return (len & ~(page_len - 1)) + page_len; +} + + +// ----------------------------------------------------------------------------- +// args +// ----------------------------------------------------------------------------- + +inline bool rill_args_col(bool a, bool b, enum rill_col *out) +{ + if ((a && b) || (!a && !b)) return false; + *out = a ? rill_col_a : rill_col_b; + return true; +} diff --git a/src/vals.c b/src/vals.c new file mode 100644 index 0000000..c7c64f8 --- /dev/null +++ b/src/vals.c @@ -0,0 +1,112 @@ +/* vals.c + Rémi Attab (remi.attab@gmail.com), 01 Oct 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +// ----------------------------------------------------------------------------- +// vals +// ----------------------------------------------------------------------------- + +struct rill_packed vals +{ + uint64_t len; + uint64_t data[]; +}; + +typedef struct htable vals_rev_t; + +static size_t vals_vtoi(vals_rev_t *rev, rill_val_t val) +{ + if (!val) return 0; // \todo giant hack for coder_finish + + struct htable_ret ret = htable_get(rev, val); + assert(ret.ok); + return ret.value; +} + +// \todo should technically return bool for htable resize errors. Need to fix +// htable interface. +static void vals_rev_make(const struct vals *vals, vals_rev_t *rev) +{ + htable_reset(rev); + htable_reserve(rev, vals->len); + + for (size_t index = 1; index <= vals->len; ++index) { + struct htable_ret ret = htable_put(rev, vals->data[index-1], index); + assert(ret.ok); + } +} + +static int val_cmp(const void *l, const void *r) +{ + rill_val_t lhs = *((const rill_val_t *) l); + rill_val_t rhs = *((const rill_val_t *) r); + + if (lhs < rhs) return -1; + if (lhs > rhs) return 1; + return 0; +} + +static void vals_compact(struct vals *vals) +{ + assert(vals->len); + qsort(vals->data, vals->len, sizeof(vals->data[0]), &val_cmp); + + size_t j = 0; + for (size_t i = 1; i < vals->len; ++i) { + if (vals->data[j] == vals->data[i]) continue; + vals->data[++j] = vals->data[i]; + } + + assert(j + 1 <= vals->len); + vals->len = j + 1; +} + +static struct vals *vals_for_col(const struct rill_rows *rows, enum rill_col col) +{ + struct vals *vals = + calloc(1, sizeof(*vals) + sizeof(vals->data[0]) * rows->len); + + if (!vals) return NULL; + + vals->len = rows->len; + for (size_t i = 0; i < rows->len; ++i) + vals->data[i] = rill_row_get(&rows->data[i], col); + + vals_compact(vals); + return vals; +} + +static struct vals *vals_add_index(struct vals *vals, const struct index *index) +{ + assert(index); + + if (!vals) { + vals = calloc(1, sizeof(*vals) + index->len * sizeof(vals->data[0])); + if (!vals) { + rill_fail("unable to allocate memory for vals: %lu", index->len); + return NULL; + } + + for (size_t i = 0; i < index->len; ++i) + vals->data[i] = index->data[i].key; + vals->len = index->len; + + return vals; + } + + size_t len = vals->len + index->len; + vals = realloc(vals, sizeof(*vals) + len * sizeof(vals->data[0])); + if (!vals) { + rill_fail("unable to allocate memory for vals: %lu + %lu", + vals->len, index->len); + return NULL; + } + + for (size_t i = 0; i < index->len; ++i) + vals->data[vals->len + i] = index->data[i].key; + vals->len += index->len; + + vals_compact(vals); + return vals; +} diff --git a/test/coder_test.c b/test/coder_test.c new file mode 100644 index 0000000..036c8ee --- /dev/null +++ b/test/coder_test.c @@ -0,0 +1,287 @@ +/* coder_test.c + Rémi Attab (remi.attab@gmail.com), 11 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "test.h" + +#include "store.c" + + +// ----------------------------------------------------------------------------- +// utils +// ----------------------------------------------------------------------------- + +static struct index *index_alloc(size_t rows) +{ + return calloc(1, index_cap(rows)); +} + + +// ----------------------------------------------------------------------------- +// leb128 +// ----------------------------------------------------------------------------- + +static void check_leb128(uint64_t val) +{ + uint8_t data[10] = {0}; + + { + uint8_t *it = leb128_encode(data, val); + size_t len = (uintptr_t) it - (uintptr_t) data; + if (val < (1UL << 7)) assert(len == 1); + else if (val < (1UL << 14)) assert(len == 2); + else if (val < (1UL << 21)) assert(len == 3); + else if (val < (1UL << 28)) assert(len == 4); + else if (val < (1UL << 35)) assert(len == 5); + else if (val < (1UL << 42)) assert(len == 6); + else if (val < (1UL << 49)) assert(len == 7); + else if (val < (1UL << 56)) assert(len == 8); + else if (val < (1UL << 63)) assert(len == 9); + else assert(len == 10); + } + + { + uint8_t *it = data; + uint64_t result = 0; + assert(leb128_decode(&it, it + sizeof(data), &result)); + assert(val == result); + } +} + +bool test_leb128(void) +{ + check_leb128(0); + for (size_t i = 0; i < 64; ++i) check_leb128(1UL << i); + + struct rng rng = rng_make(0); + for (size_t i = 0; i < 64; ++i) { + for (size_t j = 0; j < 100; ++j) + check_leb128(rng_gen_range(&rng, 0, 1UL << i)); + } + + return true; +} + + +// ----------------------------------------------------------------------------- +// vals +// ----------------------------------------------------------------------------- + +#define make_vals(...) \ + ({ \ + rill_val_t vals[] = { __VA_ARGS__ }; \ + make_vals_impl(vals, sizeof(vals) / sizeof(vals[0])); \ + }) + +#define make_index(...) \ + ({ \ + rill_val_t vals[] = { __VA_ARGS__ }; \ + size_t len = sizeof(vals) / sizeof(vals[0]); \ + struct index *index = index_alloc(len); \ + for (size_t i = 0; i < len; ++i) \ + index_put(index, vals[i], 1); \ + index; \ + }) + +static struct vals *make_vals_impl(rill_val_t *list, size_t len) +{ + struct vals *vals = calloc(1, sizeof(struct vals) + sizeof(list[0]) * len); + + vals->len = len; + memcpy(vals->data, list, sizeof(list[0]) * len); + + vals_compact(vals); + return vals; +} + +static void check_vals(struct rill_rows rows, struct vals *exp) +{ + struct vals *vals = vals_for_col(&rows, rill_col_b); + + assert(vals->len == exp->len); + for (size_t i = 0; i < exp->len; ++i) + assert(vals->data[i] == exp->data[i]); + + vals_rev_t rev = {0}; + vals_rev_make(vals, &rev); + + for (size_t i = 0; i < exp->len; ++i) { + size_t index = vals_vtoi(&rev, exp->data[i]); + assert(vals->data[index - 1] == exp->data[i]); + } + + free(vals); + free(exp); + rill_rows_free(&rows); + htable_reset(&rev); +} + +static void check_vals_merge(struct vals *a, struct index *b, struct vals *exp) +{ + struct vals *result = vals_add_index(a, b); + + assert(result->len == exp->len); + for (size_t i = 0; i < exp->len; ++i) + assert(result->data[i] == exp->data[i]); + + free(result); + free(b); + free(exp); +} + +bool test_vals(void) +{ + check_vals(make_rows(row(1, 10)), make_vals(10)); + + check_vals(make_rows(row(1, 10), row(1, 10)), make_vals(10)); + check_vals(make_rows(row(1, 10), row(2, 10)), make_vals(10)); + + check_vals(make_rows(row(1, 10), row(1, 20)), make_vals(10, 20)); + check_vals(make_rows(row(1, 10), row(2, 20)), make_vals(10, 20)); + + check_vals(make_rows(row(2, 20), row(1, 10)), make_vals(10, 20)); + check_vals(make_rows(row(1, 20), row(1, 10)), make_vals(10, 20)); + + check_vals_merge(make_vals(10), make_index(10), make_vals(10)); + check_vals_merge(make_vals(10), make_index(20), make_vals(10, 20)); + + check_vals_merge(NULL, make_index(10, 20), make_vals(10, 20)); + check_vals_merge(make_vals(10, 20), make_index(20), make_vals(10, 20)); + check_vals_merge(make_vals(10, 20), make_index(20, 30), make_vals(10, 20, 30)); + check_vals_merge(make_vals(10, 20), make_index(20, 30, 40, 50, 60), + make_vals(10, 20, 30, 40, 50, 60)); + + return true; +} + + +// ----------------------------------------------------------------------------- +// coder +// ----------------------------------------------------------------------------- + +static struct index *lookup_alloc(struct vals *vals) +{ + struct index *lookup = calloc(1, index_cap(vals->len)); + lookup->len = vals->len; + + for (size_t i = 0; i < lookup->len; ++i) + lookup->data[i].key = vals->data[i]; + + return lookup; +} + +static void check_coder(struct rill_rows rows) +{ + rill_rows_compact(&rows); + + struct vals *vals[2] = { + vals_for_col(&rows, rill_col_a), + vals_for_col(&rows, rill_col_b), + }; + + struct index *index = index_alloc(vals[rill_col_a]->len); + struct index *lookup = lookup_alloc(vals[rill_col_b]); + + size_t cap = coder_cap(vals[rill_col_b]->len, rows.len); + uint8_t *buffer = calloc(1, cap); + + size_t len = 0; + { + struct encoder coder = + make_encoder(buffer, buffer + cap, vals[rill_col_b], index); + + for (size_t i = 0; i < rows.len; ++i) + assert(coder_encode(&coder, &rows.data[i])); + + assert(coder_finish(&coder)); + len = coder.it - buffer; + } + + if (false) { + printf("input: "); rill_rows_print(&rows); + + printf("buffer: start=%p, len=%lu\n", (void *) buffer, len); + hexdump(buffer, cap); + + printf("index: [ "); + for (size_t i = 0; i < index->len; ++i) { + struct index_kv *row = &index->data[i]; + printf("{%p, %p} ", (void *) row->key, (void *) row->off); + } + printf("]\n"); + + printf("lookup: [ "); + for (size_t i = 0; i < lookup->len; ++i) { + struct index_kv *row = &lookup->data[i]; + printf("%p ", (void *) row->key); + } + printf("]\n"); + } + + { + struct decoder coder = + make_decoder_at(buffer, buffer + len, lookup, index, 0); + + struct rill_row row = {0}; + for (size_t i = 0; i < rows.len; ++i) { + assert(coder_decode(&coder, &row)); + assert(rill_row_cmp(&row, &rows.data[i]) == 0); + } + + assert(coder_decode(&coder, &row)); + assert(rill_row_nil(&row)); + } + + for (size_t i = 0; i < rows.len; ++i) { + size_t key_idx; uint64_t off; + assert(index_find(index, rows.data[i].a, &key_idx, &off)); + struct decoder coder = make_decoder_at( + buffer + off, buffer + len, lookup, index, key_idx); + + struct rill_row row = {0}; + do { + assert(coder_decode(&coder, &row)); + assert(row.a == rows.data[i].a); + } while (row.b != rows.data[i].b); + } + + free(buffer); + free(lookup); + free(index); + for (size_t col = 0; col < rill_cols; ++col) free(vals[col]); + rill_rows_free(&rows); +} + + +bool test_coder(void) +{ + check_coder(make_rows(row(1, 10))); + check_coder(make_rows(row(1, 10), row(1, 20))); + check_coder(make_rows(row(1, 10), row(2, 20))); + check_coder(make_rows(row(1, 10), row(1, 20), row(2, 30))); + check_coder(make_rows(row(1, 10), row(1, 20), row(2, 10))); + + struct rng rng = rng_make(0); + for (size_t iterations = 0; iterations < 100; ++iterations) + check_coder(make_rng_rows(&rng)); + + return true; +} + + +// ----------------------------------------------------------------------------- +// main +// ----------------------------------------------------------------------------- + +int main(int argc, char **argv) +{ + (void) argc, (void) argv; + bool ret = true; + + ret = ret && test_leb128(); + ret = ret && test_vals(); + ret = ret && test_coder(); + + return ret ? 0 : 1; +} diff --git a/test/index_test.c b/test/index_test.c new file mode 100644 index 0000000..804ba2b --- /dev/null +++ b/test/index_test.c @@ -0,0 +1,118 @@ +#include "test.h" + +#include "index.c" + +// ----------------------------------------------------------------------------- +// utils +// ----------------------------------------------------------------------------- + +static struct index *index_alloc(size_t rows) +{ + struct index *index = calloc(1, index_cap(rows)); + + assert(index); + assert(index->len == 0); + + return index; +} + + +// ----------------------------------------------------------------------------- +// test_index_build +// ----------------------------------------------------------------------------- + +static bool test_index_build(void) +{ + enum { rows = 10 }; + + struct index *index = index_alloc(rows); + + rill_val_t data[rows] = {0}; + for (size_t i = 1; i < rows; ++i) data[i] = data[i - 1] += 2; + + for (size_t i = 0; i < rows; i++) + index_put(index, data[i], i); + + assert(index->len == rows); + for (size_t i = 0; i < index->len; i++) + assert(index_get(index, i) == data[i]); + + assert(index_get(index, index->len) == 0); + + free(index); + return true; +} + + +// ----------------------------------------------------------------------------- +// test_index_lookup +// ----------------------------------------------------------------------------- + +static struct index *make_index(rill_val_t *data, size_t n) +{ + struct index *index = index_alloc(n); + for (size_t i = 0; i < n; i++) + index_put(index, data[i], i); + + return index; +} + +#define index_from_keys(...) \ + ({ \ + rill_val_t keys[] = { __VA_ARGS__ }; \ + make_index(keys, sizeof(keys) / sizeof(keys[0])); \ + }) + +#define assert_found(index, ...) { \ + rill_val_t keys[] = { __VA_ARGS__ }; \ + size_t key_idx; \ + uint64_t val; \ + for (size_t i = 0; i < sizeof(keys) / sizeof(keys[0]); i++) { \ + assert(index_find(index, keys[i], &key_idx, &val)); \ + assert(key_idx == i); \ + assert(val == i); \ + } \ +} + +#define assert_not_found(index, ...) { \ + rill_val_t keys[] = { __VA_ARGS__ }; \ + size_t key_idx; \ + uint64_t val; \ + for (size_t i = 0; i < sizeof(keys) / sizeof(keys[0]); i++) \ + assert(!index_find(index, keys[i], &key_idx, &val)); \ +} + +bool test_index_lookup(void) +{ + struct index *index; + + index = index_from_keys(0, 3, 6, 9, 12, 15, 18, 21, 24, 27); + assert_found(index, 0, 3, 6, 9, 12, 15, 18, 21, 24, 27); + assert_not_found(index, 1, 5, 8, 10, 14, 17, 20, 22, 25, 100); + free(index); + + index = index_from_keys(0, 3, 4, 5, 6, 7, 8, 9, 12, 27); + assert_found(index, 0, 3, 4, 5, 6, 7, 8, 9, 12, 27); + free(index); + + index = index_from_keys(0, 3, 12, 13, 14, 15, 16, 17, 18, 27); + assert_found(index, 0, 3, 12, 13, 14, 15, 16, 17, 18, 27); + free(index); + + return true; +} + +// ----------------------------------------------------------------------------- +// main +// ----------------------------------------------------------------------------- + +int main(int argc, char **argv) +{ + (void) argc, (void) argv; + bool ret = true; + + ret = ret && test_index_build(); + ret = ret && test_index_lookup(); + + return ret ? 0 : 1; +} diff --git a/test/rotate_test.c b/test/rotate_test.c new file mode 100644 index 0000000..bfa4b09 --- /dev/null +++ b/test/rotate_test.c @@ -0,0 +1,100 @@ +/* rill_test.c + Rémi Attab (remi.attab@gmail.com), 13 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "test.h" + + +// ----------------------------------------------------------------------------- +// rotate +// ----------------------------------------------------------------------------- + +void acc_dump(struct rill_acc *acc, const char *dir, rill_ts_t ts) +{ + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s/%010lu.rill", dir, ts); + + if (!rill_acc_write(acc, file, ts)) rill_abort(); +} + +bool test_rotate(void) +{ + const char *dir = "test.rotate.db"; + + rm(dir); + + const uint64_t key = 1; + enum { step = 10 * min_secs }; + + struct rill_acc *acc = rill_acc_open(dir, 1); + + { + + for (rill_ts_t ts = 0; ts < expire_secs; ts += step) { + rill_acc_ingest(acc, key, ts + 1); + acc_dump(acc, dir, ts); + rill_rotate(dir, ts); + } + + acc_dump(acc, dir, expire_secs); + rill_rotate(dir, expire_secs); + } + + { + struct rill_query *query = rill_query_open(dir); + struct rill_rows rows = {0}; + assert(rill_query_key(query, rill_col_a, key, &rows)); + rill_query_close(query); + + size_t i = 0; + for (rill_ts_t ts = 0; ts < expire_secs; ts += step) { + assert(rows.data[i].a == key); + assert(rows.data[i].b == ts + 1); + ++i; + } + + rill_rows_free(&rows); + } + + for (size_t i = 1; i <= 6; ++i) { + rill_ts_t ts = (months_in_expire + i) * month_secs; + acc_dump(acc, dir, ts); + rill_rotate(dir, ts); + } + + rill_acc_close(acc); + + { + struct rill_query *query = rill_query_open(dir); + struct rill_rows rows = {0}; + assert(rill_query_key(query, rill_col_a, key, &rows)); + rill_query_close(query); + + for (size_t i = 0; i < rows.len; ++i) { + assert(rows.data[i].a == key); + assert(rows.data[i].b >= (5 * month_secs) + 1); + } + + rill_rows_free(&rows); + } + + rm(dir); + + return true; +} + + +// ----------------------------------------------------------------------------- +// main +// ----------------------------------------------------------------------------- + +int main(int argc, char **argv) +{ + (void) argc, (void) argv; + bool ret = true; + + ret = ret && test_rotate(); + + return ret ? 0 : 1; +} diff --git a/test/store_test.c b/test/store_test.c new file mode 100644 index 0000000..317d03f --- /dev/null +++ b/test/store_test.c @@ -0,0 +1,258 @@ +/* coder_test.c + Rémi Attab (remi.attab@gmail.com), 11 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#include "test.h" +#include "store.c" + + +// ----------------------------------------------------------------------------- +// utils +// ----------------------------------------------------------------------------- + +static struct rill_store *make_store(const char *name, struct rill_rows *rows) +{ + unlink(name); + assert(rill_store_write(name, 0, 0, rows)); + + struct rill_store *store = rill_store_open(name); + if (!store) rill_abort(); + + return store; +} + + +// ----------------------------------------------------------------------------- +// query +// ----------------------------------------------------------------------------- + +static void check_query(struct rill_rows rows) +{ + struct rill_rows expected = {0}; + rill_rows_copy(&rows, &expected); + rill_rows_compact(&expected); + + struct rill_store *store = make_store("test.store.query", &rows); + struct rill_rows result = {0}; + + for (size_t col = 0; col < rill_cols; ++col) { + for (size_t i = 0; i < expected.len;) { + rill_rows_clear(&result); + assert(rill_store_query(store, col, expected.data[i].a, &result)); + + assert(expected.len - i >= result.len); + for (size_t j = 0; j < result.len; ++j, ++i) + assert(!rill_row_cmp(&expected.data[i], &result.data[j])); + } + + rill_rows_invert(&expected); // setup for next iteration. + } + + rill_store_close(store); + rill_rows_free(&rows); + rill_rows_free(&expected); + rill_rows_free(&result); +} + +bool test_query(void) +{ + check_query(make_rows(row(1, 10))); + check_query(make_rows(row(1, 10), row(2, 20))); + check_query(make_rows(row(1, 10), row(1, 20), row(2, 20))); + check_query(make_rows(row(1, 10), row(1, 20), row(1, 20), row(1, 30))); + + struct rng rng = rng_make(0); + for (size_t iterations = 0; iterations < 10; ++iterations) + check_query(make_rng_rows(&rng)); + + return true; +} + + +// ----------------------------------------------------------------------------- +// vals +// ----------------------------------------------------------------------------- + +static void check_vals(struct rill_rows rows) +{ + struct vals *exp[2] = { + vals_for_col(&rows, rill_col_a), + vals_for_col(&rows, rill_col_b), + }; + + struct rill_store *store = make_store("test.store.vals", &rows); + + for (size_t col = 0; col < rill_cols; ++col) { + size_t len = rill_store_vals_count(store, col); + rill_val_t *vals = calloc(len, sizeof(*vals)); + + assert(rill_store_vals(store, col, vals, len) == len); + + for (size_t i = 0; i < len; ++i) + assert(vals[i] == exp[col]->data[i]); + + free(exp[col]); + free(vals); + } +} + +bool test_vals(void) +{ + check_vals(make_rows(row(1, 10))); + check_vals(make_rows(row(1, 10), row(1, 20))); + check_vals(make_rows(row(1, 10), row(2, 10))); + check_vals(make_rows(row(1, 10), row(1, 20), row(2, 10), row(2, 20))); + check_vals(make_rows(row(1, 10), row(1, 20), row(2, 20), row(3, 30))); + + struct rng rng = rng_make(0); + for (size_t iterations = 0; iterations < 10; ++iterations) + check_vals(make_rng_rows(&rng)); + + return true; +} + + +// ----------------------------------------------------------------------------- +// it +// ----------------------------------------------------------------------------- + +static void check_it(struct rill_rows rows) +{ + struct rill_rows expected = {0}; + rill_rows_copy(&rows, &expected); + rill_rows_compact(&expected); + + struct rill_store *store = make_store("test.store.it", &rows); + + for (size_t col = 0; col < rill_cols; ++col) { + struct rill_store_it *it = rill_store_begin(store, col); + + struct rill_row row = {0}; + for (size_t i = 0; i < expected.len; ++i) { + assert(rill_store_it_next(it, &row)); + assert(!rill_row_cmp(&expected.data[i], &row)); + } + + assert(rill_store_it_next(it, &row)); + assert(rill_row_nil(&row)); + + rill_store_it_free(it); + + rill_rows_invert(&expected); // setup for next iteration. + } + + rill_store_close(store); + rill_rows_free(&rows); + rill_rows_free(&expected); +} + +bool test_it(void) +{ + check_it(make_rows(row(1, 10))); + check_it(make_rows(row(1, 10), row(2, 20))); + check_it(make_rows(row(1, 10), row(1, 20), row(2, 20))); + check_it(make_rows(row(1, 10), row(1, 20), row(1, 20), row(1, 30))); + + struct rng rng = rng_make(0); + for (size_t iterations = 0; iterations < 10; ++iterations) + check_it(make_rng_rows(&rng)); + + return true; +} + + +// ----------------------------------------------------------------------------- +// merge +// ----------------------------------------------------------------------------- + +static void check_merge(struct rill_rows a, struct rill_rows b) +{ + struct rill_rows expected = {0}; + rill_rows_append(&expected, &a); + rill_rows_append(&expected, &b); + rill_rows_compact(&expected); + + struct rill_store *to_merge[] = { + make_store("test.store.merge.a", &a), + make_store("test.store.merge.b", &b) + }; + + const char *file = "test.store.merge.result"; + unlink(file); + if (!rill_store_merge(file, 0, 0, to_merge, 2)) rill_abort(); + struct rill_store *store = rill_store_open(file); + assert(store); + + + for (size_t col = 0; col < rill_cols; ++col) { + struct rill_store_it *it = rill_store_begin(store, col); + + struct rill_row row = {0}; + for (size_t i = 0; i < expected.len; ++i) { + assert(rill_store_it_next(it, &row)); + assert(!rill_row_cmp(&expected.data[i], &row)); + } + + assert(rill_store_it_next(it, &row)); + assert(rill_row_nil(&row)); + + rill_store_it_free(it); + + rill_rows_invert(&expected); // setup for next iteration. + } + + rill_store_close(to_merge[0]); + rill_store_close(to_merge[1]); + rill_store_close(store); + rill_rows_free(&a); + rill_rows_free(&b); + rill_rows_free(&expected); +} + +bool test_merge(void) +{ + check_merge( + make_rows(row(1, 10)), + make_rows(row(1, 10))); + check_merge( + make_rows(row(1, 10)), + make_rows(row(2, 20))); + check_merge( + make_rows(row(1, 10), row(1, 20)), + make_rows(row(2, 10), row(2, 20))); + check_merge( + make_rows(row(1, 10), row(1, 20)), + make_rows(row(1, 20), row(2, 10))); + check_merge( + make_rows(row(1, 10)), + make_rows(row(1, 10), row(2, 10), row(2, 20))); + check_merge( + make_rows(row(1, 10), row(2, 10), row(2, 20)), + make_rows(row(1, 10))); + + struct rng rng = rng_make(0); + for (size_t iterations = 0; iterations < 10; ++iterations) + check_merge(make_rng_rows(&rng), make_rng_rows(&rng)); + + return true; + +} + + +// ----------------------------------------------------------------------------- +// main +// ----------------------------------------------------------------------------- + +int main(int argc, char **argv) +{ + (void) argc, (void) argv; + bool ret = true; + + ret = ret && test_query(); + ret = ret && test_vals(); + ret = ret && test_it(); + ret = ret && test_merge(); + + return ret ? 0 : 1; +} diff --git a/test/test.h b/test/test.h new file mode 100644 index 0000000..9d4df57 --- /dev/null +++ b/test/test.h @@ -0,0 +1,105 @@ +/* test.h + Rémi Attab (remi.attab@gmail.com), 11 Sep 2017 + FreeBSD-style copyright and disclaimer apply +*/ + +#pragma once + +#include "rill.h" +#include "utils.h" +#include "htable.h" +#include "rng.h" + +#include +#include +#include +#include + +#include +#include +#include + + +// ----------------------------------------------------------------------------- +// rows +// ----------------------------------------------------------------------------- + +struct rill_row row(rill_val_t a, rill_val_t b) +{ + return (struct rill_row) { .a = a, .b = b }; +} + +#define make_rows(...) \ + ({ \ + struct rill_row rows[] = { __VA_ARGS__ }; \ + make_rows_impl(rows, sizeof(rows) / sizeof(rows[0])); \ + }) + +struct rill_rows make_rows_impl(const struct rill_row *rows, size_t len) +{ + struct rill_rows result = {0}; + assert(rill_rows_reserve(&result, len)); + + for (size_t i = 0; i < len; ++i) + assert(rill_rows_push(&result, rows[i].a, rows[i].b)); + + return result; +} + +enum { rng_range_a = 250, rng_range_b = 100 }; + +struct rill_rows make_rng_rows(struct rng *rng) +{ + const size_t len = 1UL << rng_gen_range(rng, 9, 12); + struct rill_rows rows = {0}; + rill_rows_reserve(&rows, len); + + for (size_t i = 0; i < len; ++i) { + uint64_t a = rng_gen_range(rng, 1, rng_range_a); + uint64_t b = rng_gen_range(rng, 1, rng_range_b); + rill_rows_push(&rows, a, b); + } + + return rows; +} + + +// ----------------------------------------------------------------------------- +// rm +// ----------------------------------------------------------------------------- + +void rm(const char *path) +{ + DIR *dir = opendir(path); + if (!dir) return; + + struct dirent *entry; + while (true) { + if (!(entry = readdir(dir))) break; + else if (entry->d_type != DT_REG) continue; + + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s/%s", path, entry->d_name); + unlink(file); + } + + closedir(dir); + rmdir(path); +} + + +// ----------------------------------------------------------------------------- +// hexdump +// ----------------------------------------------------------------------------- + +void hexdump(const uint8_t *buffer, size_t len) +{ + for (size_t i = 0; i < len;) { + printf("%6p: ", (void *) i); + for (size_t j = 0; j < 16 && i < len; ++i, ++j) { + if (j % 2 == 0) printf(" "); + printf("%02x", buffer[i]); + } + printf("\n"); + } +}