From cde97d67776ed4ab8082f1e2992cf45c41197fae Mon Sep 17 00:00:00 2001 From: muhammad Date: Thu, 5 Aug 2021 10:15:34 -0400 Subject: [PATCH 1/2] Implementation of a concurrent indexing algorithm minor fix fix comment in configure.ac --- configure.ac | 5 + src/duc/cmd-index.c | 13 +- src/libduc/buffer.c | 20 ++ src/libduc/buffer.h | 7 +- src/libduc/db.c | 3 +- src/libduc/duc.h | 14 +- src/libduc/index.c | 646 +++++++++++++++++++++++++++++++++----------- 7 files changed, 530 insertions(+), 178 deletions(-) diff --git a/configure.ac b/configure.ac index 6518f3ac..161399e3 100644 --- a/configure.ac +++ b/configure.ac @@ -165,6 +165,11 @@ The X11 library was not found, which is needed for x11 gui support. fi +AC_CHECK_LIB([pthread], [pthread_create],, [AC_MSG_ERROR([ +The pthread library was not found, which is needed for concurrency in the indexing algorithm. +])]) + + AC_CHECK_HEADERS([fcntl.h limits.h stdint.h stdlib.h string.h sys/ioctl.h unistd.h fnmatch.h termios.h]) AC_CHECK_HEADERS([ncurses.h ncurses/ncurses.h ncursesw/ncurses.h]) diff --git a/src/duc/cmd-index.c b/src/duc/cmd-index.c index c308611d..cbb6f955 100644 --- a/src/duc/cmd-index.c +++ b/src/duc/cmd-index.c @@ -26,6 +26,8 @@ static bool opt_hide_file_names = false; static char *opt_username = NULL; static int opt_uid = 0; static int opt_max_depth = 0; +static unsigned int opt_thread_count = 1; +static unsigned int opt_cutoff_depth = 2; static bool opt_one_file_system = false; static bool opt_progress = false; static bool opt_uncompressed = false; @@ -73,6 +75,8 @@ static int index_main(duc *duc, int argc, char **argv) if(opt_force) open_flags |= DUC_OPEN_FORCE; if(opt_max_depth) duc_index_req_set_maxdepth(req, opt_max_depth); + if(opt_thread_count) duc_index_set_worker_count(opt_thread_count); + if(opt_cutoff_depth) duc_index_set_cutoff_depth(opt_cutoff_depth); if(opt_one_file_system) index_flags |= DUC_INDEX_XDEV; if(opt_hide_file_names) index_flags |= DUC_INDEX_HIDE_FILE_NAMES; if(opt_check_hard_links) index_flags |= DUC_INDEX_CHECK_HARD_LINKS; @@ -174,11 +178,16 @@ static struct ducrc_option options[] = { "VAL is a comma separated list of file system types as found in your systems fstab, for example ext3,ext4,dosfs" }, { &opt_hide_file_names, "hide-file-names", 0 , DUCRC_TYPE_BOOL, "hide file names in index (privacy)", "the names of directories will be preserved, but the names of the individual files will be hidden" }, - { &opt_uid, "uid", 'U', DUCRC_TYPE_INT, "limit index to only files/dirs owned by uid" }, - { &opt_username, "username", 'u', DUCRC_TYPE_STRING, "limit index to only files/dirs owned by username" }, + { &opt_uid, "uid", 'U', DUCRC_TYPE_INT, "limit index to only files/dirs owned by uid" }, + { &opt_username, "username", 'u', DUCRC_TYPE_STRING, "limit index to only files/dirs owned by username" }, { &opt_max_depth, "max-depth", 'm', DUCRC_TYPE_INT, "limit directory names to given depth" , "when this option is given duc will traverse the complete file system, but will only the first VAL " "levels of directories in the database to reduce the size of the index" }, + { &opt_thread_count, "thread-count", 't', DUCRC_TYPE_INT, "stipulate index to use a given number of threads", + "when this option is given duc will multithread the indexing algorithm and use VAL workers to traverse the filesystem. " + "Default value is 1."}, + { &opt_cutoff_depth, "cutoff-depth", 'c', DUCRC_TYPE_INT, "ensures that each worker has at least VAL tasks before " + "other workers start taking its tasks", "In general the lower this is, the higher the concurrency. Default value is 2."}, { &opt_one_file_system, "one-file-system", 'x', DUCRC_TYPE_BOOL, "skip directories on different file systems" }, { &opt_progress, "progress", 'p', DUCRC_TYPE_BOOL, "show progress during indexing" }, { &opt_dryrun, "dry-run", 0 , DUCRC_TYPE_BOOL, "do not update database, just crawl" }, diff --git a/src/libduc/buffer.c b/src/libduc/buffer.c index b18346ed..f60983da 100644 --- a/src/libduc/buffer.c +++ b/src/libduc/buffer.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "private.h" #include "buffer.h" @@ -23,6 +24,7 @@ struct buffer *buffer_new(void *data, size_t len) b = duc_malloc(sizeof(struct buffer)); b->ptr = 0; + pthread_rwlock_init(&b->data_lock, NULL); if(data) { b->max = len; @@ -40,6 +42,7 @@ struct buffer *buffer_new(void *data, size_t len) void buffer_free(struct buffer *b) { + pthread_rwlock_destroy(&b->data_lock); duc_free(b->data); duc_free(b); } @@ -162,20 +165,29 @@ static void buffer_get_size(struct buffer *b, struct duc_size *size) void buffer_put_dir(struct buffer *b, const struct duc_devino *devino, time_t mtime) { + /* Putting a directory into a buffer should be an atomic operation */ + pthread_rwlock_wrlock(&b->data_lock); + buffer_put_devino(b, devino); buffer_put_varint(b, mtime); + + pthread_rwlock_unlock(&b->data_lock); } void buffer_get_dir(struct buffer *b, struct duc_devino *devino, time_t *mtime) { + pthread_rwlock_rdlock(&b->data_lock); uint64_t v; buffer_get_devino(b, devino); buffer_get_varint(b, &v); *mtime = v; + pthread_rwlock_unlock(&b->data_lock); } void buffer_put_dirent(struct buffer *b, const struct duc_dirent *ent) { + /* Putting a directory entry into a buffer should be an atomic operation */ + pthread_rwlock_wrlock(&b->data_lock); buffer_put_string(b, ent->name); buffer_put_size(b, &ent->size); buffer_put_varint(b, ent->type); @@ -183,10 +195,12 @@ void buffer_put_dirent(struct buffer *b, const struct duc_dirent *ent) if(ent->type == DUC_FILE_TYPE_DIR) { buffer_put_devino(b, &ent->devino); } + pthread_rwlock_unlock(&b->data_lock); } void buffer_get_dirent(struct buffer *b, struct duc_dirent *ent) { + pthread_rwlock_rdlock(&b->data_lock); uint64_t v; buffer_get_string(b, &ent->name); @@ -196,11 +210,14 @@ void buffer_get_dirent(struct buffer *b, struct duc_dirent *ent) if(ent->type == DUC_FILE_TYPE_DIR) { buffer_get_devino(b, &ent->devino); } + pthread_rwlock_unlock(&b->data_lock); } void buffer_put_index_report(struct buffer *b, const struct duc_index_report *report) { + /* Putting the index report into the buffer should be an atomic operation */ + pthread_rwlock_wrlock(&b->data_lock); buffer_put_string(b, report->path); buffer_put_devino(b, &report->devino); buffer_put_varint(b, report->time_start.tv_sec); @@ -210,11 +227,13 @@ void buffer_put_index_report(struct buffer *b, const struct duc_index_report *re buffer_put_varint(b, report->file_count); buffer_put_varint(b, report->dir_count); buffer_put_size(b, &report->size); + pthread_rwlock_unlock(&b->data_lock); } void buffer_get_index_report(struct buffer *b, struct duc_index_report *report) { + pthread_rwlock_rdlock(&b->data_lock); char *vs = NULL; buffer_get_string(b, &vs); if(vs == NULL) return; @@ -230,6 +249,7 @@ void buffer_get_index_report(struct buffer *b, struct duc_index_report *report) buffer_get_varint(b, &vi); report->file_count = vi; buffer_get_varint(b, &vi); report->dir_count = vi; buffer_get_size(b, &report->size); + pthread_rwlock_unlock(&b->data_lock); } diff --git a/src/libduc/buffer.h b/src/libduc/buffer.h index 754445d0..50c0ae19 100644 --- a/src/libduc/buffer.h +++ b/src/libduc/buffer.h @@ -3,9 +3,10 @@ struct buffer { uint8_t *data; - size_t max; - size_t len; - size_t ptr; + pthread_rwlock_t data_lock; + _Atomic size_t max; + _Atomic size_t len; + _Atomic size_t ptr; }; struct buffer *buffer_new(void *data, size_t len); diff --git a/src/libduc/db.c b/src/libduc/db.c index 53e67114..38f786e1 100644 --- a/src/libduc/db.c +++ b/src/libduc/db.c @@ -34,9 +34,8 @@ duc_errno db_write_report(duc *duc, const struct duc_index_report *report) } else { db_put(duc->db, "duc_index_reports", 17, report->path, sizeof(report->path)); } - } else { - free(tmp); } + free(tmp); struct buffer *b = buffer_new(NULL, 0); diff --git a/src/libduc/duc.h b/src/libduc/duc.h index d683c528..3999edb4 100644 --- a/src/libduc/duc.h +++ b/src/libduc/duc.h @@ -90,18 +90,18 @@ struct duc_devino { }; struct duc_size { - off_t actual; - off_t apparent; - off_t count; + _Atomic off_t actual; + _Atomic off_t apparent; + _Atomic off_t count; }; struct duc_index_report { - char path[DUC_PATH_MAX]; /* Indexed path */ + char path[DUC_PATH_MAX]; /* Indexed path */ struct duc_devino devino; /* Index top device id and inode number */ struct timeval time_start; /* Index start time */ struct timeval time_stop; /* Index finished time */ - size_t file_count; /* Total number of files indexed */ - size_t dir_count; /* Total number of directories indexed */ + _Atomic size_t file_count; /* Total number of files indexed */ + _Atomic size_t dir_count; /* Total number of directories indexed */ struct duc_size size; /* Total size */ }; @@ -150,6 +150,8 @@ int duc_index_req_add_fstype_include(duc_index_req *req, const char *types); int duc_index_req_add_fstype_exclude(duc_index_req *req, const char *types); int duc_index_req_set_maxdepth(duc_index_req *req, int maxdepth); int duc_index_req_set_progress_cb(duc_index_req *req, duc_index_progress_cb fn, void *ptr); +int duc_index_set_cutoff_depth(unsigned int cutoff_depth); +int duc_index_set_worker_count(unsigned int worker_count); struct duc_index_report *duc_index(duc_index_req *req, const char *path, duc_index_flags flags); int duc_index_req_free(duc_index_req *req); int duc_index_report_free(struct duc_index_report *rep); diff --git a/src/libduc/index.c b/src/libduc/index.c index 74bede71..454a72b8 100644 --- a/src/libduc/index.c +++ b/src/libduc/index.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -17,6 +18,7 @@ #include #include #include +#include #ifdef HAVE_FNMATCH_H #include #endif @@ -28,6 +30,26 @@ #include "utlist.h" #include "buffer.h" +struct stack_node { + size_t length; + struct stack_node *next; + struct scanner* scanner; +}; + +unsigned int WORKER_COUNT = 1; +unsigned int CUTOFF_DEPTH = 2; +pthread_t *worker_threads; +struct stack_node **worker_stacks; +pthread_rwlock_t *worker_rw_locks; +pthread_mutex_t database_write_lock; + +/* + * Maintains a count of the number of workers that are polling other workers for work, checking + * if they need to terminate. Once this count reaches , it is a signal for all + * the workers to terminate. + */ +_Atomic unsigned int num_workers_terminating; + struct hard_link { struct duc_devino devino; UT_hash_handle hh; @@ -49,12 +71,12 @@ struct duc_index_req { struct exclude *exclude_list; duc_dev_t dev; duc_index_flags flags; - int maxdepth; - uid_t uid; - const char *username; + _Atomic int maxdepth; + uid_t uid; + const char *username; duc_index_progress_cb progress_fn; void *progress_fndata; - int progress_n; + _Atomic int progress_n; struct timeval progress_interval; struct timeval progress_time; struct hard_link *hard_link_map; @@ -65,17 +87,55 @@ struct duc_index_req { struct scanner { struct scanner *parent; - int depth; - DIR *d; + _Atomic int depth; struct buffer *buffer; struct duc *duc; struct duc_index_req *req; struct duc_index_report *rep; struct duc_dirent ent; + const char *absolute_path; /* Absolute path of the dirent this scanner refers to */ + _Atomic unsigned int completed_children; /* Number of children of this node that completed */ + _Atomic unsigned int num_children; /* Number of children that this node has */ + _Atomic bool done_processing; /* Indicates whether the node finished processing */ }; -static void scanner_free(struct scanner *scanner); +void stack_push(unsigned int worker_num, struct scanner *scanner) +{ + struct stack_node *old_head = worker_stacks[worker_num]; + struct stack_node *new_head = duc_malloc(sizeof(struct stack_node)); + new_head->next = old_head; + new_head->scanner = scanner; + new_head->length = old_head == NULL ? 1 : old_head->length + 1; + worker_stacks[worker_num] = new_head; +} + + +struct scanner *stack_pop(unsigned int worker_num) +{ + struct stack_node *old_head = worker_stacks[worker_num]; + if (!old_head) { + return NULL; + } + + struct scanner *scanner = old_head->scanner; + worker_stacks[worker_num] = old_head->next; + + duc_free(old_head); + return scanner; +} + + +size_t stack_len(struct stack_node *head) +{ + return head == NULL ? 0 : head->length; +} + + +bool stack_empty(struct stack_node *head) +{ + return !head; +} duc_index_req *duc_index_req_new(duc *duc) @@ -147,7 +207,7 @@ static struct fstype *add_fstype(duc_index_req *req, const char *types, struct f char *type = strtok(types_copy, ","); while(type) { struct fstype *fstype; - fstype = duc_malloc(sizeof *fstype); + fstype = duc_malloc(sizeof(*fstype)); fstype->type = duc_strdup(type); HASH_ADD_KEYPTR(hh, list, fstype->type, strlen(fstype->type), fstype); type = strtok(NULL, ","); @@ -177,8 +237,28 @@ int duc_index_req_set_maxdepth(duc_index_req *req, int maxdepth) return 0; } -/* We set both uid and username, since we cannot use -1 UID to check wether we're - limiting the search to just a specific UID, but we use UID for quicker compares. */ + +int duc_index_set_worker_count(unsigned int worker_count) +{ + WORKER_COUNT = worker_count; + worker_threads = duc_malloc(sizeof(pthread_t) * WORKER_COUNT); + worker_stacks = duc_malloc(sizeof(struct stack_node *) * WORKER_COUNT); + worker_rw_locks = duc_malloc(sizeof(pthread_rwlock_t) * WORKER_COUNT); + return 0; +} + + +int duc_index_set_cutoff_depth(unsigned int cutoff_depth) +{ + CUTOFF_DEPTH = cutoff_depth; + return 0; +} + + +/* + * We set both uid and username, since we cannot use -1 UID to check wether we're + * limiting the search to just a specific UID, but we use UID for quicker compares. + */ int duc_index_req_set_username(duc_index_req *req, const char *username ) { @@ -252,7 +332,7 @@ static void st_to_size(struct stat *st, struct duc_size *s) /* - * Conver struct stat to duc_devino. Windows does not support inodes + * Convert struct stat to duc_devino. Windows does not support inodes * and will always put 0 in st_ino. We fake inodes here by simply using * an incrementing counter. This *will* cause problems when re-indexing * existing databases. If anyone knows a better method to simulate @@ -283,7 +363,7 @@ static int is_duplicate(struct duc_index_req *req, struct duc_devino *devino) HASH_FIND(hh, req->hard_link_map, devino, sizeof(*devino), h); if(h) return 1; - h = duc_malloc(sizeof *h); + h = duc_malloc(sizeof(*h)); h->devino = *devino; HASH_ADD(hh, req->hard_link_map, devino, sizeof(h->devino), h); return 0; @@ -306,6 +386,34 @@ static void report_skip(struct duc *duc, const char *name, const char *fmt, ...) } +/* + * Given a parent scanner and a child's name, this function returns the + * absolute path to the child. + */ + +char *get_absolute_path(struct scanner *scanner_parent, char *child_name) +{ + char *absolute_path; + size_t path_suffix_size = strlen(child_name); + + if (!scanner_parent) { + absolute_path = duc_malloc0(sizeof(char) * (path_suffix_size + 1)); + strcpy(absolute_path, child_name); + return absolute_path; + } + + /* + * The absolute path referenced by a child scanner should be the absolute + * path referenced by the parent scanner + '/' + the name of the file + */ + + size_t parent_absolute_path_size = strlen(scanner_parent->absolute_path); + absolute_path = duc_malloc0(sizeof(char) * (parent_absolute_path_size + path_suffix_size + 2)); + sprintf(absolute_path, "%s/%s", scanner_parent->absolute_path, child_name); + return absolute_path; +} + + /* * Check if this file system type should be scanned, depending on the * fstypes_include and fstypes_exclude lists. If neither has any entries, all @@ -321,7 +429,6 @@ static int is_fstype_allowed(struct duc_index_req *req, const char *name) } /* Find file system type */ - char path_full[DUC_PATH_MAX]; char *res = realpath(name, path_full); if (res == NULL) { @@ -364,14 +471,19 @@ static int is_fstype_allowed(struct duc_index_req *req, const char *name) * Open dir and read file status */ -static struct scanner *scanner_new(struct duc *duc, struct scanner *scanner_parent, const char *path, struct stat *st) +static struct scanner *scanner_new(struct duc *duc, struct scanner *scanner_parent, const char *path, const char *dirent_name, struct stat *st) { struct scanner *scanner; - scanner = duc_malloc0(sizeof *scanner); + scanner = duc_malloc0(sizeof(*scanner)); struct stat st2; struct duc_devino devino_parent = { 0, 0 }; + scanner->absolute_path = path; + scanner->done_processing = false; + scanner->num_children = 0; + scanner->completed_children = 0; + if(scanner_parent) { scanner->depth = scanner_parent->depth + 1; scanner->duc = scanner_parent->duc; @@ -379,30 +491,26 @@ static struct scanner *scanner_new(struct duc *duc, struct scanner *scanner_pare scanner->rep = scanner_parent->rep; devino_parent = scanner_parent->ent.devino; } else { - int r = lstat(path, &st2); + int r = lstat(scanner->absolute_path, &st2); if(r == -1) { duc_log(duc, DUC_LOG_WRN, "Error statting %s: %s", path, strerror(errno)); + if(errno == ENOENT) duc->err = DUC_E_PATH_NOT_FOUND; goto err; } st = &st2; } - scanner->d = opendir(path); - if(scanner->d == NULL) { - report_skip(duc, path, strerror(errno)); - goto err; - } - + scanner->duc = duc; scanner->parent = scanner_parent; scanner->buffer = buffer_new(NULL, 32768); - scanner->ent.name = duc_strdup(path); + scanner->ent.name = duc_strdup(dirent_name); scanner->ent.type = DUC_FILE_TYPE_DIR, st_to_devino(st, &scanner->ent.devino); st_to_size(st, &scanner->ent.size); scanner->ent.size.apparent = 0; - + buffer_put_dir(scanner->buffer, &devino_parent, st->st_mtime); duc_log(duc, DUC_LOG_DMP, ">> %s", scanner->ent.name); @@ -410,226 +518,436 @@ static struct scanner *scanner_new(struct duc *duc, struct scanner *scanner_pare return scanner; err: - if(scanner->d) closedir(scanner->d); if(scanner) free(scanner); return NULL; } -static void scanner_scan(struct scanner *scanner_dir) +static void scanner_free(struct scanner *scanner) { - int r; - struct duc *duc = scanner_dir->duc; - struct duc_index_req *req = scanner_dir->req; - struct duc_index_report *report = scanner_dir->rep; + struct duc *duc = scanner->duc; + struct duc_index_req *req = scanner->req; + struct duc_index_report *report = scanner->rep; - report->dir_count ++; - duc_size_accum(&report->size, &scanner_dir->ent.size); + duc_log(duc, DUC_LOG_DMP, "<< %s actual:%jd apparent:%jd", + scanner->ent.name, scanner->ent.size.apparent, scanner->ent.size.actual); - r = chdir(scanner_dir->ent.name); - if(r != 0) { - report_skip(duc, scanner_dir->ent.name, strerror(errno)); - return; + if(scanner->parent) { + duc_size_accum(&scanner->parent->ent.size, &scanner->ent.size); + + if((req->maxdepth == 0) || (scanner->depth < req->maxdepth)) { + buffer_put_dirent(scanner->parent->buffer, &scanner->ent); + } } - /* Iterate directory entries */ + /* Progress reporting */ + if(req->progress_fn) { + if((!scanner->parent) || (req->progress_n++ == 100)) { + + struct timeval t_now; + gettimeofday(&t_now, NULL); - struct dirent *e; - while( (e = readdir(scanner_dir->d)) != NULL) { + if(!scanner->parent || timercmp(&t_now, &req->progress_time, > )) { + req->progress_fn(report, req->progress_fndata); + timeradd(&t_now, &req->progress_interval, &req->progress_time); + } + req->progress_n = 0; + } + } - /* Skip . and .. */ + if(!(req->flags & DUC_INDEX_DRY_RUN)) { + char key[32]; + struct duc_devino *devino = &scanner->ent.devino; + size_t keyl = snprintf(key, sizeof(key), "%jx/%jx", (uintmax_t)devino->dev, (uintmax_t)devino->ino); + pthread_mutex_lock(&database_write_lock); + int r = db_put(duc->db, key, keyl, scanner->buffer->data, scanner->buffer->len); + pthread_mutex_unlock(&database_write_lock); + if(r != 0) duc->err = r; + } + + duc_free((void *)scanner->absolute_path); - char *name = e->d_name; + buffer_free(scanner->buffer); + duc_free(scanner->ent.name); + duc_free(scanner); +} - if(name[0] == '.') { - if(name[1] == '\0') continue; - if((name[1] == '.') && (name[2] == '\0')) continue; + +static void read_mounts(duc_index_req *req) +{ + FILE *f; + + f = fopen("/proc/mounts", "r"); + + if(f == NULL) { + f = fopen("/etc/mtab", "r"); + } + + if(f == NULL) { + duc_log(req->duc, DUC_LOG_FTL, "Unable to get list of mounted file systems"); + return; + } + + char buf[DUC_PATH_MAX]; + + while(fgets(buf, sizeof(buf)-1, f) != NULL) { + (void)strtok(buf, " "); + char *path = strtok(NULL, " "); + char *type = strtok(NULL, " "); + if(path && type) { + struct fstype *fstype; + fstype = duc_malloc(sizeof(*fstype)); + fstype->type = duc_strdup(type); + fstype->path = duc_strdup(path); + HASH_ADD_KEYPTR(hh, req->fstypes_mounted, fstype->path, strlen(fstype->path), fstype); } + } + fclose(f); +} + + +void write_lock_worker_stack(unsigned int worker_num) +{ + assert(worker_num < WORKER_COUNT); + + pthread_rwlock_wrlock(&worker_rw_locks[worker_num]); +} + - if(match_exclude(name, req->exclude_list)) { - report_skip(duc, name, "Excluded by user"); +void read_lock_worker_stack(unsigned int worker_num) +{ + assert(worker_num < WORKER_COUNT); + + pthread_rwlock_rdlock(&worker_rw_locks[worker_num]); +} + + +void unlock_worker_stack(unsigned int worker_num) +{ + assert(worker_num < WORKER_COUNT); + + pthread_rwlock_unlock(&worker_rw_locks[worker_num]); +} + + +void free_nodes(struct scanner *scanner) +{ + /* + * LOGIC: + * This bottom-up algorithm to free nodes. The worker starts freeing at node and walks + * up the tree, ensuring that topological order is maintained. The algorithm is as follows: + + * LOOP: The worker frees this node and keeps track of its parent. + * The count of the node's parent is incremented (since the child was freed). + * If the freed node's parent is not done processing: + * The parent obviously cannot be freed because it hasn't finished processing. And so + * the loop is broken. + * Otherwise: + * The freeing of the parent was left as a duty to the LAST CHILD to finish. + * So the worker checks if this node is the last child. If it is, LOOP is repeated with + * the parent being the new node. Otherwise, the loop terminates. + */ + + struct scanner* s = scanner->parent; + scanner_free(scanner); + while(s) { + s->completed_children++; + + /* + * If the parent hasn't finished processing, then this child cannot free it. + * So the child can ignore its parent in this scenario. + */ + if(! s->done_processing) break; + s->done_processing = true; + + /* + * If the parent is done processing but this child is not the last child to process, + * then the child cannot free the parent (top sort demands ALL children finish before parent). + */ + if (s->completed_children != s->num_children) break; + + /* Free the parent */ + struct scanner* new_parent = s->parent; + scanner_free(s); + + s = new_parent; + } +} + + +/* + * Adds all relevant children that need to be processed onto 's DFS + * stack, and returns whether is a leaf node i.e whether it has any children. + */ + +bool process_node(unsigned int worker_num, struct scanner *scanner) +{ + DIR *dr = opendir(scanner->absolute_path); + if (dr == NULL) return true; + + struct dirent *de; + bool is_leaf = true; + while( (de = readdir(dr)) != NULL ) { + char *dirent_name = de->d_name; + struct duc *duc = scanner->duc; + struct duc_index_req *req = scanner->req; + struct duc_index_report *report = scanner->rep; + + if(strcmp(dirent_name, ".") == 0 || strcmp(dirent_name, "..") == 0) { + continue; + } + + if(match_exclude(dirent_name, req->exclude_list)) { + report_skip(duc, dirent_name, "Excluded by user"); continue; } - /* Get file info. Derive the file type from st.st_mode. It + /* + * Get file info. Derive the file type from st.st_mode. It * seems that we cannot trust e->d_type because it is not * guaranteed to contain a sane value on all file system types. - * See the readdir() man page for more details */ - + * See the readdir() man page for more details + */ + char *absolute_path = get_absolute_path(scanner, dirent_name); struct stat st_ent; - int r = lstat(name, &st_ent); + int r = lstat(absolute_path, &st_ent); if(r == -1) { - duc_log(duc, DUC_LOG_WRN, "Error statting %s: %s", name, strerror(errno)); + duc_log(duc, DUC_LOG_WRN, "Error statting %s: %s", absolute_path, strerror(errno)); + duc_free(absolute_path); continue; } - /* If this dirent lies on a different device, check the file system type of the new - * device and skip if it is not on the list of approved types */ - - if(st_ent.st_dev != scanner_dir->ent.devino.dev) { - if(!is_fstype_allowed(req, name)) { + /* + * If this dirent lies on a different device, check the file system type of the new + * device and skip if it is not on the list of approved types + */ + if(st_ent.st_dev != scanner->ent.devino.dev) { + if(!is_fstype_allowed(req, dirent_name)) { + duc_free(absolute_path); continue; } } /* Are we looking for data for only a specific user? */ if(req->username) { - if(st_ent.st_uid != req->uid) { - continue; - } + if(st_ent.st_uid != req->uid) { + duc_free(absolute_path); + continue; + } } /* Create duc_dirent from readdir() and fstatat() results */ - struct duc_dirent ent; - ent.name = name; + ent.name = dirent_name; ent.type = st_to_type(st_ent.st_mode); st_to_devino(&st_ent, &ent.devino); st_to_size(&st_ent, &ent.size); /* Skip hard link duplicates for any files with more then one hard link */ - if((ent.type != DUC_FILE_TYPE_DIR) && (req->flags & DUC_INDEX_CHECK_HARD_LINKS) && - (st_ent.st_nlink > 1) && is_duplicate(req, &ent.devino)) { + (st_ent.st_nlink > 1) && is_duplicate(req, &ent.devino)) { + duc_free(absolute_path); continue; } /* Check if we can cross file system boundaries */ - if((ent.type == DUC_FILE_TYPE_DIR) && (req->flags & DUC_INDEX_XDEV) && - (st_ent.st_dev != req->dev)) { - report_skip(duc, name, "Not crossing file system boundaries"); + (st_ent.st_dev != req->dev)) { + report_skip(duc, dirent_name, "Not crossing file system boundaries"); + duc_free(absolute_path); continue; } - - - /* Calculate size of this dirent */ + if(ent.type == DUC_FILE_TYPE_DIR) { /* Open and scan child directory */ + struct scanner *child_scanner = scanner_new(duc, scanner, absolute_path, dirent_name, &st_ent); - struct scanner *scanner_ent = scanner_new(duc, scanner_dir, name, &st_ent); - if(scanner_ent == NULL) + if(child_scanner == NULL) { + duc_free(absolute_path); continue; + } - scanner_scan(scanner_ent); - scanner_free(scanner_ent); + /* Obtain the write lock and push the child onto the stack */ + write_lock_worker_stack(worker_num); + stack_push(worker_num, child_scanner); + unlock_worker_stack(worker_num); + is_leaf = false; + ++scanner->num_children; + ++report->dir_count; } else { - - duc_size_accum(&scanner_dir->ent.size, &ent.size); + duc_size_accum(&scanner->ent.size, &ent.size); duc_size_accum(&report->size, &ent.size); - report->file_count ++; + ++report->file_count; duc_log(duc, DUC_LOG_DMP, " %c %jd %jd %s", - duc_file_type_char(ent.type), ent.size.apparent, ent.size.actual, name); + duc_file_type_char(ent.type), ent.size.apparent, ent.size.actual, dirent_name); + duc_free(absolute_path); /* Optionally hide file names */ - if(req->flags & DUC_INDEX_HIDE_FILE_NAMES) ent.name = ""; /* Store record */ - - if((req->maxdepth == 0) || (scanner_dir->depth < req->maxdepth)) { - buffer_put_dirent(scanner_dir->buffer, &ent); + if((req->maxdepth == 0) || (scanner->depth < req->maxdepth)) { + buffer_put_dirent(scanner->buffer, &ent); } } } + closedir(dr); + return is_leaf; +} + + +void dfs_worker(unsigned int worker_num) +{ + /* + * LOGIC: + * Our goal is to ensure a concurrent topological sort, so that parent nodes can aggregate stats of their children. + * Let us define a node "processing" as the process whereby a worker puts the node's relevant children onto its DFS stack. + * Each node maintains a count of the number of children it possesses , a boolean to indicate whether it is + * finished processing , as well as a count of its children which finished processing . + * + * When a node is being processed by a worker: + * Each child it possesses is placed onto the DFS stack and is incremented. + * After processing: + * The worker checks if either of two conditions are true for the node: + * - If the node is a leaf, then it has no dependencies in the topological sort, and so it can be freed. + * - If the node's count equals the , then this means that the children + * all finished processing (possible due to concurrency of the algorithm). Then topological order is guaranteed, + * and so the node can be freed. + * Otherwise: + * - The node cannot yet be freed, otherwise topological order will be violated. Thus, the worker marks the node + * as , and leaves the freeing of the node as a duty to the LAST CHILD of the node which finishes + * processing. + */ + + while(true) { + /* Pop a new node off the DFS stack */ + write_lock_worker_stack(worker_num); + if (stack_empty(worker_stacks[worker_num])) { + unlock_worker_stack(worker_num); + break; + } + struct scanner *scanner = stack_pop(worker_num); + unlock_worker_stack(worker_num); + + /* Process the node */ + bool is_leaf = process_node(worker_num, scanner); - r = chdir(".."); - if(r != 0) { - report_skip(duc, scanner_dir->ent.name, strerror(errno)); + /* Run the free algorithm on the node */ + if (is_leaf || + !scanner->done_processing && + scanner->completed_children == scanner->num_children) { + free_nodes(scanner); + } else { + scanner->done_processing = true; + } return; } - } -static void scanner_free(struct scanner *scanner) -{ - struct duc *duc = scanner->duc; - struct duc_index_req *req = scanner->req; - struct duc_index_report *report = scanner->rep; - - duc_log(duc, DUC_LOG_DMP, "<< %s actual:%jd apparent:%jd", - scanner->ent.name, scanner->ent.size.apparent, scanner->ent.size.actual); - - if(scanner->parent) { - duc_size_accum(&scanner->parent->ent.size, &scanner->ent.size); +/* + * This function polls all workers for work to add to 's stack. + * If there is work available on another worker's stack and the stack size exceeds + * the cutoff depth, then work is taken from the other worker. + * + * Returns false if the worker should fully terminate, i.e. if all other + * workers also have no work left, and are looking for work. + */ - if((req->maxdepth == 0) || (scanner->depth < req->maxdepth)) { - buffer_put_dirent(scanner->parent->buffer, &scanner->ent); +bool get_work_for(unsigned int worker_num) +{ + unsigned int target = 0; + ++num_workers_terminating; + while (num_workers_terminating != WORKER_COUNT) { + /* 'Target' is the target processor that we are taking tasks from */ + target = (target + 1) % WORKER_COUNT; + if (target == worker_num) continue; + write_lock_worker_stack(target); + + /* If this target processor has too many tasks then take one of its tasks */ + if (stack_len(worker_stacks[target]) > CUTOFF_DEPTH) { + --num_workers_terminating; + struct scanner* target_task = stack_pop(target); + unlock_worker_stack(target); + write_lock_worker_stack(worker_num); + stack_push(worker_num, target_task); + unlock_worker_stack(worker_num); + return true; } + unlock_worker_stack(target); } - - /* Progress reporting */ - if(req->progress_fn) { + return false; +} - if((!scanner->parent) || (req->progress_n++ == 100)) { - - struct timeval t_now; - gettimeofday(&t_now, NULL); - if(!scanner->parent || timercmp(&t_now, &req->progress_time, > )) { - req->progress_fn(report, req->progress_fndata); - timeradd(&t_now, &req->progress_interval, &req->progress_time); - } - req->progress_n = 0; +void *worker_main(void *worker_num_v) +{ + unsigned int worker_num = (uintptr_t) worker_num_v; + + while (true) { + read_lock_worker_stack(worker_num); + + if (stack_empty(worker_stacks[worker_num])) { + unlock_worker_stack(worker_num); + + /* + * If there is no work for this worker, then poll other workers for work. + * If the function returns false, it is an indicator to terminate. + */ + if (!get_work_for(worker_num)) break; + } else { + unlock_worker_stack(worker_num); } - } - - if(!(req->flags & DUC_INDEX_DRY_RUN)) { - char key[32]; - struct duc_devino *devino = &scanner->ent.devino; - size_t keyl = snprintf(key, sizeof(key), "%jx/%jx", (uintmax_t)devino->dev, (uintmax_t)devino->ino); - int r = db_put(duc->db, key, keyl, scanner->buffer->data, scanner->buffer->len); - if(r != 0) duc->err = r; + dfs_worker(worker_num); } - buffer_free(scanner->buffer); - closedir(scanner->d); - duc_free(scanner->ent.name); - duc_free(scanner); + return NULL; } -static void read_mounts(duc_index_req *req) +void initialize_workers(void) { - FILE *f; + for (unsigned int i = 0; i < WORKER_COUNT; i++) { + /* Initialize the worker's stack */ + worker_stacks[i] = NULL; - f = fopen("/proc/mounts", "r"); - - if(f == NULL) { - f = fopen("/etc/mtab", "r"); + /* Initialize the mutex for the worker's stack */ + pthread_rwlock_init(&worker_rw_locks[i], NULL); } +} - if(f == NULL) { - duc_log(req->duc, DUC_LOG_FTL, "Unable to get list of mounted file systems"); + +void create_worker_pool(struct scanner *scanner) +{ + for (uintptr_t worker_num = 0; worker_num < WORKER_COUNT; worker_num++) { + assert( + pthread_create( + &worker_threads[worker_num], + NULL, + &worker_main, + (void *) worker_num + ) + == 0); } +} - char buf[DUC_PATH_MAX]; - while(fgets(buf, sizeof(buf)-1, f) != NULL) { - (void)strtok(buf, " "); - char *path = strtok(NULL, " "); - char *type = strtok(NULL, " "); - if(path && type) { - struct fstype *fstype; - fstype = duc_malloc(sizeof *fstype); - fstype->type = duc_strdup(type); - fstype->path = duc_strdup(path); - HASH_ADD_KEYPTR(hh, req->fstypes_mounted, fstype->path, strlen(fstype->path), fstype); - } +void join_workers(void) +{ + for (unsigned int i = 0; i < WORKER_COUNT; i++) { + pthread_join(worker_threads[i], NULL); } - fclose(f); } @@ -640,7 +958,6 @@ struct duc_index_report *duc_index(duc_index_req *req, const char *path, duc_ind req->flags = flags; /* Canonicalize index path */ - char *path_canon = duc_canonicalize_path(path); if(path_canon == NULL) { duc_log(duc, DUC_LOG_WRN, "Error converting path %s: %s", path, strerror(errno)); @@ -651,47 +968,47 @@ struct duc_index_report *duc_index(duc_index_req *req, const char *path, duc_ind } /* Create report */ - struct duc_index_report *report = duc_malloc0(sizeof(struct duc_index_report)); gettimeofday(&report->time_start, NULL); snprintf(report->path, sizeof(report->path), "%s", path_canon); /* Read mounted file systems to find fs types */ - if(req->fstypes_include || req->fstypes_exclude) { read_mounts(req); } - /* Recursively index subdirectories */ - - struct scanner *scanner = scanner_new(duc, NULL, path_canon, NULL); - - if(scanner) { - scanner->req = req; - scanner->rep = report; + /* Index subdirectories using a worker pool with a parallel topological sort */ + struct scanner *scanner = scanner_new(duc, NULL, path_canon, path_canon, NULL); + if(! scanner) return NULL; - req->dev = scanner->ent.devino.dev; - report->devino = scanner->ent.devino; - - scanner_scan(scanner); - gettimeofday(&report->time_stop, NULL); - scanner_free(scanner); - } + scanner->req = req; + scanner->rep = report; + req->dev = scanner->ent.devino.dev; + report->devino = scanner->ent.devino; + + /* Initialize, create, and join the worker pool */ + pthread_mutex_init(&database_write_lock, NULL); + initialize_workers(); + stack_push(0, scanner); /* Push first task onto worker 0's stack */ + create_worker_pool(scanner); + join_workers(); - /* Store report */ + /* Free resources */ + pthread_mutex_destroy(&database_write_lock); + duc_free(worker_threads); + duc_free(worker_stacks); + duc_free(worker_rw_locks); + /* Store report */ if(!(req->flags & DUC_INDEX_DRY_RUN)) { gettimeofday(&report->time_stop, NULL); db_write_report(duc, report); } - free(path_canon); - return report; } - int duc_index_report_free(struct duc_index_report *rep) { free(rep); @@ -702,4 +1019,3 @@ int duc_index_report_free(struct duc_index_report *rep) /* * End */ - From 3c39f5c539ff0db2343e7ef4e6d3bae483aa3464 Mon Sep 17 00:00:00 2001 From: muhammad Date: Fri, 13 Aug 2021 15:17:18 -0400 Subject: [PATCH 2/2] Create the fix to two conditions that could create possible race conditions --- src/libduc/index.c | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/libduc/index.c b/src/libduc/index.c index 454a72b8..fad77141 100644 --- a/src/libduc/index.c +++ b/src/libduc/index.c @@ -97,6 +97,7 @@ struct scanner { _Atomic unsigned int completed_children; /* Number of children of this node that completed */ _Atomic unsigned int num_children; /* Number of children that this node has */ _Atomic bool done_processing; /* Indicates whether the node finished processing */ + pthread_mutex_t free_check_lock; /* Enforces that checks relating to freeing are atomic */ }; @@ -483,6 +484,7 @@ static struct scanner *scanner_new(struct duc *duc, struct scanner *scanner_pare scanner->done_processing = false; scanner->num_children = 0; scanner->completed_children = 0; + pthread_mutex_init(&scanner->free_check_lock, NULL); if(scanner_parent) { scanner->depth = scanner_parent->depth + 1; @@ -565,8 +567,9 @@ static void scanner_free(struct scanner *scanner) if(r != 0) duc->err = r; } + pthread_mutex_destroy(&scanner->free_check_lock); + duc_free((void *)scanner->absolute_path); - buffer_free(scanner->buffer); duc_free(scanner->ent.name); duc_free(scanner); @@ -651,20 +654,29 @@ void free_nodes(struct scanner *scanner) struct scanner* s = scanner->parent; scanner_free(scanner); while(s) { + pthread_mutex_lock(&s->free_check_lock); s->completed_children++; /* * If the parent hasn't finished processing, then this child cannot free it. * So the child can ignore its parent in this scenario. */ - if(! s->done_processing) break; + if(! s->done_processing) { + pthread_mutex_unlock(&s->free_check_lock); + break; + } s->done_processing = true; /* * If the parent is done processing but this child is not the last child to process, * then the child cannot free the parent (top sort demands ALL children finish before parent). */ - if (s->completed_children != s->num_children) break; + if (s->completed_children != s->num_children) { + pthread_mutex_unlock(&s->free_check_lock); + break; + } + + pthread_mutex_unlock(&s->free_check_lock); /* Free the parent */ struct scanner* new_parent = s->parent; @@ -841,12 +853,15 @@ void dfs_worker(unsigned int worker_num) bool is_leaf = process_node(worker_num, scanner); /* Run the free algorithm on the node */ + pthread_mutex_lock(&scanner->free_check_lock); if (is_leaf || !scanner->done_processing && scanner->completed_children == scanner->num_children) { + pthread_mutex_unlock(&scanner->free_check_lock); free_nodes(scanner); } else { scanner->done_processing = true; + pthread_mutex_unlock(&scanner->free_check_lock); } return; }