diff --git a/storage/file/BUILD b/storage/file/BUILD index 113526b..c80d744 100644 --- a/storage/file/BUILD +++ b/storage/file/BUILD @@ -13,6 +13,43 @@ cc_library( link_all_symbols=True ) +cc_library( + name = 'uri_utils', + srcs = ['uri_utils.cpp'], + deps = [ + '//toft/base/string:string', + ] +) + +cc_library( + name = 'hdfs_fs', + srcs = 'hdfs_file.cpp', + deps = [ + ':file', + ':uri_utils', + '//thirdparty/dfs-adapter:dfs' + ], + extra_cppflags = '-std=c++11', + link_all_symbols=True +) + +cc_library( + name = 'afs_fs', + srcs = 'afs_file.cpp', + deps = [ + ':file', + ':uri_utils', + '//thirdparty/dfs-adapter:dfs' + ], + extra_cppflags = '-std=c++11', + link_all_symbols=True +) + +cc_test( + name = 'uri_utils_test', + srcs = 'uri_utils_test.cpp', + deps = ':uri_utils', +) cc_test( name = 'file_test', srcs = 'file_test.cpp', diff --git a/storage/file/afs_file.cpp b/storage/file/afs_file.cpp new file mode 100644 index 0000000..60165b0 --- /dev/null +++ b/storage/file/afs_file.cpp @@ -0,0 +1,467 @@ +// Copyright (C) 2017, For authors. +// Author: An Qin (anqin.qin@gmail.com) +// +// Description: + +#include "toft/storage/file/afs_file.h" + +#include +#include +#include +#include + +#include "thirdparty/glog/logging.h" +#include "thirdparty/gflags/gflags.h" +#include "toft/base/string/algorithm.h" +#include "toft/base/unique_ptr.h" +#include "toft/storage/file/uri_utils.h" +#include "toft/storage/path/path.h" +#include "toft/text/wildcard.h" + +DEFINE_string(afs_log_level, "2", ""); + +namespace toft { + +// change mode string to flags. Used in opening a afs file. +static bool ModeToFlags(const char* mode, int* flags) { + // only read and write are supported + if (!mode) { + return false; + } + if (strcmp(mode, "r") == 0) { + *flags = O_RDONLY; + return true; + } + if (strcmp(mode, "w") == 0) { + *flags = O_WRONLY; + return true; + } + return false; +} + + +namespace { + +static inline dfs::FileType GetFileTypeFromInode(uint64_t id) { + return static_cast((id >> 49) & 0x0000000000000006); +} + +class AFSFileIterator : public FileIterator { +public: + explicit AFSFileIterator(AfsFS fs, const std::string& dir, + const std::string& pattern, + int include_types, int exclude_types) + : m_dir(dir), m_pattern(pattern), + m_include_types(include_types), m_exclude_types(exclude_types) { + fs->Readdir(dir.c_str(), &m_infos); + m_current_pos = 0; + } + ~AFSFileIterator() {} + + bool GetNext(FileEntry* entry) { + for (;; ++m_current_pos) { + if (static_cast(m_current_pos) >= m_infos.size() + || m_infos.size() == 0) { + return false; + } + std::string name = m_infos[m_current_pos].name; + if (strcmp(name.c_str(), ".") == 0 || strcmp(name.c_str(), "..") == 0) { + continue; + } + + int type = GetType(m_current_pos); + if ((type & m_include_types) == 0) + continue; + if ((type & m_exclude_types) != 0) + continue; + + if (!Wildcard::Match(m_pattern, name)) { + continue; + } + + entry->type = type; + entry->name = name; + m_current_pos++; + return true; + } + } + +private: + int GetType(int pos) const { + dfs::FileType ft = GetFileTypeFromInode(m_infos[pos].inode); + if (ft == dfs::FT_REGULAR) { + return FileType_Regular; + } else if (ft == dfs::FT_DIRECTORY) { + return FileType_Directory; + } + return FileType_None; + } + +private: + std::string m_dir; + std::string m_pattern; + int m_include_types; + int m_exclude_types; + + std::vector m_infos; + int m_current_pos; +}; + +} // namespace + + +//////////////////////////////////////////////////////////////// +// AFS FS Cache Impl +AFSFileSystem::AFSFSCache::~AFSFSCache() { + for (AFSFSMap::iterator i = fs_map_.begin(); i != fs_map_.end(); ++i) { + int32_t status = i->second->DisConnect(); + if (status != 0) { + // TODO: add error details + LOG(ERROR) << "afsDisconnect(\"" << i->first.first + << "\", " << i->first.second << ") failed: " + << " Error(" << status << ") "; + } + delete i->second; + } +} + +AfsFS AFSFileSystem::AFSFSCache::GetLocal() { + return NULL; +} + +AfsFS AFSFileSystem::AFSFSCache::GetConnection(const std::string& uri) { + return NULL; +} + +AfsFS AFSFileSystem::AFSFSCache::GetConnection(const std::string& uri, + const std::string& username, + const std::string& password) { + AFSFSMap::iterator i = fs_map_.find(std::make_pair(uri, username)); + if (i == fs_map_.end()) { + AfsFS conn = new dfs::AfsFileSystem(uri.c_str(), username.c_str(), + password.c_str(), NULL); + if (conn == NULL) { + return NULL; + } + + conn->SetConfigStr("hadoop.log.level", FLAGS_afs_log_level.c_str()); + if (conn->Connect() != 0) { + delete conn; + return NULL; + } + fs_map_.insert(std::make_pair(std::make_pair(uri, username), conn)); + return conn; + } else { + return i->second; + } +} + + +///////////////////////////////////////////////////////////////////////////// +// AFSFileSystem +// + +const char* AFSFileSystem::AFS = "afs"; + +AFSFileSystem::AFSFileSystem() { +} + +AFSFileSystem::~AFSFileSystem() { +} + +AfsFS AFSFileSystem::GetAFSFS(const std::string& file_path) { + std::map params; + return GetAFSFS(file_path, ¶ms); +} + +AfsFS AFSFileSystem::GetAFSFS(const std::string& file_path, + std::map* params) { + std::vector sections; + if (!UriUtils::Explode(file_path, '/', §ions)) { + return NULL; + } + + std::string schema = sections[0]; + + // handle local files + if (schema != "afs") { + return m_fs_cache.GetLocal(); + } + std::string cluster_name; + + if (!UriUtils::ParseParam(sections[1], &cluster_name, params)) { + return NULL; + } + + AfsFS fs; + std::string uri = schema + "://" + sections[1]; + // if there is username/password in params + if (params->count("username") == 1) { + std::map::iterator it; + it = params->find("username"); + std::string username = it->second; + it = params->find("password"); + std::string password = it->second; + if (it == params->end()) { + return NULL; + } + fs = m_fs_cache.GetConnection(uri, username, password); + } else { + fs = m_fs_cache.GetConnection(uri); + } + return fs; +} + +File* AFSFileSystem::Open(const std::string& file_path, const char* mode) { + std::map params; + AfsFS fs = GetAFSFS(file_path, ¶ms); + if (!fs) { + return NULL; + } + int flags = 0; + if (!ModeToFlags(mode, &flags)) { + return NULL; + } + + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return NULL; + } + + AfsReader* reader = NULL; + AfsWriter* writer = NULL; + if (flags == O_WRONLY) { + int32_t ret_code = fs->Create(shifted_path.c_str()); + if (ret_code != dfs::kExist && ret_code != dfs::kOk) { + LOG(ERROR) << "Create file failed, errno: " << ret_code; + return NULL; + } + writer = fs->OpenWriter(shifted_path.c_str()); + if (!writer) { + LOG(ERROR) << "Open file for write failed, errno: " + << dfs::GetErrno(); + return NULL; + } + } + + reader = fs->OpenReader(shifted_path.c_str()); + if (!reader) { + LOG(ERROR) << "Open file for read failed, errno: " + << dfs::GetErrno(); + return NULL; + } + return new AFSFile(reader, writer, fs, file_path, mode); +} + +bool AFSFileSystem::Exists(const std::string& file_path) { + AfsFS fs = GetAFSFS(file_path); + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return false; + } + if (!fs) { + return false; + } + return fs->Exist(shifted_path.c_str()) == 0; +} + +bool AFSFileSystem::Delete(const std::string& file_path) { + AfsFS fs = GetAFSFS(file_path); + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return false; + } + if (!fs) { + return false; + } + return fs->Delete(shifted_path.c_str()) == 0; +} + +bool AFSFileSystem::Rename(const std::string& from, const std::string& to) { + AfsFS from_fs = GetAFSFS(from); + AfsFS to_fs = GetAFSFS(to); + if (from_fs != to_fs) { + return false; + } + if (!from_fs) { + return false; + } + std::string shifted_from; + if (!UriUtils::Shift(from, &shifted_from, 2, '/')) { + return false; + } + std::string shifted_to; + if (!UriUtils::Shift(to, &shifted_to, 2, '/')) { + return false; + } + return from_fs->Rename(shifted_from.c_str(), shifted_to.c_str()) == 0; +} + +bool AFSFileSystem::GetTimes(const std::string& file_path, FileTimes* times) { + AfsFS fs = GetAFSFS(file_path); + if (!fs) { + return false; + } + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return false; + } + struct stat info; + int status = fs->Stat(shifted_path.c_str(), &info); + if (status != 0) { + return false; + } + times->access_time = info.st_atime; + times->modify_time = info.st_mtime; + return true; +} + +int64_t AFSFileSystem::GetSize(const std::string& file_path) { + AfsFS fs = GetAFSFS(file_path); + if (!fs) return -1; + + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return -1; + } + + struct stat info; + int status = fs->Stat(shifted_path.c_str(), &info); + if (status != 0) { + return -1; + } + int64_t file_size = info.st_size; + return file_size; +} + +FileIterator* AFSFileSystem::Iterate(const std::string& dir, + const std::string& pattern, + int include_types, + int exclude_types) { + AfsFS fs = GetAFSFS(dir); + if (!fs) return NULL; + std::string shifted_path; + if (!UriUtils::Shift(dir, &shifted_path, 2, '/')) { + return NULL; + } + return new AFSFileIterator(fs, shifted_path, pattern, + include_types, exclude_types); +} + +bool AFSFileSystem::Mkdir(const std::string& dir, int mode) { + AfsFS fs = GetAFSFS(dir); + if (!fs) { + return false; + } + std::string shifted_path; + if (!UriUtils::Shift(dir, &shifted_path, 2, '/')) { + return false; + } + return fs->Mkdir(shifted_path.c_str()) == 0; +} + +bool AFSFileSystem::Rmdir(const std::string& dir) { + AfsFS fs = GetAFSFS(dir); + if (!fs) { + return false; + } + std::string shifted_path; + if (!UriUtils::Shift(dir, &shifted_path, 2, '/')) { + return false; + } + return fs->Delete(shifted_path.c_str(), true) == 0; +} + +TOFT_REGISTER_FILE_SYSTEM("afs", AFSFileSystem); + +/////////////////////////////////////////////////////////////////////////////////////////// +// AFSFile Implementation +AFSFile::AFSFile(AfsReader* reader, AfsWriter* writer, AfsFS fs, + const std::string& file_path, const char* mode) + : File(file_path, mode), + m_reader(reader), + m_writer(writer), + m_fs(fs), + m_iseof(false), + m_closed(false) { +} + +AFSFile::~AFSFile() { + if (!m_closed) { + Close(); + m_closed = true; + } +} + +int64_t AFSFile::Read(void* buffer, int64_t size) { + if (!m_reader) { + LOG(ERROR) << "invalid operation for read on wirter handle"; + return -1; + } + int64_t read_size = m_reader->Read(buffer, size); + if (read_size != size) { + m_iseof = (Tell() == File::GetSize(file_path())); + } + return read_size; +} + +int64_t AFSFile::Write(const void* buffer, int64_t size) { + if (!m_writer) { + LOG(ERROR) << "invalid operation for write on reader handle"; + return -1; + } + return m_writer->Append(buffer, size); +} + +bool AFSFile::Flush() { + return m_writer->Flush() == 0; +} + +bool AFSFile::Close() { + m_closed = true; + int32_t ret = 0; + if (m_reader) { + ret |= m_fs->CloseReader(m_reader); + } + if (m_writer) { + ret |= m_fs->CloseWriter(m_writer); + } + return ret == 0; +} + +bool AFSFile::Seek(int64_t offset, int whence) { + if (whence != SEEK_SET) { + return false; + } + if (!m_reader) { + LOG(ERROR) << "invalid operation for seek on wirter handle"; + return -1; + } + m_reader->Seek(offset); + return true; +} + +int64_t AFSFile::Tell() { + if (m_writer) { + return m_writer->Tell(); + } + if (!m_reader) { + LOG(ERROR) << "invalid operation for seek on wirter handle"; + return -1; + } + return m_reader->Tell(); +} + +bool AFSFile::ReadLine(std::string* line, size_t max_size) { + return false; +} + +bool AFSFile::ReadLineWithLineEnding(std::string* line, size_t max_size) { + return false; +} + +bool AFSFile::IsEof() { + return m_iseof; +} + +} // namespace toft diff --git a/storage/file/afs_file.h b/storage/file/afs_file.h new file mode 100644 index 0000000..1b310bc --- /dev/null +++ b/storage/file/afs_file.h @@ -0,0 +1,129 @@ +// Copyright (C) 2017, Baidu Inc. +// Author: An Qin (anqin.qin@gmail.com) +// +// Description: A FileSystem implementation for AFS API + +#ifndef TOFT_STORAGE_FILE_AFS_FILE_H +#define TOFT_STORAGE_FILE_AFS_FILE_H + +#include + +#include +#include + +#include "thirdparty/dfs-adapter/afs.h" +#include "toft/storage/file/file.h" + +typedef dfs::AfsFileSystem* AfsFS; +typedef dfs::Reader AfsReader; +typedef dfs::Writer AfsWriter; + +namespace toft { + +// a FileSystem like implementation of AFS +// Note that AFSFileSystem instances need to out-live AFSFile created from it. +class AFSFileSystem : public FileSystem { +public: + AFSFileSystem(); + virtual ~AFSFileSystem(); + virtual File* Open(const std::string& file_path, const char* mode); + virtual bool Exists(const std::string& file_path); + virtual bool Delete(const std::string& file_path); + virtual bool Rename(const std::string& from, const std::string& to); + virtual bool GetTimes(const std::string& file_path, FileTimes* times); + virtual int64_t GetSize(const std::string& file_path); + virtual FileIterator* Iterate(const std::string& dir, + const std::string& pattern, + int include_types, + int exclude_types); + virtual bool Mkdir(const std::string& dir, int mode); + virtual bool Rmdir(const std::string& dir); + + static AFSFileSystem* GetRegisteredFileSystem() { + return static_cast(TOFT_GET_FILE_SYSTEM(AFS)); + } + + /** + * A util method to get a AfsFS obj from path. + * @param file_path path to file, valid for both AFS path like "/afs/szwg-ecomon:5313/user..." + * or local path like "/home/work/..." + * @return a AfsFS obj if success, NULL if fail. + */ + AfsFS GetAFSFS(const std::string& file_path); + +public: + static const char* AFS /* = "afs" */; + +protected: + // A (process-wide) cache of AfsFS objects. + // These connections are shared across all threads and kept + // open until the process terminates. + // (Calls to afsDisconnect() by individual threads would terminate all + // other connections handed out via afsConnect() to the same URI.) + class AFSFSCache { + public: + ~AFSFSCache(); + + AfsFS GetLocal(); + // Get connection to specific fs by specifying the name node's + // ipaddress or hostname and port. + AfsFS GetConnection(const std::string& uri); + + // Get connection to specific fs by specifying the name node's + // ipaddress or hostname, port, username and password. + AfsFS GetConnection(const std::string& uri, + const std::string& username, + const std::string& password); + + private: + typedef std::map, AfsFS> AFSFSMap; + AFSFSMap fs_map_; + }; + + /** + * A util method to get a AfsFS obj from path. + * @param file_path path to file, valid for both AFS path like "/afs/szwg-ecomon:5313/user..." + * or local path like "/home/work/..." + * @param params params parsed from second section of URI. + * @return a AfsFS obj if success, NULL if fail. + */ + AfsFS GetAFSFS(const std::string& file_path, + std::map* params); + + // a internal afs connection cache + AFSFSCache m_fs_cache; +}; + +// Represent a file object on local mounted file system +class AFSFile : public File { +public: + virtual ~AFSFile(); + + // Implement File interface. + // + virtual int64_t Read(void* buffer, int64_t size); + virtual int64_t Write(const void* buffer, int64_t size); + virtual bool Flush(); + virtual bool Close(); + virtual bool Seek(int64_t offset, int whence); + virtual int64_t Tell(); + virtual bool ReadLine(std::string* line, size_t max_size); + virtual bool ReadLineWithLineEnding(std::string* line, size_t max_size); + virtual bool IsEof(); +private: + friend class AFSFileSystem; + AFSFile(AfsReader* reader, AfsWriter* writer, AfsFS fs, + const std::string& file_path, + const char* mode); + + AfsReader* m_reader; + AfsWriter* m_writer; + AfsFS m_fs; + + bool m_iseof; + bool m_closed; +}; + +} // namespace toft + +#endif //TOFT_STORAGE_FILE_AFS_FILE_H diff --git a/storage/file/file.h b/storage/file/file.h index 8e1e83a..006fa7d 100644 --- a/storage/file/file.h +++ b/storage/file/file.h @@ -60,7 +60,7 @@ class File { protected: // You can't construct a File object, you must carete it by the static Open // method. - File(); + File(const std::string& file_path, const char* mode); public: virtual ~File(); @@ -92,6 +92,12 @@ class File { // Read next text line into *line, end of line will be stripped. // Read at most max_size if no eol found. virtual bool ReadLine(std::string* line, size_t max_size = 65536) = 0; + virtual bool ReadLineWithLineEnding(std::string* line, size_t max_size = 65536) = 0; + + virtual bool IsEof() = 0; + + std::string file_path() { return m_file_path; } + std::string mode() { return m_mode; } public: // The returned File* object is created by new and can be deleted. @@ -110,6 +116,9 @@ class File { // Get times attributes of path static bool GetTimes(const std::string& file_path, FileTimes* times); + // Get size attributes of path + static int64_t GetSize(const std::string& file_path); + // Read all bytes into *buffer, at most max_size if file too large. static bool ReadAll(const std::string& file_path, std::string* buffer, size_t max_size = 64*1024*1024); @@ -125,6 +134,16 @@ class File { int include_type = FileType_All, int exclude_type = FileType_None); + // Create new directory + static bool Mkdir(const std::string& path, int dir); + + // Remove directory + static bool Rmdir(const std::string& dir); + +protected: + std::string m_file_path; + std::string m_mode; + private: static FileSystem* GetFileSystemByPath(const std::string& file_path); }; @@ -142,6 +161,7 @@ class FileSystem { virtual bool Delete(const std::string& file_path) = 0; virtual bool Rename(const std::string& from, const std::string& to) = 0; virtual bool GetTimes(const std::string& file_path, FileTimes* times) = 0; + virtual int64_t GetSize(const std::string& file_path) = 0; virtual bool ReadAll(const std::string& file_path, std::string* buffer, size_t max_size); virtual bool ReadLines(const std::string& file_path, @@ -150,6 +170,9 @@ class FileSystem { const std::string& pattern, int include_types, int exclude_types) = 0; + + virtual bool Mkdir(const std::string& path, int mode) = 0; + virtual bool Rmdir(const std::string& path) = 0; }; // Defile the file_system class registry, user can register their own diff --git a/storage/file/hdfs_file.cpp b/storage/file/hdfs_file.cpp new file mode 100644 index 0000000..dbc6766 --- /dev/null +++ b/storage/file/hdfs_file.cpp @@ -0,0 +1,451 @@ +// Copyright 2013, For authors. +// Author: An Qin (anqin.qin@gmail.com) +// A FileSystem implementation for libhdfs. + +#include "toft/storage/file/hdfs_file.h" + +#include +#include +#include +#include + +#include "thirdparty/glog/logging.h" +#include "toft/base/string/algorithm.h" +#include "toft/base/unique_ptr.h" +#include "toft/storage/file/uri_utils.h" +#include "toft/storage/path/path.h" +#include "toft/text/wildcard.h" + +namespace toft { + +using namespace std; + +// change mode string to flags. Used in opening a hdfs file. +static bool ModeToFlags(const char* mode, int* flags) { + // only read and write are supported + if (!mode) { + return false; + } + if (strcmp(mode, "r") == 0) { + *flags = O_RDONLY; + return true; + } + if (strcmp(mode, "w") == 0) { + *flags = O_WRONLY; + return true; + } + return false; +} + + +namespace { + +class HDFSFileIterator : public FileIterator { +public: + explicit HDFSFileIterator(hdfsFS fs, const std::string& dir, + const std::string& pattern, + int include_types, int exclude_types) + : m_dir(dir), m_pattern(pattern), + m_include_types(include_types), m_exclude_types(exclude_types) { + m_infos = hdfsListDirectory(fs, dir.c_str(), &m_size); + m_current_pos = 0; + } + ~HDFSFileIterator() {} + + bool GetNext(FileEntry* entry) { + for (;; ++m_current_pos) { + if (m_current_pos >= m_size || m_size == 0) { + return false; + } + std::string full_file_name = m_infos[m_current_pos].mName; + + // NOTE: hdfsListDirectory will set full file name in mName, to keep the semantic + // consistency with local file iterator, we will get the base file name + std::string name = toft::Path::GetBaseName(full_file_name); + if (strcmp(name.c_str(), ".") == 0 || strcmp(name.c_str(), "..") == 0) { + continue; + } + + int type = GetType(m_current_pos); + if ((type & m_include_types) == 0) + continue; + if ((type & m_exclude_types) != 0) + continue; + + if (!Wildcard::Match(m_pattern, name)) + continue; + + entry->type = type; + entry->name = name; + m_current_pos++; + return true; + } + } + +private: + int GetType(int pos) const { + tObjectKind hdfs_type = m_infos[pos].mKind; + if (hdfs_type == kObjectKindFile) { + return FileType_Regular; + } else if (hdfs_type == kObjectKindDirectory) { + return FileType_Directory; + } + return FileType_None; + } + +private: + std::string m_dir; + std::string m_pattern; + int m_include_types; + int m_exclude_types; + + hdfsFileInfo* m_infos; + int m_current_pos; + int m_size; +}; + +} // namespace + + +//////////////////////////////////////////////////////////////// +// HDFS FS Cache Impl +HDFSFileSystem::HDFSFSCache::~HDFSFSCache() { + for (HDFSFSMap::iterator i = fs_map_.begin(); i != fs_map_.end(); ++i) { + int status = hdfsDisconnect(i->second); + if (status != 0) { + // TODO: add error details + LOG(ERROR) << "hdfsDisconnect(\"" << i->first.first << "\", " << i->first.second + << ") failed: " << " Error(" << errno << "): " << strerror(errno); + } + } +} + +hdfsFS HDFSFileSystem::HDFSFSCache::GetLocal() { + return hdfsConnect(NULL, 0); +} + +hdfsFS HDFSFileSystem::HDFSFSCache::GetConnection(const std::string& schema, + const std::string& host_port) { + vector split; + if (!UriUtils::Explode(host_port, ':', &split) || split.size() < 2) { + return NULL; + } + int port = atoi(split[1].c_str()); + string host = split[0]; + string fs_uri = schema + "://" + host_port; + string schema_host = schema + "://" + host; + + HDFSFSMap::iterator i = fs_map_.find(std::make_pair(fs_uri, "")); + if (i == fs_map_.end()) { + hdfsFS conn = hdfsConnect(schema_host.c_str(), port); + if (conn == NULL) { + return NULL; + } + fs_map_.insert(std::make_pair(std::make_pair(fs_uri, ""), conn)); + return conn; + } else { + return i->second; + } +} + +hdfsFS HDFSFileSystem::HDFSFSCache::GetConnection(const std::string& schema, + const std::string& host_port, + const std::string& username, + const std::string& password) { + vector split; + if (!UriUtils::Explode(host_port, ':', &split)) { + return NULL; + } + int port = atoi(split[1].c_str()); + string host = split[0]; + string fs_uri = schema + "://" + host_port; + string schema_host = schema + "://" + host; + + HDFSFSMap::iterator i = fs_map_.find(std::make_pair(fs_uri, username)); + if (i == fs_map_.end()) { + hdfsFS conn = hdfsConnectAsUser(schema_host.c_str(), port, + username.c_str(), password.c_str()); + if (conn == NULL) { + return NULL; + } + fs_map_.insert(std::make_pair(std::make_pair(fs_uri, username), conn)); + return conn; + } else { + return i->second; + } +} + + +///////////////////////////////////////////////////////////////////////////// +// HDFSFileSystem +// + +const char* HDFSFileSystem::HDFS = "hdfs"; + +HDFSFileSystem::HDFSFileSystem() { +} + +HDFSFileSystem::~HDFSFileSystem() { +} + +hdfsFS HDFSFileSystem::GetHDFSFS(const std::string& file_path) { + std::map params; + return GetHDFSFS(file_path, ¶ms); +} + + +hdfsFS HDFSFileSystem::GetHDFSFS(const std::string& file_path, + std::map* params) { + std::vector sections; + if (!UriUtils::Explode(file_path, '/', §ions)) { + return NULL; + } + + std::string schema = sections[0]; + + // handle local files + if (schema != "hdfs") { + return m_fs_cache.GetLocal(); + } + std::string cluster_name; + + if (!UriUtils::ParseParam(sections[1], &cluster_name, params)) { + return NULL; + } + hdfsFS fs; + // if there is username/password in params + if (params->count("username") == 1) { + std::map::iterator it; + it = params->find("username"); + std::string username = it->second; + it = params->find("password"); + std::string password = it->second; + if (it == params->end()) { + return NULL; + } + fs = m_fs_cache.GetConnection(schema, cluster_name, username, password); + } else { + fs = m_fs_cache.GetConnection(schema, cluster_name); + } + return fs; +} + +File* HDFSFileSystem::Open(const std::string& file_path, const char* mode) { + std::map params; + hdfsFS fs = GetHDFSFS(file_path, ¶ms); + if (!fs) return NULL; + int flags = 0; + if (!ModeToFlags(mode, &flags)) { + return NULL; + } + + int bufferSize = HDFS_FS_DEFAULT_BUFFER_SIZE; + int replication = HDFS_FS_DEFAULT_REPLICA; + int blocksize = HDFS_FS_DEFAULT_BLOCKSIZE; + if (params.count("bufferSize") == 1) { + bufferSize = atoi(params["bufferSize"].c_str()); + } + if (params.count("replication") == 1) { + replication = atoi(params["replication"].c_str()); + } + if (params.count("blocksize") == 1) { + blocksize = atoi(params["blocksize"].c_str()); + } + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return NULL; + } + hdfsFile file = hdfsOpenFile(fs, shifted_path.c_str(), flags, + bufferSize, replication, blocksize); + if (!file) { + return NULL; + } + return new HDFSFile(file, fs, file_path, mode); +} + +bool HDFSFileSystem::Exists(const std::string& file_path) { + hdfsFS fs = GetHDFSFS(file_path); + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return false; + } + if (!fs) return false; + return hdfsExists(fs, shifted_path.c_str()) == 0; +} + +bool HDFSFileSystem::Delete(const std::string& file_path) { + hdfsFS fs = GetHDFSFS(file_path); + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return false; + } + if (!fs) return false; + return hdfsDelete(fs, shifted_path.c_str()) == 0; +} + +bool HDFSFileSystem::Rename(const std::string& from, const std::string& to) { + hdfsFS from_fs = GetHDFSFS(from); + hdfsFS to_fs = GetHDFSFS(from); + if (from_fs != to_fs) { + return false; + } + if (!from_fs) return false; + std::string shifted_from; + if (!UriUtils::Shift(from, &shifted_from, 2, '/')) { + return false; + } + std::string shifted_to; + if (!UriUtils::Shift(to, &shifted_to, 2, '/')) { + return false; + } + return (hdfsRename(from_fs, shifted_from.c_str(), shifted_to.c_str()) == 0); +} + +bool HDFSFileSystem::GetTimes(const std::string& file_path, FileTimes* times) { + hdfsFS fs = GetHDFSFS(file_path); + if (!fs) return false; + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return false; + } + hdfsFileInfo* info = hdfsGetPathInfo(fs, shifted_path.c_str()); + if (!info) { + return false; + } + times->access_time = info->mLastAccess; + times->modify_time = info->mLastMod; + hdfsFreeFileInfo(info, 1); + return true; +} + +int64_t HDFSFileSystem::GetSize(const std::string& file_path) { + hdfsFS fs = GetHDFSFS(file_path); + if (!fs) return -1; + + std::string shifted_path; + if (!UriUtils::Shift(file_path, &shifted_path, 2, '/')) { + return -1; + } + + int64_t file_size = -1; + hdfsFileInfo* info = hdfsGetPathInfo(fs, shifted_path.c_str()); + if (info) { + file_size = info->mSize; + hdfsFreeFileInfo(info, 1); + } + return file_size; +} + +FileIterator* HDFSFileSystem::Iterate(const std::string& dir, + const std::string& pattern, + int include_types, + int exclude_types) { + hdfsFS fs = GetHDFSFS(dir); + if (!fs) return NULL; + std::string shifted_path; + if (!UriUtils::Shift(dir, &shifted_path, 2, '/')) { + return NULL; + } + return new HDFSFileIterator(fs, shifted_path, pattern, include_types, exclude_types); +} + +bool HDFSFileSystem::Mkdir(const std::string& dir, int mode) { + hdfsFS fs = GetHDFSFS(dir); + if (!fs) return false; + std::string shifted_path; + if (!UriUtils::Shift(dir, &shifted_path, 2, '/')) { + return false; + } + return hdfsCreateDirectory(fs, shifted_path.c_str()); +} + +bool HDFSFileSystem::Rmdir(const std::string& dir) { + hdfsFS fs = GetHDFSFS(dir); + if (!fs) return false; + std::string shifted_path; + if (!UriUtils::Shift(dir, &shifted_path, 2, '/')) { + return false; + } + return hdfsDelete(fs, shifted_path.c_str()); +} + +TOFT_REGISTER_FILE_SYSTEM("hdfs", HDFSFileSystem); + +/////////////////////////////////////////////////////////////////////////////////////////// +// HDFSFile Implementation +HDFSFile::HDFSFile(hdfsFile file, hdfsFS fs, const std::string& file_path, const char* mode) + : File(file_path, mode), + m_file(file), + m_fs(fs), + m_line_reader(NULL), + m_iseof(false), + m_closed(false) { +} + +HDFSFile::~HDFSFile() { + if (!m_closed) { + Close(); + m_closed = true; + } +} + +int64_t HDFSFile::Read(void* buffer, int64_t size) { + int64_t read_size = hdfsRead(m_fs, m_file, buffer, size); + if (read_size != size) { + m_iseof = (Tell() == File::GetSize(file_path())); + } + return read_size; +} +int64_t HDFSFile::Write(const void* buffer, int64_t size) { + return hdfsWrite(m_fs, m_file, buffer, size); +} +bool HDFSFile::Flush() { + return hdfsFlush(m_fs, m_file) == 0; +} + +bool HDFSFile::Close() { + m_closed = true; + if (m_line_reader) { + closeLineReader(m_line_reader); + } + return hdfsCloseFile(m_fs, m_file) == 0; +} + +bool HDFSFile::Seek(int64_t offset, int whence) { + if (whence != SEEK_SET) { + return false; + } + return hdfsSeek(m_fs, m_file, offset) == 0; +} +int64_t HDFSFile::Tell() { + return hdfsTell(m_fs, m_file); +} +bool HDFSFile::ReadLine(std::string* line, size_t max_size) { + if (!m_line_reader) { + m_line_reader = createLineReader(m_file); + } + if (!m_line_reader) { + return false; + } + + void* raw_line; + int n_bytes = readLineByLineReader(m_fs, m_line_reader, &raw_line); + if (-1 == n_bytes) m_iseof = true; + + if (!(n_bytes > 0) || !(static_cast(n_bytes) < max_size)){ + return false; + } + line->assign(reinterpret_cast(raw_line), n_bytes); + return true; +} + +bool HDFSFile::ReadLineWithLineEnding(std::string* line, size_t max_size) { + return false; +} + +bool HDFSFile::IsEof() { + return m_iseof; +} + + +} // namespace toft diff --git a/storage/file/hdfs_file.h b/storage/file/hdfs_file.h new file mode 100644 index 0000000..92edd71 --- /dev/null +++ b/storage/file/hdfs_file.h @@ -0,0 +1,132 @@ +// Copyright 2013, Baidu Inc. +// Author: An Qin (anqin.qin@gmail.com) +// A FileSystem implementation for libhdfs. + + +#ifndef TOFT_STORAGE_FILE_HDFS_FILE_H_ +#define TOFT_STORAGE_FILE_HDFS_FILE_H_ + +#include + +#include +#include + +#include "thirdparty/dfs-adapter/hdfs.h" +//#include "thirdparty/hybridfs/hybridfs.h" +#include "toft/storage/file/file.h" + +#define HDFS_FS_DEFAULT_BUFFER_SIZE 4 * 1024 * 1024 +#define HDFS_FS_DEFAULT_REPLICA 3 +#define HDFS_FS_DEFAULT_BLOCKSIZE 256 * 1024 * 1024 + +namespace toft { + +// a FileSystem like implementation of HDFS +// Note that HDFSFileSystem instances need to out-live HDFSFile created from it. +class HDFSFileSystem : public FileSystem { +public: + HDFSFileSystem(); + virtual ~HDFSFileSystem(); + virtual File* Open(const std::string& file_path, const char* mode); + virtual bool Exists(const std::string& file_path); + virtual bool Delete(const std::string& file_path); + virtual bool Rename(const std::string& from, const std::string& to); + virtual bool GetTimes(const std::string& file_path, FileTimes* times); + virtual int64_t GetSize(const std::string& file_path); + virtual FileIterator* Iterate(const std::string& dir, + const std::string& pattern, + int include_types, + int exclude_types); + virtual bool Mkdir(const std::string& dir, int mode); + virtual bool Rmdir(const std::string& dir); + + static HDFSFileSystem* GetRegisteredFileSystem() { + return static_cast(TOFT_GET_FILE_SYSTEM(HDFS)); + } + + /** + * A util method to get a hdfsFS obj from path. + * @param file_path path to file, valid for both HDFS path like "/hdfs/szwg-ecomon:5313/user..." + * or local path like "/home/work/..." + * @return a hdfsFS obj if success, NULL if fail. + */ + hdfsFS GetHDFSFS(const std::string& file_path); + +public: + static const char* HDFS /* = "hdfs" */; + +protected: + // A (process-wide) cache of hdfsFS objects. + // These connections are shared across all threads and kept + // open until the process terminates. + // (Calls to hdfsDisconnect() by individual threads would terminate all + // other connections handed out via hdfsConnect() to the same URI.) + class HDFSFSCache { + public: + ~HDFSFSCache(); + + hdfsFS GetLocal(); + // Get connection to specific fs by specifying the name node's + // ipaddress or hostname and port. + hdfsFS GetConnection(const std::string& schema, + const std::string& host_port); + + // Get connection to specific fs by specifying the name node's + // ipaddress or hostname, port, username and password. + hdfsFS GetConnection(const std::string& schema, + const std::string& host_port, + const std::string& username, + const std::string& password); + + private: + typedef std::map, hdfsFS> HDFSFSMap; + HDFSFSMap fs_map_; + }; + + /** + * A util method to get a hdfsFS obj from path. + * @param file_path path to file, valid for both HDFS path like "/hdfs/szwg-ecomon:5313/user..." + * or local path like "/home/work/..." + * @param params params parsed from second section of URI. + * @return a hdfsFS obj if success, NULL if fail. + */ + hdfsFS GetHDFSFS(const std::string& file_path, + std::map* params); + + // a internal hdfs connection cache + HDFSFSCache m_fs_cache; +}; + +// Represent a file object on local mounted file system +class HDFSFile : public File { +public: + virtual ~HDFSFile(); + + // Implement File interface. + // + virtual int64_t Read(void* buffer, int64_t size); + virtual int64_t Write(const void* buffer, int64_t size); + virtual bool Flush(); + virtual bool Close(); + virtual bool Seek(int64_t offset, int whence); + virtual int64_t Tell(); + virtual bool ReadLine(std::string* line, size_t max_size); + virtual bool ReadLineWithLineEnding(std::string* line, size_t max_size); + virtual bool IsEof(); +private: + friend class HDFSFileSystem; + HDFSFile(hdfsFile file, hdfsFS fs, const std::string& file_path, const char* mode); + + hdfsFile m_file; // internal hdfsFile obj + hdfsFS m_fs; // internal hdfsFS obj + LineReader m_line_reader; //internal hdfsLineReader obj + + bool m_iseof; + bool m_closed; +}; + +} + +#endif //TOFT_STORAGE_FILE_HDFS_FILE_H_ + +/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ diff --git a/storage/file/uri_utils.cpp b/storage/file/uri_utils.cpp new file mode 100644 index 0000000..77bab28 --- /dev/null +++ b/storage/file/uri_utils.cpp @@ -0,0 +1,70 @@ +// Copyright 2013, For authors. +// Author: An Qin (anqin.qin@gmail.com) +// A Util class for processing URI. + +#include "uri_utils.h" + +#include +#include +#include +#include + +#include "toft/base/string/algorithm.h" + +namespace toft{ + +bool UriUtils::Explode(std::string const & uri, char delim, std::vector* exploded) { + std::istringstream iss(uri); + + for (std::string token; std::getline(iss, token, delim); ) { + if (token == "") { + continue; + } + exploded->push_back(token); + } + return true; +} + + +bool UriUtils::ParseParam(const std::string& section, std::string* cluster_name, + std::map* params) { + std::vector main_tmp; + if (!Explode(section, '?', &main_tmp)) { + return false; + } + *cluster_name = main_tmp[0]; + if (main_tmp.size() == 1) { + return true; + } + std::string remains = main_tmp[1]; + + std::vector pairs; + Explode(remains, ',', &pairs); + for (size_t i = 0; i < pairs.size(); i++) { + std::vector kv; + SplitString(pairs[i], "=", &kv); + if (kv.size() == 1) { + kv.push_back(""); + } + (*params)[kv[0]] = kv[1]; + } + return true; +} + +bool UriUtils::Shift(const std::string& path, std::string* shifted, int n, char delim) { + size_t last_found = 0; + int i = 0; + while (i < n) { + last_found = path.find(delim, last_found + 1); + if (last_found == std::string::npos) { + return false; + } + i++; + } + *shifted = path.substr(last_found); + return true; +} + +} + +/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ diff --git a/storage/file/uri_utils.h b/storage/file/uri_utils.h new file mode 100644 index 0000000..bdf7c05 --- /dev/null +++ b/storage/file/uri_utils.h @@ -0,0 +1,58 @@ +// Copyright 2013, Baidu Inc. +// Author: An Qin (anqin.qin@gmail.com) +// A Util class for processing URI. + +#ifndef TOFT_STORAGE_FILE_URI_UTILS_H_ +#define TOFT_STORAGE_FILE_URI_UTILS_H_ + +#include +#include +#include + +namespace toft { + +class UriUtils { +public: + /** + * A normal explode for string. + * @param s string to explode + * @param delim delim char used in explode + * @return a vector of exploded string pieces + */ + static bool Explode(std::string const & s, char delim, std::vector* exploded); + /** + * Parse params in URI sections. URI could have params, the pattern is like: + * /hdfs/szwg-ecomon:2243?username=abcd,password=dfcg,blockSize=256000000/user/... + * <--main_str----> <--key-> <--key-> <--key--> <---v---> + * <-------------------------section------------------------------> + * delims is fixed: + * ? -- start of params part + * , -- kv-pair delim + * = -- delimiting key and value. + * @param section certain to parse from, passing a string not meeting desc above, + * result will be uncertain. + * @param main_str main string parsed from section + * @param params key-value pairs + * @return parse succeeded or failed + */ + static bool ParseParam(const std::string& section, std::string* cluster_name, + std::map* params); + /** + * Shift sections from a path. Like from path + * /hdfs/szwg-ecomon:23234/user/logging/... + * shifting 2 section will resulting in + * /user/logging/... + * @param path path to shift + * @param shifted returned shifted string + * @param n section count to shift + * @param delim delim between sections + * @return succeeded or failed + */ + static bool Shift(const std::string& path, std::string* shifted, int n, char delim); +}; + +} // namespace toft + +#endif //__URI_UTILS_H_ + +/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ diff --git a/storage/file/uri_utils_test.cpp b/storage/file/uri_utils_test.cpp new file mode 100644 index 0000000..c32fa6d --- /dev/null +++ b/storage/file/uri_utils_test.cpp @@ -0,0 +1,70 @@ +// Copyright 2013, For authors. +// Author: An Qin (anqin.qin@gmail.com) +// A test for URI Utils +#include "uri_utils.h" + +#include +#include +#include + +#include "gtest/gtest.h" + +namespace toft { + +TEST(UriUtilsTest, ExplodeTest) { + std::vector result; + ASSERT_TRUE(UriUtils::Explode("/hdfs/szwg-ecomon?username=logging,password=abcd/user/logging", '/', &result)); + EXPECT_STREQ(result[0].c_str(), "hdfs"); + EXPECT_STREQ(result[1].c_str(), "szwg-ecomon?username=logging,password=abcd"); + EXPECT_STREQ(result[2].c_str(), "user"); + EXPECT_STREQ(result[3].c_str(), "logging"); + ASSERT_TRUE(UriUtils::Explode("hdfs/szwg-ecomon?username=logging,password=abcd", '/', &result)); + EXPECT_STREQ(result[0].c_str(), "hdfs"); + EXPECT_STREQ(result[1].c_str(), "szwg-ecomon?username=logging,password=abcd"); + +} + +TEST(UriUtilsTest, ParseParamTest) { + std::string main_str; + std::map params; + ASSERT_TRUE(UriUtils::ParseParam("szwg-ecomon?username=logging,password=abcd", + &main_str, ¶ms)); + EXPECT_STREQ(main_str.c_str(), "szwg-ecomon"); + EXPECT_STREQ(params["username"].c_str(), "logging"); + EXPECT_STREQ(params["password"].c_str(), "abcd"); + + // bad cases + ASSERT_TRUE(UriUtils::ParseParam("szwg-ecomon", + &main_str, ¶ms)); + EXPECT_STREQ(main_str.c_str(), "szwg-ecomon"); + ASSERT_TRUE(UriUtils::ParseParam("szwg-ecomon?username=logging,password", + &main_str, ¶ms)); + EXPECT_STREQ(main_str.c_str(), "szwg-ecomon"); + EXPECT_STREQ(params["username"].c_str(), "logging"); + ASSERT_TRUE(UriUtils::ParseParam("szwg-ecomon?username=logging,", + &main_str, ¶ms)); + EXPECT_STREQ(main_str.c_str(), "szwg-ecomon"); + EXPECT_STREQ(params["username"].c_str(), "logging"); + ASSERT_TRUE(UriUtils::ParseParam("szwg-ecomon?", + &main_str, ¶ms)); + EXPECT_STREQ(main_str.c_str(), "szwg-ecomon"); + + // empty username and password + ASSERT_TRUE(UriUtils::ParseParam("szwg-ecomon?username=,password=", + &main_str, ¶ms)); + EXPECT_STREQ(main_str.c_str(), "szwg-ecomon"); + EXPECT_STREQ(params["username"].c_str(), ""); + EXPECT_STREQ(params["password"].c_str(), ""); +} + +TEST(UriUtilsTest, ShiftTest) { + std::string result; + ASSERT_TRUE(UriUtils::Shift("/hdfs/szwg-ecomon/user", &result, 1, '/')); + EXPECT_STREQ(result.c_str(), "/szwg-ecomon/user"); + ASSERT_TRUE(UriUtils::Shift("/hdfs/szwg-ecomon/user", &result, 2, '/')); + EXPECT_STREQ(result.c_str(), "/user"); +} + +} + +/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ diff --git a/storage/path/path.cpp b/storage/path/path.cpp index 7b1cc81..47e4cd6 100644 --- a/storage/path/path.cpp +++ b/storage/path/path.cpp @@ -6,8 +6,10 @@ #include "toft/storage/path/path.h" #include #include +#include #include "toft/base/array_size.h" #include "toft/base/string/algorithm.h" +#include "thirdparty/glog/logging.h" namespace toft { @@ -64,6 +66,52 @@ std::string Path::ToAbsolute(const std::string& filepath) return Normalize(Join(cwd, filepath)); } +std::string Path::GetCwd() { + char cwd_buf[4096]; + std::string cwd = getcwd(cwd_buf, sizeof(cwd_buf)); + return cwd; +} + +std::string Path::ToRelative(const std::string& base_directory, + const std::string& file_path) { + std::string absolute_base_directory = Path::ToAbsolute(base_directory); + std::string absolute_file_path = Path::ToAbsolute(file_path); + + std::vector absolute_base_directory_pathes; + std::vector file_path_pathes; + + SplitString(absolute_base_directory, "/", &absolute_base_directory_pathes); + SplitString(absolute_file_path, "/", &file_path_pathes); + + int directory_path_size = static_cast(absolute_base_directory_pathes.size()); + int file_path_size = static_cast(file_path_pathes.size()); + + int i = 0; + + for (; (i < directory_path_size) && (i < file_path_size); ++i) { + if (absolute_base_directory_pathes[i] != file_path_pathes[i]) { + break; + } + } + + int rest_size = directory_path_size - i; + + file_path_pathes.erase(file_path_pathes.begin(), file_path_pathes.begin() + i); + file_path_pathes.insert(file_path_pathes.begin(), rest_size, ".."); + + int size = file_path_pathes.size(); + + if (file_path_pathes.empty()) { + return ""; + } else { + std::string path = file_path_pathes[0]; + for (int i = 1; i < size; ++i) { + path = Join(path, file_path_pathes[i]); + } + return path; + } +} + bool Path::IsAbsolute(const std::string& filepath) { return !filepath.empty() && IsSeparator(filepath[0]); diff --git a/storage/path/path.h b/storage/path/path.h index 65f2ed2..2b678a6 100644 --- a/storage/path/path.h +++ b/storage/path/path.h @@ -54,6 +54,11 @@ class Path { const std::string& p3, const std::string& p4, const std::string& p5, const std::string& p6); + static std::string ToRelative(const std::string& base_directory, + const std::string& file_path); + + static std::string GetCwd(); + private: static std::string DoJoin(const std::string** paths, size_t size); }; diff --git a/system/threading/BUILD b/system/threading/BUILD index 56d4b01..2f47026 100644 --- a/system/threading/BUILD +++ b/system/threading/BUILD @@ -59,6 +59,20 @@ cc_library( ] ) +cc_library( + name = '_lock_free_thread_pool', + srcs = [ + 'lock_free_thread_pool.cpp' + ], + deps = [ + ':_sync_object', + ':_this_thread', + ':_thread', + '//toft/base:closure', + '//toft/system/atomic:atomic' + ] +) + cc_library( name = 'threading', deps = [ @@ -66,7 +80,8 @@ cc_library( ':_this_thread', ':_thread', ':_thread_group', - ':_thread_pool' + ':_thread_pool', + ':_lock_free_thread_pool' ] ) @@ -140,3 +155,18 @@ cc_test( ':threading', ] ) + +cc_test( + name = 'lock_free_queue_test', + srcs = 'lock_free_queue_test.cpp', + deps = [ + ] +) + +cc_test( + name = 'lock_free_thread_pool_test', + srcs = 'lock_free_thread_pool_test.cpp', + deps = [ + ':threading', + ] +) diff --git a/system/threading/lock_free_queue.h b/system/threading/lock_free_queue.h new file mode 100644 index 0000000..8fa4e63 --- /dev/null +++ b/system/threading/lock_free_queue.h @@ -0,0 +1,156 @@ +// Copyright (C) 2013, The Toft Authors. +// Author: An Qin +// +// Description: + +#ifndef TOFT_SYSTEM_THREADING_LOCK_FREE_QUEUE_H +#define TOFT_SYSTEM_THREADING_LOCK_FREE_QUEUE_H + +#include +#include + +namespace toft { + +template +struct _Pointer { +public: + union { + uint64_t ui; + struct { + T* ptr; + size_t count; + }; + }; + + _Pointer() : ptr(NULL), count(0) {} + _Pointer(T* p) : ptr(p), count(0) {} + _Pointer(T* p, size_t c) : ptr(p), count(c) {} + + bool cas(_Pointer const& nval, _Pointer const & cmp) { + bool result; + __asm__ __volatile__( + "lock cmpxchg8b %1\n\t" + "setz %0\n" + : "=q" (result), "+m" (ui) + : "a" (cmp.ptr), "d" (cmp.count), "b" (nval.ptr), "c" (nval.count) + : "cc" + ); + return result; + } + + bool operator==(_Pointer const&x) { return x.ui == ui; } +}; + +template +struct _Pointer { +public: + union { + uint64_t ui[2]; + struct { + T* ptr; + size_t count; + } __attribute__ (( __aligned__( 16 ) )); + }; + + _Pointer() : ptr(NULL), count(0) {} + _Pointer(T* p) : ptr(p), count(0) {} + _Pointer(T* p, size_t c) : ptr(p), count(c) {} + + bool cas(_Pointer const& nval, _Pointer const& cmp) { + bool result; + __asm__ __volatile__ ( + "lock cmpxchg16b %1\n\t" + "setz %0\n" + : "=q" (result), "+m" (ui) + : "a" (cmp.ptr), "d" (cmp.count), "b" (nval.ptr), "c" (nval.count) + : "cc" + ); + return result; + } + + bool operator==(_Pointer const&x) { + return x.ptr == ptr && x.count == count; + } +}; + + +/////////// lock-free queue /////////// + +template +class LockFreeQueue { +public: + struct Node; + typedef _Pointer Pointer; + + struct Node { + T value; + Pointer next; + + Node() : next(NULL) {} + Node(T x, Node* nxt) : value(x), next(nxt) {} + }; + + Pointer m_head, m_tail; + + LockFreeQueue() { + Node *node = new Node(); + m_head.ptr = m_tail.ptr = node; + } + + ~LockFreeQueue() { + Node *node = m_head.ptr; + m_head.ptr = m_tail.ptr = NULL; + delete node; + } + + void Enqueue(T x); + bool Dequeue(T& pvalue); +}; + +template +void LockFreeQueue::Enqueue(T x) { + Node *node = new Node(x, NULL); + Pointer tail, next; + do { + tail = m_tail; + next = tail.ptr->next; + if (tail == m_tail) { + if (next.ptr == NULL) { + if (tail.ptr->next.cas(Pointer(node, next.count + 1), next)) { + break; + } + } else { + m_tail.cas(Pointer(next.ptr, tail.count + 1), tail); + } + } + } while (true); + m_tail.cas(Pointer(node,tail.count + 1), tail); +} + +template +bool LockFreeQueue::Dequeue(T& pvalue) { + Pointer head, tail, next; + do { + head = m_head; + tail = m_tail; + next = head.ptr->next; + if (head == m_head) { + if (head.ptr == tail.ptr) { + if (next.ptr == NULL) return false; + m_tail.cas(Pointer(next.ptr, tail.count + 1), tail); + } else { + pvalue = next.ptr->value; + if (m_head.cas(Pointer(next.ptr, head.count + 1), head)) { + break; + } + } + } + } while (true); + + delete head.ptr; + return true; +} + +} // namespace toft + +#endif // TOFT_SYSTEM_THREADING_LOCK_FREE_QUEUE_H diff --git a/system/threading/lock_free_queue_test.cpp b/system/threading/lock_free_queue_test.cpp new file mode 100644 index 0000000..ad94e3d --- /dev/null +++ b/system/threading/lock_free_queue_test.cpp @@ -0,0 +1,29 @@ +// Copyright (C) 2013, The Toft Authors. +// Author: An Qin +// +// Description: + +#include "toft/system/threading/lock_free_queue.h" + +#include "thirdparty/gtest/gtest.h" + +namespace toft { + +TEST(LockFreeQueue, General) { + LockFreeQueue my_queue; + + for (int i = 0; i < 10; ++i) { + int* value = (int*) malloc(sizeof(int)); + *value = i; + my_queue.Enqueue(value); + } + int* value = NULL; + for (int i = 0; i < 10; ++i) { + my_queue.Dequeue(value); + EXPECT_EQ(*value, i); + delete value; + value = NULL; + } +} + +} // namespace toft diff --git a/system/threading/lock_free_thread_pool.cpp b/system/threading/lock_free_thread_pool.cpp new file mode 100644 index 0000000..cf7658f --- /dev/null +++ b/system/threading/lock_free_thread_pool.cpp @@ -0,0 +1,215 @@ +// Copyright (C) 2013, The Toft Authors. +// Author: An Qin +// +// Description: + +#include "toft/system/threading/lock_free_thread_pool.h" + +#include "thirdparty/glog/logging.h" + +#include "toft/system/threading/this_thread.h" + +namespace toft { + +LockFreeThreadPool::LockFreeThreadPool(int32_t min_thread_num, + int32_t max_thread_num, + int32_t idle_timeout_ms, + size_t stack_size) + : m_min_thread_num(min_thread_num), + m_max_thread_num(max_thread_num), + m_cur_thread_num(0), m_cur_busy_thread_num(0), + m_pending_task_num(0), + m_idle_timeout(idle_timeout_ms), + m_stack_size(stack_size), + m_exited(false) { + CHECK_GE(m_min_thread_num, 0); + if (m_max_thread_num < 0) { + m_max_thread_num = m_min_thread_num + 1; + } + + if (m_max_thread_num < m_min_thread_num) { + m_max_thread_num = m_min_thread_num; + } +} + +LockFreeThreadPool::~LockFreeThreadPool() { + Terminate(); +} + +void LockFreeThreadPool::Terminate(bool is_wait) { + if (!m_exited) { + if (is_wait) { + while (m_pending_task_num > 0) { + ThisThread::Sleep(1); + } + } + m_exited = true; + } + + while (m_cur_thread_num > 0) { + for (int32_t i = 0; i < m_cur_thread_num; ++i) { + m_event_new_task.Set(); + } + m_event_relase_all_task.TimedWait(m_idle_timeout); + } + + ReleaseAllCompleteTasks(); + ReleaseAllPendingTasks(); + + ThreadNode* thread_node = NULL; + while (m_idle_threads.Dequeue(thread_node)) { + CHECK(thread_node != NULL); + thread_node->Join(); + delete thread_node; + thread_node = NULL; + } +} + +void LockFreeThreadPool::AddTask(Closure* callback) { + AddTaskInternal(false, callback, NULL); +} + +void LockFreeThreadPool::AddTask(std::function callback) { + AddTaskInternal(false, NULL, callback); +} + +void LockFreeThreadPool::AddTaskInternal(bool is_priority, + Closure* callback, + std::function function) { + CHECK(!m_exited); + TaskNode* task_node = PickCompleteTask(true); + task_node->callback = callback; + task_node->function = function; + + AddPendingTask(task_node, is_priority); + if (NeedNewThread()) { + AddThreadNodeToList(); + } + m_event_new_task.Set(); +} + +bool LockFreeThreadPool::NeedNewThread() { + if (m_cur_thread_num >= m_max_thread_num) { + return false; + } + if (m_cur_thread_num < m_min_thread_num) { + return true; + } + + if (m_cur_busy_thread_num == m_cur_thread_num) { + return true; + } + + return false; +} + +void LockFreeThreadPool::Task::Run() const { + if (callback) { + callback->Run(); + } else if (function) { + function(); + } +} + +void LockFreeThreadPool::ThreadRunner() { + m_cur_busy_thread_num++; + + TaskNode* task_node = NULL; + while (!m_exited) { + task_node = PickPendingTask(); + if (task_node) { + task_node->Run(); + AddCompleteTask(task_node); + } else { + m_cur_busy_thread_num--; + if (!m_event_new_task.TimedWait(m_idle_timeout) + && m_cur_thread_num > m_min_thread_num) { + break; + } + m_cur_busy_thread_num++; + } + } +} + +void LockFreeThreadPool::ThreadRuntine(ThreadNode* thread_node) { + ThreadRunner(); + + m_idle_threads.Enqueue(thread_node); + + m_cur_busy_thread_num--; + m_cur_thread_num--; + + if (m_cur_thread_num == 0) { + m_event_relase_all_task.Set(); + } +} + +void LockFreeThreadPool::AddPendingTask(TaskNode* task_node, bool is_priority) { + m_pending_tasks.Enqueue(task_node); + m_pending_task_num++; +} + +void LockFreeThreadPool::AddCompleteTask(TaskNode* task_node) { + m_completed_tasks.Enqueue(task_node); +} + +void LockFreeThreadPool::ReleaseAllCompleteTasks() { + TaskNode* task_node = NULL; + while (m_completed_tasks.Dequeue(task_node)) { + CHECK(task_node != NULL); + delete task_node; + } +} + +void LockFreeThreadPool::ReleaseAllPendingTasks() { + TaskNode* task_node = NULL; + while (m_pending_tasks.Dequeue(task_node)) { + CHECK(task_node != NULL); + m_pending_task_num--; + delete task_node->callback; + delete task_node; + } +} + +LockFreeThreadPool::TaskNode* LockFreeThreadPool::PickPendingTask() { + TaskNode* task_node = NULL; + if (!m_pending_tasks.Dequeue(task_node)) { + task_node = NULL; + } else { + m_pending_task_num--; + } + + return task_node; +} + +LockFreeThreadPool::TaskNode* LockFreeThreadPool::PickCompleteTask(bool is_new) { + TaskNode* task_node = NULL; + if (!m_completed_tasks.Dequeue(task_node)) { + task_node = NULL; + } + + if (!task_node && is_new) { + task_node = new TaskNode; + } + + return task_node; +} + +void LockFreeThreadPool::AddThreadNodeToList() { + ThreadNode* thread_node = NULL; + if (m_idle_threads.Dequeue(thread_node)) { + thread_node->Join(); + thread_node->~ThreadNode(); + new (thread_node) ThreadNode(); + } else { + thread_node = new ThreadNode(); + } + + m_cur_thread_num++; + if (m_stack_size > 0) { + thread_node->SetStackSize(m_stack_size); + } + thread_node->Start(std::bind(&LockFreeThreadPool::ThreadRuntine, this, thread_node)); +} + +} // namespace toft diff --git a/system/threading/lock_free_thread_pool.h b/system/threading/lock_free_thread_pool.h new file mode 100644 index 0000000..49e615d --- /dev/null +++ b/system/threading/lock_free_thread_pool.h @@ -0,0 +1,87 @@ +// Copyright (C) 2013, The Toft Authors. +// Author: An Qin +// +// Description: + +#ifndef TOFT_SYSTEM_THREADING_LOCK_FREE_THREAD_POOL_H +#define TOFT_SYSTEM_THREADING_LOCK_FREE_THREAD_POOL_H + +#include + +#include "toft/base/closure.h" +#include "toft/base/intrusive_list.h" +#include "toft/system/atomic/atomic.h" +#include "toft/system/threading/event.h" +#include "toft/system/threading/lock_free_queue.h" +#include "toft/system/threading/thread.h" + +namespace toft { + +class LockFreeThreadPool { +public: + explicit LockFreeThreadPool(int32_t min_thread_num = 0, + int32_t max_thread_num = -1, + int32_t idle_timeout_ms = 60000, /* in ms */ + size_t stack_size = 0); + ~LockFreeThreadPool(); + + void Terminate(bool is_wait = true); + + void AddTask(Closure* callback); + void AddTask(std::function callback); + +private: + struct Task { + Closure* callback; + std::function function; + + void Run() const; + }; + + struct TaskNode : public Task { + list_node link; + }; + + struct ThreadNode : public Thread { + list_node link; + }; + + void AddTaskInternal(bool is_priority, + Closure* callback, + std::function function); + + bool NeedNewThread(); + void ThreadRunner(); + void ThreadRuntine(ThreadNode* thread_node); + + void AddPendingTask(TaskNode* task_node, bool is_priority = false); + void AddCompleteTask(TaskNode* task_node); + void ReleaseAllPendingTasks(); + void ReleaseAllCompleteTasks(); + + TaskNode* PickPendingTask(); + TaskNode* PickCompleteTask(bool is_new = false); + + void AddThreadNodeToList(); + +private: + AutoResetEvent m_event_new_task; + AutoResetEvent m_event_relase_all_task; + + LockFreeQueue m_pending_tasks; + LockFreeQueue m_completed_tasks; + LockFreeQueue m_idle_threads; + + int32_t m_min_thread_num; + int32_t m_max_thread_num; + Atomic m_cur_thread_num; + Atomic m_cur_busy_thread_num; + Atomic m_pending_task_num; + int32_t m_idle_timeout; + size_t m_stack_size; + Atomic m_exited; +}; + +} // namespace toft + +#endif // TOFT_SYSTEM_THREADING_LOCK_FREE_THREAD_POOL_H diff --git a/system/threading/lock_free_thread_pool_test.cpp b/system/threading/lock_free_thread_pool_test.cpp new file mode 100644 index 0000000..0370ff0 --- /dev/null +++ b/system/threading/lock_free_thread_pool_test.cpp @@ -0,0 +1,60 @@ +// Copyright (C) 2013, The Toft Authors. +// Author: An Qin +// +// Description: + +#include "toft/system/threading/lock_free_thread_pool.h" + +#include "thirdparty/glog/logging.h" +#include "thirdparty/gtest/gtest.h" +#include "toft/base/closure.h" +#include "toft/base/functional.h" + +namespace toft { + +class Foo { +public: + void test1() {} + + void test2(int32_t param1) {} +}; + +TEST(LockFreeThreadPool, Closure) { + LockFreeThreadPool threadpool(4, 4); + Foo foo; + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 20; ++j) { + threadpool.AddTask(NewClosure(&foo, &Foo::test1)); + threadpool.AddTask( + NewClosure( + &foo, &Foo::test2, static_cast(i*20+j))); + } + } +} + +TEST(LockFreeThreadPool, Function) { + LockFreeThreadPool threadpool(4, 4); + Foo foo; + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 20; ++j) { + threadpool.AddTask(NewClosure(&foo, &Foo::test1)); + threadpool.AddTask( + std::bind(&Foo::test2, &foo, static_cast(i*20+j))); + } + } +} + +TEST(LockFreeThreadPool, Performance) { + LockFreeThreadPool threadpool(1000, 1000); + Foo foo; + for (int i = 0; i < 100; ++i) { + for (int j = 0; j < 200; ++j) { + threadpool.AddTask(NewClosure(&foo, &Foo::test1)); + threadpool.AddTask( + NewClosure( + &foo, &Foo::test2, static_cast(i*20+j))); + } + } +} + +} // namespace toft