Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions include/cachemap.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,31 @@ struct zone_map_result {
struct zone_map_result
zn_cachemap_find(struct zn_cachemap *map, const uint32_t data_id);

/** @brief Begin compaction of a zone. This temporarily renders all chunks in that zone unavailable
until the compaction operation is finished, after which zn_cachemap_compact_end should be
called.
* @param[in] map the cachemap
* @param[in] zone_id the zone that will be compacted
* @param[out] data_ids an out pointer for an array. This outputs the list of data IDs that were associated with the zone.
* @param[out] locations an out pointer for an array. This outputs the list of locations (most importantly are the chunk locations) that were associated with the zone.
* @param[out] count an out pointer for the number of chunks that are still valid in the zone.
*/
void
zn_cachemap_compact_begin(struct zn_cachemap *map, const uint32_t zone_id, uint32_t **data_ids,
struct zn_pair **locations, uint32_t *count);

/** @brief Finish compaction of a zone. This makes all chunks in that zone available
again, replacing the temporary placeholders installed by zn_cachemap_compact_begin and
waking any threads that were waiting on them.
* @param[in] map the cachemap
* @param[in] zone_id the zone that was compacted
* @param[in] data_ids An array that stores the list of data IDs that were associated with the zone.
* @param[in] locations An array that stores the list of locations that were associated with the zone.
* @param[in] count the number of chunks that were still valid in the zone.
*/
void
zn_cachemap_compact_end(struct zn_cachemap *map, const uint32_t zone_id, const uint32_t* data_ids, struct zn_pair* locations, uint32_t count);

/** @brief Inserts a new mapping into the data structure. Called by
* the thread when it's finished writing to the zone.
*
Expand Down
10 changes: 10 additions & 0 deletions include/zncache.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ zn_init_cache(struct zn_cache *cache, struct zbd_info *info, size_t chunk_sz, ui
void
zn_destroy_cache(struct zn_cache *cache);

/**
* @brief Read the entire zone into the buffer. This is used to compact a zone.
*
* @param cache Pointer to the `zn_cache` structure
* @param zone_id The zone to read
 * @param buffer the buffer to write to. It must have been allocated by the caller and big enough to store the entire zone (at least zone_cap bytes)
* @return Buffer read from disk. It's the same buffer as the input
*/
unsigned char * zn_read_from_disk_whole(struct zn_cache *cache, uint32_t zone_id, void *buffer);

/**
* @brief Read a chunk from disk
*
Expand Down
2 changes: 2 additions & 0 deletions include/znutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define UTIL_H

#define SEED 42
#define ZN_DIRECT_ALIGNMENT 4096

#include "libzbd/zbd.h"

Expand Down Expand Up @@ -32,6 +33,7 @@ enum print_g_hash_table_type {
PRINT_G_HASH_TABLE_PROM_LRU_NODE = 1,
PRINT_G_HASH_TABLE_ZN_PAIR = 2,
PRINT_G_HASH_TABLE_ZN_PAIR_NODE = 3,
PRINT_G_HASH_TABLE_ZONE_MAP_RESULT = 4,
};

/**
Expand Down
12 changes: 11 additions & 1 deletion include/zone_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ zsm_get_active_zone(struct zone_state_manager *state, struct zn_pair *pair);
GArray
zsm_get_active_zone_batch(int chunks);

// Returns the active zone after it's written to

/** @brief Returns the active zone after it's written to.
* @param pair the chunk that was written to.
*/
int
zsm_return_active_zone(struct zone_state_manager *state, struct zn_pair *pair);

Expand All @@ -120,6 +123,13 @@ zsm_return_active_zone(struct zone_state_manager *state, struct zn_pair *pair);
int
zsm_evict(struct zone_state_manager *state, int zone_to_free);

/** @brief Resets the zone pointer, making it writeable again from the start. Once the thread is finished updating the zone, it should call zsm_return_active_zone.
* @param zone_id the zone to make free again
* @param count the number of chunks that are still valid.
*/
void
zsm_evict_and_write(struct zone_state_manager *state, uint32_t zone_id, uint32_t count);

void
zsm_failed_to_write(struct zone_state_manager *state, struct zn_pair pair);

Expand Down
19 changes: 17 additions & 2 deletions src/cache.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// For pread
#define _XOPEN_SOURCE 500
#include "cachemap.h"
#include <stdint.h>
#include <unistd.h>

Expand All @@ -14,8 +15,6 @@
#include "libzbd/zbd.h"
#include <inttypes.h>

#define ZN_DIRECT_ALIGNMENT 4096

void
zn_fg_evict(struct zn_cache *cache) {
uint32_t free_zones = zsm_get_num_free_zones(&cache->zone_state);
Expand Down Expand Up @@ -79,6 +78,7 @@ zn_cache_get(struct zn_cache *cache, const uint32_t id, unsigned char *random_bu
struct timespec start_time, end_time;
TIME_NOW(&start_time);
unsigned char *data = zn_read_from_disk(cache, &result.location);
result.location.id = id;
TIME_NOW(&end_time);
double t = TIME_DIFFERENCE_NSEC(start_time, end_time);
ZN_PROFILER_UPDATE(cache->profiler, ZN_PROFILER_METRIC_READ_LATENCY, t);
Expand Down Expand Up @@ -258,6 +258,21 @@ zn_destroy_cache(struct zn_cache *cache) {
/* g_mutex_clear(&cache->reader.lock); */
}

/**
 * @brief Read an entire zone into a caller-supplied buffer.
 *
 * Used by compaction to pull every chunk of a zone into memory at once.
 * pread() is not guaranteed to transfer all requested bytes in a single
 * call, so we loop until the whole zone has been read (or we hit a hard
 * error / unexpected EOF).
 *
 * @param cache   Pointer to the `zn_cache` structure (provides fd and geometry)
 * @param zone_id The zone to read
 * @param buffer  Caller-allocated buffer of at least cache->zone_cap bytes.
 *                NOTE(review): if the fd is opened with O_DIRECT this buffer
 *                presumably also needs ZN_DIRECT_ALIGNMENT — confirm.
 * @return buffer on success (same pointer as the input), NULL on failure
 */
unsigned char *
zn_read_from_disk_whole(struct zn_cache *cache, uint32_t zone_id, void *buffer) {
    assert(cache);
    assert(buffer);

    // Byte offset of chunk 0 of this zone.
    long read_off = CHUNK_POINTER(cache->zone_cap, cache->chunk_sz, 0, zone_id);

    size_t total = 0;
    while (total < cache->zone_cap) {
        ssize_t bytes = pread(cache->fd, (unsigned char *)buffer + total,
                              cache->zone_cap - total, read_off + (long)total);
        if (bytes < 0) {
            // Hard I/O error (an EINTR retry would need <errno.h>; treated
            // as fatal here, matching the previous behavior for short reads).
            fprintf(stderr, "Couldn't read from fd\n");
            return NULL;
        }
        if (bytes == 0) {
            // Unexpected EOF before the zone was fully read.
            break;
        }
        total += (size_t)bytes;
    }

    if (total != cache->zone_cap) {
        fprintf(stderr, "Couldn't read from fd\n");
        return NULL;
    }

    return buffer;
}

unsigned char *
zn_read_from_disk(struct zn_cache *cache, struct zn_pair *zone_pair) {
unsigned char *data;
Expand Down
89 changes: 87 additions & 2 deletions src/cachemap.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "cachemap.h"
#include "znbackend.h"
#include "zncache.h"

#include "assert.h"
Expand Down Expand Up @@ -78,27 +79,111 @@ zn_cachemap_find(struct zn_cachemap *map, const uint32_t data_id) {
}

void
zn_cachemap_insert(struct zn_cachemap *map, const uint32_t data_id, struct zn_pair location) {
zn_cachemap_compact_begin(struct zn_cachemap *map, const uint32_t zone_id, uint32_t** data_ids, struct zn_pair** locations, uint32_t* count) {
assert(map);
assert(data_ids);
assert(locations);
assert(count);

// This function gathers all valid chunks in the zone and informs
// the calling thread of them.
// It also removes all entries in the hash map in the meantime,
// replacing them with condition variables until the thread is
// finished compacting the zone.

g_mutex_lock(&map->cache_map_mutex);

dbg_print_g_hash_table("map->data_map[location.zone]", map->data_map[location.zone], PRINT_G_HASH_TABLE_GINT);
*count = g_hash_table_size(map->data_map[zone_id]);
*locations = malloc(sizeof(struct zn_pair) * (*count));
*data_ids = malloc(sizeof(uint32_t) * (*count));
assert(*locations);
assert(*data_ids);

// Gather all valid chunks in the zone and temporarily invalidate them
GHashTableIter iter;
gpointer key = NULL;
gpointer value = NULL;
int i = 0;

// Iterate through all valid chunks. Invalid chunks should not be in the data map.
g_hash_table_iter_init (&iter, map->data_map[zone_id]);
while (g_hash_table_iter_next (&iter, &key, &value)) {
uint32_t chunk_offset = GPOINTER_TO_UINT(key);
uint32_t data_id = GPOINTER_TO_UINT(value);

// Find the entry in the hash table
dbg_print_g_hash_table("zone_map", map->zone_map, PRINT_G_HASH_TABLE_ZONE_MAP_RESULT);
struct zone_map_result* res = g_hash_table_lookup(map->zone_map, GUINT_TO_POINTER(data_id));
assert(res);
assert(res->type == RESULT_LOC);
assert(res->location.chunk_offset == chunk_offset);
assert(res->location.id == data_id);

// Invalidate it, replace with a temporary condition variable
res->type = RESULT_COND;
g_cond_init(&res->write_finished);

// Write to the out variables
(*data_ids)[i] = data_id;
(*locations)[i] = (struct zn_pair){.chunk_offset = chunk_offset, .zone = zone_id};

i += 1;
}

g_mutex_unlock(&map->cache_map_mutex);
}

/**
 * @brief Publish the location of @p data_id in the zone map and wake any
 * readers blocked on its condition variable.
 *
 * Caller must hold map->cache_map_mutex. The entry must already exist as a
 * RESULT_COND placeholder (created by an earlier zn_cachemap_find or by
 * zn_cachemap_compact_begin); this fills in the concrete location, flips the
 * result type to RESULT_LOC, and records the chunk in the per-zone data map.
 */
static void
zn_cachemap_insert_nolock(struct zn_cachemap *map, const uint32_t data_id,
                          struct zn_pair location) {

    // It must contain an entry if the thread called zn_cachemap_find beforehand
    assert(g_hash_table_contains(map->zone_map, GUINT_TO_POINTER(data_id)));

    struct zone_map_result *result = g_hash_table_lookup(map->zone_map, GUINT_TO_POINTER(data_id));
    assert(result->type == RESULT_COND);

    dbg_print_g_hash_table("zone_map", map->zone_map, PRINT_G_HASH_TABLE_ZONE_MAP_RESULT);
    // result points at the value stored in the hash table, so assigning
    // through it updates the table entry in place — no re-insert needed.
    result->location = location;
    result->type = RESULT_LOC;
    dbg_print_g_hash_table("zone_map", map->zone_map, PRINT_G_HASH_TABLE_ZONE_MAP_RESULT);
    assert(map->data_map[location.zone]);
    g_hash_table_insert(map->data_map[location.zone], GUINT_TO_POINTER(location.chunk_offset), GINT_TO_POINTER(data_id));
    g_cond_broadcast(&result->write_finished); // Wake up threads waiting for it

    dbg_print_g_hash_table("map->data_map[location.zone]", map->data_map[location.zone], PRINT_G_HASH_TABLE_GINT);

}

/**
 * @brief Finish compacting a zone: publish the surviving chunks back into the
 * cachemap, replacing the RESULT_COND placeholders installed by
 * zn_cachemap_compact_begin and waking any readers that were waiting.
 *
 * @param[in] map       the cachemap
 * @param[in] zone_id   the zone that was compacted
 * @param[in] data_ids  data IDs gathered by zn_cachemap_compact_begin
 * @param[in] locations chunk locations for those data IDs
 * @param[in] count     number of chunks that were still valid in the zone
 */
void
zn_cachemap_compact_end(struct zn_cachemap *map, const uint32_t zone_id, const uint32_t* data_ids, struct zn_pair* locations, uint32_t count) {
    assert(map);
    assert(data_ids);
    assert(locations);

    g_mutex_lock(&map->cache_map_mutex);

    // Every entry must belong to the compacted zone; reinserting it swaps
    // the placeholder for a concrete location and broadcasts to waiters.
    for (uint32_t idx = 0; idx < count; idx++) {
        assert(locations[idx].zone == zone_id);
        zn_cachemap_insert_nolock(map, data_ids[idx], locations[idx]);
    }

    g_mutex_unlock(&map->cache_map_mutex);
}

/**
 * @brief Inserts a new mapping from @p data_id to @p location. Called by a
 * writer thread once it has finished writing the chunk to the zone.
 *
 * Thread-safe wrapper: takes cache_map_mutex and delegates the update to
 * zn_cachemap_insert_nolock, which also wakes readers waiting on the entry.
 */
void
zn_cachemap_insert(struct zn_cachemap *map, const uint32_t data_id, struct zn_pair location) {
    assert(map);
    g_mutex_lock(&map->cache_map_mutex);
    dbg_print_g_hash_table("map->data_map[location.zone]", map->data_map[location.zone], PRINT_G_HASH_TABLE_GINT);
    zn_cachemap_insert_nolock(map, data_id, location);
    dbg_print_g_hash_table("map->data_map[location.zone]", map->data_map[location.zone], PRINT_G_HASH_TABLE_GINT);
    g_mutex_unlock(&map->cache_map_mutex);
}

Expand Down
77 changes: 72 additions & 5 deletions src/eviction/chunk.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "cachemap.h"
#include "eviction_policy.h"
#include "eviction_policy_chunk.h"
#include "zncache.h"
#include "znutil.h"
#include "minheap.h"
#include "zone_state_manager.h"
Expand All @@ -10,6 +12,7 @@
#include <stdlib.h>
#include <glib.h>
#include <glibconfig.h>
#include <string.h>

void
zn_policy_chunk_update(policy_data_t _policy, struct zn_pair location,
Expand All @@ -28,7 +31,7 @@ zn_policy_chunk_update(policy_data_t _policy, struct zn_pair location,
struct eviction_policy_chunk_zone * zpc = &p->zone_pool[location.zone];
struct zn_pair * zp = &zpc->chunks[location.chunk_offset];

GList *node;
GList *node = NULL;
// Should always be present (might be NULL)
assert(g_hash_table_lookup_extended(p->chunk_to_lru_map, zp, NULL, (gpointer *)&node));

Expand Down Expand Up @@ -76,6 +79,60 @@ zn_policy_chunk_update(policy_data_t _policy, struct zn_pair location,
g_mutex_unlock(&p->policy_mutex);
}

/** @brief This function takes a full zone containing invalid chunks,
and compacts it so that only valid chunks remain.
*
* At a high level, this function copies all data from the zone,
removes all invalid chunks, and makes the zone available again as
an active zone.
*/
static void
zn_policy_compact_zone(struct zn_policy_chunk *p, struct eviction_policy_chunk_zone * old_zone) {
unsigned char* buf = zn_read_from_disk_whole(p->cache, old_zone->zone_id, p->chunk_buf);
assert(buf);

// These arrays are allocated inside the function
uint32_t* data_ids = NULL;
struct zn_pair *locations = NULL;
uint32_t count = 0;

// Get information about the valid chunks
zn_cachemap_compact_begin(&p->cache->cache_map, old_zone->zone_id, &data_ids, &locations, &count);

// Wait for all readers to finish
while (g_atomic_int_get(&p->cache->active_readers[old_zone->zone_id]) > 0) {}

// Reset the current zone, making it writeable again
zsm_evict_and_write(&p->cache->zone_state, old_zone->zone_id, count);

// Copy all the old zones
for (uint32_t i = 0; i < count; i++) {
// Read from the correct chunk offset indicated by the location
unsigned char* read_ptr = &buf[p->cache->chunk_sz * locations[i].chunk_offset];

// Write to the ith sequential chunk
unsigned long long wp =
CHUNK_POINTER(p->cache->zone_cap, p->cache->chunk_sz, i, locations[i].zone);
if (zn_write_out(p->cache->fd, p->cache->chunk_sz, read_ptr, ZN_WRITE_GRANULARITY, wp, p->cache->backend) != 0) {
dbg_printf("Couldn't write to fd at wp=%llu\n", wp);
}
}

// Update the zsm. The latest pair that we wrote to was to the
// (count - 1)th chunk (0 indexed), So indicate it as such in the
// chunk_offset field.
struct zn_pair end_pair = {
.zone = old_zone->zone_id,
.chunk_offset = count - 1,
};
int ret = zsm_return_active_zone(&p->cache->zone_state, &end_pair);
assert(!ret);

// Finish the compaction, updating the cachemap
zn_cachemap_compact_end(&p->cache->cache_map, old_zone->zone_id, data_ids,
locations, count);
}

static void
zn_policy_chunk_gc(policy_data_t policy) {
// TODO: If later separated from evict, lock here
Expand All @@ -88,6 +145,10 @@ zn_policy_chunk_gc(policy_data_t policy) {

while (free_zones < EVICT_LOW_THRESH_ZONES) {
struct zn_minheap_entry *ent = zn_minheap_extract_min(p->invalid_pqueue);
if (!ent) {
return;
}

struct eviction_policy_chunk_zone * old_zone = ent->data;
assert(old_zone);
dbg_printf("Found minheap_entry priority=%u, chunks_in_use=%u, zone=%u\n",
Expand All @@ -103,11 +164,15 @@ zn_policy_chunk_gc(policy_data_t policy) {

struct zn_pair new_location;
enum zsm_get_active_zone_error ret = zsm_get_active_zone(&p->cache->zone_state, &new_location);

// Not enough zones available. We are just going to compact the old zone
if (ret != ZSM_GET_ACTIVE_ZONE_SUCCESS) {
assert(!"TODO");
// TODO: ???
zn_policy_compact_zone(p, old_zone);
return;
}

// TODO: What if someone already wrote the existing data to a different location? We may need to check that first

// Read the chunk from the old zone
unsigned char *data = zn_read_from_disk(p->cache, &old_zone->chunks[i]);
assert(data);
Expand Down Expand Up @@ -149,8 +214,10 @@ int
zn_policy_chunk_evict(policy_data_t policy) {
struct zn_policy_chunk *p = policy;

g_mutex_lock(&p->policy_mutex);

gboolean locked_by_us = g_mutex_trylock(&p->policy_mutex);
if (!locked_by_us) {
return -1;
}

uint32_t in_lru = g_queue_get_length(&p->lru_queue);
uint32_t free_chunks = p->total_chunks - in_lru;
Expand Down
5 changes: 4 additions & 1 deletion src/eviction/promotional.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ int
zn_policy_promotional_get_zone_to_evict(policy_data_t policy) {
struct zn_policy_promotional *promote_policy = policy;

g_mutex_lock(&promote_policy->policy_mutex);
gboolean locked_by_us = g_mutex_trylock(&promote_policy->policy_mutex);
if (!locked_by_us) {
return -1;
}

dbg_print_g_queue("lru_queue", &promote_policy->lru_queue, PRINT_G_QUEUE_GINT);

Expand Down
Loading
Loading